- 配置消费者客户端;
- 订阅主题和分区;
- 拉取消息;
- 处理消息;
- 提交消费位移;
配置消费者客户端
int CKafkaConsumer::Create()
{
std::string errorStr;
RdKafka::Conf::ConfResult errorCode;
do
{
// 1、创建配置对象
// 1.1、构造 consumer conf 对象
m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
if(nullptr == m_config)
{
printf("Create RdKafka Conf failed.\n");
break;
}
// 必要参数1:指定 broker 地址列表
errorCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
if (RdKafka::Conf::CONF_OK != errorCode)
{
printf("Conf set(bootstrap.servers) failed, errorStr:%s.\n",
errorStr.c_str());
break;
}
// 必要参数2:设置消费者组 id
errorCode = m_config->set("group.id", m_groupID, errorStr);
if (RdKafka::Conf::CONF_OK != errorCode)
{
printf("Conf set(group.id) failed, errorStr:%s.\n",
errorStr.c_str());
break;
}
// 设置事件回调
m_event_cb = new ConsumerEventCb;
errorCode = m_config->set("event_cb", m_event_cb, errorStr);
if (RdKafka::Conf::CONF_OK != errorCode)
{
printf("Conf set(event_cb) failed, errorStr:%s.\n",
errorStr.c_str());
break;
}
// 设置消费者组再平衡回调
m_rebalance_cb = new ConsumerRebalanceCb;
errorCode = m_config->set("rebalance_cb", m_rebalance_cb, errorStr);
if (RdKafka::Conf::CONF_OK != errorCode)
{
printf("Conf set(rebalance_cb) failed, errorStr:%s.\n",
errorStr.c_str());
break;
}
// 当消费者到达分区结尾,发送 RD_KAFKA_RESP_ERR__PARTITION_EOF 事件
errorCode = m_config->set("enable.partition.eof", "false", errorStr);
if (RdKafka::Conf::CONF_OK != errorCode)
{
printf("Conf set(enable.partition.eof) failed, errorStr:%s.\n",
errorStr.c_str());
break;
}
// 每次最大拉取的数据大小
errorCode = m_config->set("max.partition.fetch.bytes", "1024000", errorStr);
if (RdKafka::Conf::CONF_OK != errorCode)
{
printf("Conf set(max.partition.fetch.bytes) failed, errorStr:%s.\n",
errorStr.c_str());
break;
}
// 设置分区分配策略:range、roundrobin、cooperative-sticky
errorCode = m_config->set("partition.assignment.strategy", "range", errorStr);
if (RdKafka::Conf::CONF_OK != errorCode)
{
printf("Conf set(partition.assignment.strategy) failed, errorStr:%s.\n",
errorStr.c_str());
break;
}
// 心跳探活超时时间---1s
errorCode = m_config->set("session.timeout.ms", "6000", errorStr);
if (RdKafka::Conf::CONF_OK != errorCode)
{
printf("Conf set(session.timeout.ms) failed, errorStr:%s.\n",
errorStr.c_str());
break;
}
// 心跳保活间隔
errorCode = m_config->set("heartbeat.interval.ms", "2000", errorStr);
if (RdKafka::Conf::CONF_OK != errorCode)
{
printf("Conf set(heartbeat.interval.ms) failed, errorStr:%s.\n",
errorStr.c_str());
break;
}
// 1.2、创建 topic conf 对象
m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
if (nullptr == m_topicConfig)
{
printf("Create RdKafka Topic Conf failed.\n");
break;
}
// 必要参数3:设置新到来消费者的消费起始位置,latest 消费最新的数据,earliest 从头开始消费
errorCode = m_topicConfig->set("auto.offset.reset", "latest", errorStr);
if (RdKafka::Conf::CONF_OK != errorCode)
{
printf("Topic Conf set(auto.offset.reset) failed, errorStr:%s.\n",
errorStr.c_str());
break;
}
// 默认 topic 配置,用于自动订阅 topics
errorCode = m_config->set("default_topic_conf", m_topicConfig, errorStr);
if (RdKafka::Conf::CONF_OK != errorCode)
{
printf("Conf set(default_topic_conf) failed, errorStr:%s.\n",
errorStr.c_str());
break;
}
// 2、创建 Consumer 对象
m_consumer = RdKafka::KafkaConsumer::create(m_config, errorStr);
if (nullptr == m_consumer)
{
printf("Create KafkaConsumer failed, errorStr:%s.\n",
errorStr.c_str());
break;
}
printf("Created consumer success, consumerName:%s.\n",
m_consumer->name().c_str());
return 0;
} while (0);
Destroy();
return -1;
}
订阅主题和分区
std::vector<std::string> topicsVec;
topicsVec.push_back("zd_test_topic_one");
topicsVec.push_back("zd_test_topic_two");
RdKafka::ErrorCode errorCode = m_consumer->subscribe(topicsVec);
if (RdKafka::ERR_NO_ERROR != errorCode)
{
printf("Subscribe failed, errorStr:%s\n", RdKafka::err2str(errorCode).c_str());
return;
}
拉取消息
// 可放到线程中处理
while (m_running)
{
RdKafka::Message *msg = m_consumer->consume(1000); // 1000ms超时
if(NULL != msg)
{
// 消费消息
ConsumeMsg_(msg, NULL);
m_consumer->commitAsync();
delete msg;
}
}
处理消息
void KafkaConsumer::ConsumeMsg_(RdKafka::Message *msg, void *opaque)
{
switch (msg->err())
{
case RdKafka::ERR__TIMED_OUT: // 超时
break;
case RdKafka::ERR_NO_ERROR: // 有消息进来
printf("Message in, topic:%s, partition:[%d], key:%s, payload:%s\n",
msg->topic_name().c_str(),
msg->partition(),
msg->key()->c_str(),
(char *)msg->payload());
// 消息处理
break;
default:
break;
}
}
提交消费位移
m_consumer->commitAsync();