Modify interface (#684)
1. Add batch submit interface 2. Add Kafka Event callback to catch Kafka events
This commit is contained in:
@ -63,6 +63,14 @@ Status KafkaDataConsumer::init() {
|
||||
// TODO: set it larger than 0 after we set rd_kafka_conf_set_stats_cb()
|
||||
RETURN_IF_ERROR(set_conf("statistics.interval.ms", "0"));
|
||||
|
||||
KafkaEventCb event_cb;
|
||||
if (conf->set("event_cb", &event_cb, errstr) != RdKafka::Conf::CONF_OK) {
|
||||
std::stringstream ss;
|
||||
ss << "failed to set 'event_cb'";
|
||||
LOG(WARNING) << ss.str();
|
||||
return Status(ss.str());
|
||||
}
|
||||
|
||||
// create consumer
|
||||
_k_consumer = RdKafka::KafkaConsumer::create(conf, errstr);
|
||||
if (!_k_consumer) {
|
||||
@ -110,7 +118,7 @@ Status KafkaDataConsumer::start() {
|
||||
|
||||
int64_t left_time = _ctx->kafka_info->max_interval_s;
|
||||
int64_t left_rows = _ctx->kafka_info->max_batch_rows;
|
||||
int64_t left_bytes = _ctx->kafka_info->max_batch_bytes;
|
||||
int64_t left_bytes = _ctx->kafka_info->max_batch_size;
|
||||
|
||||
LOG(INFO) << "start consumer"
|
||||
<< ". interval(s): " << left_time
|
||||
|
||||
Reference in New Issue
Block a user