From 2c208e932bd9a69a76b75fea2bd3fe977dc91e98 Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Tue, 3 Aug 2021 11:59:06 +0800 Subject: [PATCH] [Bug][RoutineLoad] Avoid TOO_MANY_TASKS error (#6342) Use `commitAsync` to commit offset to kafka, instead of using `commitSync`, which may block for a long time. Also assign a group.id to the routine load job if the user has not specified the "property.group.id" property, so that all consumers of this job will use the same group.id instead of a random id for each consume task. --- be/src/runtime/routine_load/data_consumer.cpp | 22 ++++++--- .../routine_load_task_executor.cpp | 5 +- .../load/routineload/KafkaRoutineLoadJob.java | 5 ++ .../load/routineload/RoutineLoadManager.java | 47 ++++++++++++++----- .../routineload/RoutineLoadTaskScheduler.java | 35 ++++---------- .../routineload/RoutineLoadManagerTest.java | 4 +- 6 files changed, 72 insertions(+), 46 deletions(-) diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp index c5c54e4d7c..34cba0254e 100644 --- a/be/src/runtime/routine_load/data_consumer.cpp +++ b/be/src/runtime/routine_load/data_consumer.cpp @@ -33,6 +33,7 @@ namespace doris { +static const std::string PROP_GROUP_ID = "group.id"; // init kafka consumer will only set common configs such as // brokers, groupid Status KafkaDataConsumer::init(StreamLoadContext* ctx) { @@ -47,10 +48,6 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) { // conf has to be deleted finally Defer delete_conf{[conf]() { delete conf; }}; - std::stringstream ss; - ss << BackendOptions::get_localhost() << "_"; - std::string group_id = ss.str() + UniqueId::gen_uid().to_string(); - LOG(INFO) << "init kafka consumer with group id: " << group_id; std::string errstr; auto set_conf = [&conf, &errstr](const std::string& conf_key, const std::string& conf_val) { @@ -74,7 +71,6 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) { }; RETURN_IF_ERROR(set_conf("metadata.broker.list", 
ctx->kafka_info->brokers)); - RETURN_IF_ERROR(set_conf("group.id", group_id)); RETURN_IF_ERROR(set_conf("enable.partition.eof", "false")); RETURN_IF_ERROR(set_conf("enable.auto.offset.store", "false")); // TODO: set it larger than 0 after we set rd_kafka_conf_set_stats_cb() @@ -108,6 +104,18 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) { _custom_properties.emplace(item.first, item.second); } + // if not specified group id, generate a random one. + // ATTN: In the new version, we have set a group.id on the FE side for jobs that have not set a groupid, + // but in order to ensure compatibility, we still do a check here. + if (_custom_properties.find(PROP_GROUP_ID) == _custom_properties.end()) { + std::stringstream ss; + ss << BackendOptions::get_localhost() << "_"; + std::string group_id = ss.str() + UniqueId::gen_uid().to_string(); + RETURN_IF_ERROR(set_conf(PROP_GROUP_ID, group_id)); + _custom_properties.emplace(PROP_GROUP_ID, group_id); + } + LOG(INFO) << "init kafka consumer with group id: " << _custom_properties[PROP_GROUP_ID]; + if (conf->set("event_cb", &_k_event_cb, errstr) != RdKafka::Conf::CONF_OK) { std::stringstream ss; ss << "PAUSE: failed to set 'event_cb'"; @@ -354,7 +362,9 @@ Status KafkaDataConsumer::reset() { } Status KafkaDataConsumer::commit(std::vector& offset) { - RdKafka::ErrorCode err = _k_consumer->commitSync(offset); + // Use async commit so that it will not block for a long time. 
+ // Commit failure has no effect on Doris, subsequent tasks will continue to commit the new offset + RdKafka::ErrorCode err = _k_consumer->commitAsync(offset); if (err != RdKafka::ERR_NO_ERROR) { std::stringstream ss; ss << "failed to commit kafka offset : " << RdKafka::err2str(err); diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index f8f9c71bbe..a574309122 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -325,9 +325,10 @@ void RoutineLoadTaskExecutor::exec_task(StreamLoadContext* ctx, DataConsumerPool std::for_each(topic_partitions.begin(), topic_partitions.end(), [](RdKafka::TopicPartition* tp1) { delete tp1; }); }}; - } break; + break; + } default: - return; + break; } cb(ctx); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index ead99ea9b0..a3570556e6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -76,6 +76,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { private static final Logger LOG = LogManager.getLogger(KafkaRoutineLoadJob.class); public static final String KAFKA_FILE_CATALOG = "kafka"; + public static final String PROP_GROUP_ID = "group.id"; private String brokerList; private String topic; @@ -446,6 +447,10 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { if (!stmt.getCustomKafkaProperties().isEmpty()) { setCustomKafkaProperties(stmt.getCustomKafkaProperties()); } + // set group id if not specified + if (!this.customProperties.containsKey(PROP_GROUP_ID)) { + this.customProperties.put(PROP_GROUP_ID, name + "_" + UUID.randomUUID().toString()); + } } // this is a unprotected 
method which is called in the initialization function diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 05ebeb3519..045fbd1128 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -307,6 +307,7 @@ public class RoutineLoadManager implements Writable { // get the BE id with minimum running task on it // return -1 if no BE is available. // throw exception if unrecoverable errors happen. + // ATTN: this is only used for unit test now. public long getMinTaskBeId(String clusterName) throws LoadException { List beIdsInCluster = Catalog.getCurrentSystemInfo().getClusterBackendIds(clusterName, true); if (beIdsInCluster == null) { @@ -344,30 +345,54 @@ public class RoutineLoadManager implements Writable { // check if the specified BE is available for running task // return true if it is available. return false if otherwise. // throw exception if unrecoverable errors happen. 
- public boolean checkBeToTask(long beId, String clusterName) throws LoadException { + public long getAvailableBeForTask(long previoudBeId, String clusterName) throws LoadException { List beIdsInCluster = Catalog.getCurrentSystemInfo().getClusterBackendIds(clusterName, true); if (beIdsInCluster == null) { throw new LoadException("The " + clusterName + " has been deleted"); } - if (!beIdsInCluster.contains(beId)) { - return false; + if (previoudBeId != -1L && !beIdsInCluster.contains(previoudBeId)) { + return -1L; } // check if be has idle slot readLock(); try { - int idleTaskNum = 0; Map beIdToConcurrentTasks = getBeCurrentTasksNumMap(); - if (beIdToConcurrentTasks.containsKey(beId)) { - idleTaskNum = beIdToMaxConcurrentTasks.get(beId) - beIdToConcurrentTasks.get(beId); - } else { - idleTaskNum = Config.max_routine_load_task_num_per_be; + // 1. Find if the given BE id has available slots + if (previoudBeId != -1L) { + int idleTaskNum = 0; + if (beIdToConcurrentTasks.containsKey(previoudBeId)) { + idleTaskNum = beIdToMaxConcurrentTasks.get(previoudBeId) - beIdToConcurrentTasks.get(previoudBeId); + } else { + idleTaskNum = Config.max_routine_load_task_num_per_be; + } + if (idleTaskNum > 0) { + return previoudBeId; + } } - if (idleTaskNum > 0) { - return true; + + // 2. 
The given BE id does not have available slots, find a BE with min tasks + updateBeIdToMaxConcurrentTasks(); + int idleTaskNum = 0; + long resultBeId = -1L; + int maxIdleSlotNum = 0; + for (Long beId : beIdsInCluster) { + if (beIdToMaxConcurrentTasks.containsKey(beId)) { + if (beIdToConcurrentTasks.containsKey(beId)) { + idleTaskNum = beIdToMaxConcurrentTasks.get(beId) - beIdToConcurrentTasks.get(beId); + } else { + idleTaskNum = Config.max_routine_load_task_num_per_be; + } + if (LOG.isDebugEnabled()) { + LOG.debug("be {} has idle {}, concurrent task {}, max concurrent task {}", beId, idleTaskNum, + beIdToConcurrentTasks.get(beId), beIdToMaxConcurrentTasks.get(beId)); + } + resultBeId = maxIdleSlotNum < idleTaskNum ? beId : resultBeId; + maxIdleSlotNum = Math.max(maxIdleSlotNum, idleTaskNum); + } } - return false; + return resultBeId; } finally { readUnlock(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java index e353c7978f..44cd71cd39 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java @@ -36,13 +36,13 @@ import org.apache.doris.thrift.TRoutineLoadTask; import org.apache.doris.thrift.TStatus; import org.apache.doris.thrift.TStatusCode; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Queues; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.util.List; import java.util.concurrent.LinkedBlockingQueue; @@ -278,34 +278,19 @@ public class RoutineLoadTaskScheduler extends MasterDaemon { // return true if allocate successfully. return false if failed. 
// throw exception if unrecoverable errors happen. private boolean allocateTaskToBe(RoutineLoadTaskInfo routineLoadTaskInfo) throws LoadException { - if (routineLoadTaskInfo.getPreviousBeId() != -1L) { - if (routineLoadManager.checkBeToTask(routineLoadTaskInfo.getPreviousBeId(), routineLoadTaskInfo.getClusterName())) { - if (LOG.isDebugEnabled()) { - LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, routineLoadTaskInfo.getId()) - .add("job_id", routineLoadTaskInfo.getJobId()) - .add("previous_be_id", routineLoadTaskInfo.getPreviousBeId()) - .add("msg", "task use the previous be id") - .build()); - } - routineLoadTaskInfo.setBeId(routineLoadTaskInfo.getPreviousBeId()); - return true; - } - } - - // the previous BE is not available, try to find a better one - long beId = routineLoadManager.getMinTaskBeId(routineLoadTaskInfo.getClusterName()); - if (beId < 0) { + long beId = routineLoadManager.getAvailableBeForTask(routineLoadTaskInfo.getPreviousBeId(), routineLoadTaskInfo.getClusterName()); + if (beId == -1L) { return false; } - routineLoadTaskInfo.setBeId(beId); if (LOG.isDebugEnabled()) { LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, routineLoadTaskInfo.getId()) - .add("job_id", routineLoadTaskInfo.getJobId()) - .add("be_id", routineLoadTaskInfo.getBeId()) - .add("msg", "task has been allocated to be") - .build()); + .add("job_id", routineLoadTaskInfo.getJobId()) + .add("previous_be_id", routineLoadTaskInfo.getPreviousBeId()) + .add("assigned_be_id", beId) + .build()); } + routineLoadTaskInfo.setBeId(beId); return true; } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java index ea460e40eb..fb8fe9bd44 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java @@ -17,12 +17,12 @@ package 
org.apache.doris.load.routineload; -import org.apache.doris.analysis.Separator; import org.apache.doris.analysis.CreateRoutineLoadStmt; import org.apache.doris.analysis.LabelName; import org.apache.doris.analysis.ParseNode; import org.apache.doris.analysis.PauseRoutineLoadStmt; import org.apache.doris.analysis.ResumeRoutineLoadStmt; +import org.apache.doris.analysis.Separator; import org.apache.doris.analysis.StopRoutineLoadStmt; import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.Catalog; @@ -739,7 +739,7 @@ public class RoutineLoadManagerTest { RoutineLoadManager routineLoadManager = new RoutineLoadManager(); Config.max_routine_load_task_num_per_be = 10; Deencapsulation.setField(routineLoadManager, "beIdToMaxConcurrentTasks", beIdToMaxConcurrentTasks); - Assert.assertEquals(true, routineLoadManager.checkBeToTask(1L, "default")); + Assert.assertEquals(1L, routineLoadManager.getAvailableBeForTask(1L, "default")); } @Test