需求
跨部門協作,通過Kafka進行溝通,我們是生產者,對方是消費者。因為對方部門後台是使用golang,所以默認是使用Protobuf來解析數據。
解決方法
給Kafka創建一個Protobuf的序列化類,這樣每次發送數據時就會序列化成Protobuf格式的。
根據.proto文件生成對應的Java實體類。
protoc --java_out=./ x.proto
java_out後面是生成的路徑。
x.proto是原型文件名。 protoc的安裝方法不做詳述了,可以網上搜一下。
- 創建Kafka序列化類
假設生成的實體類叫Xproto.java
public class ProtoSerializer implements Serializer<Xproto> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public byte[] serialize(String topic, Xproto data) {
if (data == null) {
return null;
}
return data.toByteArray();
}
@Override
public void close() {
}
}
- 修改Kafka生產者的配置
@Configuration
@EnableKafka
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaAutoConfig {
@Autowired private KafkaProperties kafkaProperties;
@Bean(name = “kafkaProducer”)
public KafkaTemplate < String, String > kafkaTemplate() {
Map < String, Object > configProps = new HashMap < > ();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProtoSerializer.class);
configProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000);
return new KafkaTemplate < > (new DefaultKafkaProducerFactory < > (configProps));
}