一、背景
在实际工程中,难免会遇到不通系统之间通信,如何进行系统之间通信呢?(作为一个“全栈工程师”,必须要解决它!)。
系统之间通信方式很多如:系统之间调用(http/rpc等),异步间接调用如发送消息、公共存储等。目前,本人从事的项目中遇到web业务工程(Java)依赖与算法工程(C++) 处理的视频/图片分类与标记结果。两个系统之前数据通信采用了kafka消息方式。
算法工程为C/C++工程,本文将介绍如何在C/C++中如何发送与接收Kakfa消息(包含:Kafka的SASL认证方式),并提供了详细的源码和讲解。(至于Java中如何发送与接收Kakfa消息如有需要,可留言或私聊!)
二、环境依赖安装
# 下载librdkafka git clone https://github.com/edenhill/librdkafka.git # 编译 cd librdkafka ./configure --prefix=/usr/local # 安装 sudo make install # 验证:查看/usr/local/lib目录下是否有librdkafka文件 ls /usr/local/lib | grep kafka
三、编写kakfa生产者消费者
3.1 生产者
#include <rdkafka.h> // 包含C API头文件 #include <iostream> #include <cstring> #include <cerrno> int main() { const char *brokers = "xx.xx.xx.xx:7091"; // Kafka broker地址 const char *topic_name = "kafka_msg_topic_test"; const char *payload = "Hello, Kafka from librdkafka!"; size_t len = strlen(payload); // 创建配置对象 rd_kafka_conf_t *conf = rd_kafka_conf_new(); if (!conf) { std::cerr << "Failed to create configuration object: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; return 1; } // 设置broker地址 if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, NULL, 0) != RD_KAFKA_CONF_OK) { std::cerr << "Failed to set bootstrap.servers: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } // 创建生产者实例 rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, NULL, 0); if (!rk) { std::cerr << "Failed to create producer: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } // 创建topic句柄(可选,但推荐) rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic_name, NULL); if (!rkt) { std::cerr << "Failed to create topic handle: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_destroy(rk); // rd_kafka_conf_destroy(conf); return 1; } // 发送消息 int32_t partition = RD_KAFKA_PARTITION_UA; // 自动选择分区 int err = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, const_cast<char *>(payload), len, NULL, 0, NULL); if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { std::cerr << "Failed to produce to topic " << topic_name << ": " << err << std::endl; } else { std::cout << "Produced " << len << " bytes to topic " << topic_name << std::endl; } // 等待所有消息发送完成(可选,但推荐) // 在实际生产代码中,您可能需要更复杂的逻辑来处理消息的发送和确认 int msgs_sent = 0; while (rd_kafka_outq_len(rk) > 0) { rd_kafka_poll(rk, 100); // 轮询Kafka队列,直到所有消息都发送出去 msgs_sent += rd_kafka_outq_len(rk); } // 销毁topic句柄 rd_kafka_topic_destroy(rkt); // 销毁生产者实例 rd_kafka_destroy(rk); // 销毁配置对象 // rd_kafka_conf_destroy(conf); return 0; }
3.2 消费者
#include <rdkafka.h> #include <iostream> #include <cerrno> #include <cstring> #include <cstdlib> void error_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque) { // 错误处理回调 std::cerr << "Kafka error: " << err << ": " << reason << std::endl; } int main() { std::cerr << "start " << std::endl; const char *brokers = "xx.xx.xx.xx:7091"; // Kafka broker地址 const char *group_id = "kafka_msg_topic_test"; // 消费者组ID const char *topic_name = "kafka_msg_topic_test"; // Kafka topic名称 // 创建配置对象 rd_kafka_conf_t *conf = rd_kafka_conf_new(); if (!conf) { std::cerr << "Failed to create configuration object: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; return 1; } // 设置broker地址 if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, NULL, 0) != RD_KAFKA_CONF_OK) { std::cerr << "Failed to set bootstrap.servers: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } // 设置消费者组ID if (rd_kafka_conf_set(conf, "group.id", group_id, NULL, 0) != RD_KAFKA_CONF_OK) { std::cerr << "Failed to set group.id: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } // 设置错误处理回调(可选) rd_kafka_conf_set_error_cb(conf, error_cb); // 创建消费者实例 rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, NULL, 0); if (!rk) { std::cerr << "Failed to create consumer: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; return 1; } // 创建一个topic分区列表 rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1); if (!topics) { std::cerr << "Failed to create topic partition list: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_destroy(rk); return 1; } // 添加topic到分区列表 if (!rd_kafka_topic_partition_list_add(topics, topic_name, RD_KAFKA_PARTITION_UA)) { std::cerr << "Failed to add topic to partition list: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_topic_partition_list_destroy(topics); rd_kafka_destroy(rk); return 1; } // 订阅topic rd_kafka_resp_err_t err = rd_kafka_subscribe(rk, topics); if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { std::cerr << "Failed to subscribe to topic: " << rd_kafka_err2str(err) << std::endl; rd_kafka_topic_partition_list_destroy(topics); rd_kafka_destroy(rk); return 1; } // 销毁分区列表(订阅后不再需要) rd_kafka_topic_partition_list_destroy(topics); // 轮询消息 while (true) { rd_kafka_message_t *rkmessage; rkmessage = rd_kafka_consumer_poll(rk, 1000); // 等待1秒以获取消息 if (rkmessage == NULL) { // 没有消息或者超时 continue; } if (rkmessage->err) { // 处理错误 if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) { // 消息流的末尾 std::cout << "End of partition event" << std::endl; } else { // 打印错误并退出 std::cerr << "Kafka consumer error: " << rd_kafka_message_errstr(rkmessage) << std::endl; break; } } else { // 处理消息 std::cout << "Received message at offset " << rkmessage->offset << " from partition " << rkmessage->partition << " with key \"" << rkmessage->key << "\" and payload size "<< rkmessage->len << " value :" <<(char *)rkmessage->payload << std::endl; // 如果需要,可以在这里处理消息内容 // 例如,使用rkmessage->payload()获取消息内容 // 释放消息 rd_kafka_message_destroy(rkmessage); } } // 清理 rd_kafka_destroy(rk); return 0; }
3.3 编译运行
3.3.1 编译生产者消费者
g++ -o send_kafka SendKakfaMessage.cpp -I/usr/local/include/librdkafka -lrdkafka++ -lrdkafka -lpthread
g++ -o receive_kafka ReceiveKafkaMessage.cpp -I/usr/local/include/librdkafka -lrdkafka++ -lrdkafka -lpthread
3.3.2 运行验证
执行时,若出现错误: error while loading shared libraries: librdkafka++.so.1: cannot open shared object file: No such file or directory
则需要执行下面环境变量配置:
export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH
生产者:发送消息
消费者:接收消息
3.4 SASL认证kakfa
下面是,支持sasl认证的kakka生产者完整代码
#include <rdkafka.h> #include <iostream> #include <cstring> #include <cerrno> int main(int argc, char *argv[]) { const char *brokers = "xx.xx.xx.xx:8092"; // Kafka broker地址 const char *username = "xxx"; const char *password = "xxx"; const char *topic_name = "kafka_msg_test_sasl"; const char *payload = "Hello, Kafka from librdkafka! sasl"; size_t len = strlen(payload); // 初始化配置 rd_kafka_conf_t *conf = rd_kafka_conf_new(); if (!conf) { std::cerr << "Failed to create configuration object: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; return 1; } char errstr[512]; // 声明一个足够大的字符数组来存储错误信息 // 设置SASL相关的配置 if (rd_kafka_conf_set(conf, "security.protocol", "SASL_PLAINTEXT", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { std::cerr << "Failed to set security.protocol: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } if (rd_kafka_conf_set(conf, "sasl.mechanisms", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { std::cerr << "Failed to set sasl.mechanisms: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } if (rd_kafka_conf_set(conf, "sasl.username", username, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { std::cerr << "Failed to set sasl.username: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } if (rd_kafka_conf_set(conf, "sasl.password", password, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { std::cerr << "Failed to set sasl.password: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { std::cerr << "Failed to set bootstrap.servers: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } // 检查配置是否设置成功 if (rd_kafka_conf_set(conf, "security.protocol", "SASL_PLAINTEXT", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { std::cerr << "Failed to set configuration: " << errstr << std::endl; return 1; } // 创建producer实例 rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); if (!rk) { std::cerr << "Failed to create new producer: " << errstr << std::endl; return 1; } // 创建topic句柄(可选,但推荐) rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic_name, NULL); if (!rkt) { std::cerr << "Failed to create topic handle: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_destroy(rk); // rd_kafka_conf_destroy(conf); return 1; } // 发送消息 int32_t partition = RD_KAFKA_PARTITION_UA; // 自动选择分区 int err = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, const_cast<char *>(payload), len, NULL, 0, NULL); if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { std::cerr << "Failed to produce to topic " << topic_name << ": " << err << std::endl; } else { std::cout << "Produced " << len << " bytes to topic " << topic_name << std::endl; } // 等待所有消息发送完成(可选,但推荐) // 在实际生产代码中,您可能需要更复杂的逻辑来处理消息的发送和确认 int msgs_sent = 0; while (rd_kafka_outq_len(rk) > 0) { rd_kafka_poll(rk, 100); // 轮询Kafka队列,直到所有消息都发送出去 msgs_sent += rd_kafka_outq_len(rk); } // 销毁topic句柄 rd_kafka_topic_destroy(rkt); // 清理资源 rd_kafka_destroy(rk); return 0; }
在kafka map 管理界面中查看发送效果如下: