Kafka 生产者代码解读

发布时间 2023-08-16 15:16:20作者: eiSouthBoy

问题引入

尽管Kafka官方提供了生产者代码案例,我还是觉得有必要对代码进行一次解读,并加入个人的理解。

? 这里有一篇写的很好的博客,建议阅读c语言使用librdkafka库实现kafka的生产和消费实例(转)

发布流程

  • 第1步:创建kafka客户端配置信息对象并初始化
rd_kafka_conf_t *conf; /* Temporary configuration object */
conf = rd_kafka_conf_new();
  • 第2步:kafka客户端配置信息对象赋值
char errstr[512];      /* librdkafka API error reporting buffer */
const char *brokers = "192.168.1.100:9092:";   /* Argument: broker list */
const char *groupid = "pub_test";

/* 配置broker,broker可以是集群,例如:ip1:9092,ip2:9092,ip3:9092 */
if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,
	sizeof(errstr)) != RD_KAFKA_CONF_OK) 
{
	fprintf(stderr, "%s\n", errstr);
	return 1;
}
/* 配置groupid */
if (rd_kafka_conf_set(conf, "group.id", groupid, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) 
{
    fprintf(stderr, "%s\n", errstr);
    rd_kafka_conf_destroy(conf);
    return 1;
}
/* 配置其他属性,可以查看CONFIGURATION.md文档 */
...
  • 第3步:设置消息分发(成功 or 失败)报告回调函数
static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) 
{
	if (rkmessage->err)
    {
		fprintf(stderr, "%% Message delivery failed: %s\n", rd_kafka_err2str(rkmessage->err));
    }
	else
    {
		fprintf(stderr,"%% Message delivered (%zd bytes, partition %d)\n",
			rkmessage->len, rkmessage->partition);
    }
	/* The rkmessage is destroyed automatically by librdkafka */
}

/* dr_msg_cb()函数只能被rd_kafka_poll() and rd_kafka_flush()触发 */
rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
  • 第4步:创建生产者实例和主题句柄
rd_kafka_t *rk;        /* Producer instance handle */

/* 函数的第一个参数有两个选择:RD_KAFKA_CONSUMER or RD_KAFKA_PRODUCER */
rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!rk) 
{
	fprintf(stderr, "%% Failed to create new producer: %s\n",
		errstr);
	return 1;
}

rd_kafka_topic_t *rkt;
rkt = rd_kafka_topic_new(rk, topic, NULL);
if (!rkt)
{  
    fprintf(stderr, "%% Failed to create topic object: %s\n",   
            rd_kafka_err2str(rd_kafka_last_error()));  
    rd_kafka_destroy(rk);  
    return 1;  
} 

  • 第5步:生产消息
while(run)
{
    rd_kafka_resp_err_t err;
    const char *topic = "data-time";     /* Argument: topic to produce to */
    char buf[512];         /* Message value temporary buffer */
    int len;

    lib_system_datetime_string_get(buf); // 自己编写的获取系统日期时间函数
    len = strlen(buf);

    retry:
    //err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
    //			RD_KAFKA_V_VALUE(buf, len), RD_KAFKA_V_OPAQUE(NULL), RD_KAFKA_V_END);
    err = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, buf, len, NULL, 0, NULL);
    if (err) 
    {
    	fprintf(stderr, "%% Failed to produce to topic %s: %s\n", rd_kafka_topic_name(rkt), rd_kafka_err2str(rd_kafka_last_error()));
    	if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) // 消息队列空间已满
        {
    			rd_kafka_poll(rk, 1000); // block for max 1000ms
    			goto retry; // 重新再发送消息
    	}
    }
    else 
    {
    	fprintf(stderr,"%% Enqueued message (%zd bytes) for topic %s\n", len, rd_kafka_topic_name(rkt));
    }
    rd_kafka_poll(rk, 0); // non-blocking
}
  • 第6步:刷新队列并销毁内存
/* 刷新队列 */
rd_kafka_flush(rk, 10 * 1000 /* wait for max 10 seconds */);
/* 检查生产者后台队列是否还有消息未发送完成 */
if (rd_kafka_outq_len(rk) > 0)
{
	fprintf(stderr, "%% %d message(s) were not delivered\n", rd_kafka_outq_len(rk));
}
/* 销毁内存 */
rd_kafka_topic_destroy(rkt);
rd_kafka_destroy(rk);

反思总结

kafka producer.c代码中,对于很多结构体的定义都是不可见的,结构体中的成员都是被封装的,仅给出少量的结构体和枚举的定义。

在kafka server默认配置下,允许producer在生产消息时,自动创建topic 和 partition。(经过测试)

参考引用

Kafka Examples Producer.c