[Bug] Fix bug that routine load blocked with TOO_MANY_TASKS error (#4861)
When receiving empty msg from kafka, the load process will quit abnormally. Fix #4860
This commit is contained in:
@ -193,7 +193,11 @@ Status KafkaDataConsumer::group_consume(
|
||||
consumer_watch.stop();
|
||||
switch (msg->err()) {
|
||||
case RdKafka::ERR_NO_ERROR:
|
||||
if (!queue->blocking_put(msg.get())) {
|
||||
if (msg->len() == 0) {
|
||||
// ignore msg with length 0.
|
||||
// put empty msg into queue will cause the load process shutting down.
|
||||
break;
|
||||
} else if (!queue->blocking_put(msg.get())) {
|
||||
// queue is shutdown
|
||||
done = true;
|
||||
} else {
|
||||
|
||||
Reference in New Issue
Block a user