博客 / 詳情

返回

Java客户端向kafka發送protobuf序列化的數據

需求

跨部門協作,通過Kafka進行溝通,我們是生產者,對方是消費者。因為對方部門後台是使用golang,所以默認是使用Protobuf來解析數據。

解決方法

給Kafka創建一個Protobuf的序列化類,這樣每次發送數據時就會序列化成Protobuf格式的。

根據.proto文件生成對應的Java實體類。
protoc --java_out=./ x.proto
java_out後面是生成的路徑。
x.proto是原型文件名。 protoc的安裝方法不做詳述了,可以網上搜一下。

  1. 創建Kafka序列化類
    假設生成的實體類叫Xproto.java
public class ProtoSerializer implements Serializer<Xproto> {  
  
    @Override  
  public void configure(Map<String, ?> configs, boolean isKey) {  
  
    }  
  
    @Override  
  public byte[] serialize(String topic, Xproto data) {  
        if (data == null) {    
            return null;  
        }  
        return data.toByteArray();  
  }  
  
    @Override  
  public void close() {  
  
    }  
}
  1. 修改Kafka生產者的配置
@Configuration
@EnableKafka
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaAutoConfig {

@Autowired private KafkaProperties kafkaProperties;  
@Bean(name = “kafkaProducer”) 
public KafkaTemplate < String, String > kafkaTemplate() {
        Map < String, Object > configProps = new HashMap < > ();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProtoSerializer.class);
        configProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000);
        return new KafkaTemplate < > (new DefaultKafkaProducerFactory < > (configProps));
    }
user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.