kafka生产者详解

发布时间 2023-07-06 15:35:31作者: 佛系粥米

一、发送类型

  同步发送:使用send()方法发送,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功

//发送消息
  try {
    RecordMetadata recordMetadata = producer.send(record).get();   
    System.out.println(recordMetadata.offset());//获取偏移量
    }catch (Exception e){
        e.printStackTrace();
    }            

  异步发送:调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数

//发送消息
try {
    producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    if(e!=null){
    e.printStackTrace();
     }
    System.out.println(recordMetadata.offset());
    }
});
}catch (Exception e){
     e.printStackTrace();
}