[Bug][RoutineLoad] Avoid TOO_MANY_TASKS error (#6342)
Use `commitAsync` instead of `commitSync` to commit offsets to Kafka, since `commitSync` may block for a long time. Also assign a `group.id` to a routine load job when the user does not specify the "property.group.id" property, so that all consumers of the job use the same `group.id` instead of a random id for each consume task.
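For illustration only — none of the code below is from Doris (the BE consumer in this patch uses librdkafka in C++). This is a minimal sketch of the same two ideas using the Java Kafka client: fall back to one stable group id per job when the user sets none, and commit offsets asynchronously so a slow broker cannot stall the task. The broker address, topic, and job name are placeholders.

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AsyncCommitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // One stable group id generated once per job, not a fresh random id per task:
        // all tasks of the job then appear as a single consumer group on the broker.
        props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "my_job_" + UUID.randomUUID());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("my_topic")); // placeholder topic
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            System.out.println("polled " + records.count() + " records");
            // commitAsync() returns immediately; a failed commit is only logged here,
            // because a later task will commit a newer offset anyway.
            consumer.commitAsync((offsets, e) -> {
                if (e != null) {
                    System.err.println("offset commit failed: " + e.getMessage());
                }
            });
        }
    }
}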
@@ -33,6 +33,7 @@
 
 namespace doris {
 
+static const std::string PROP_GROUP_ID = "group.id";
 // init kafka consumer will only set common configs such as
 // brokers, groupid
 Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
@@ -47,10 +48,6 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
     // conf has to be deleted finally
     Defer delete_conf{[conf]() { delete conf; }};
 
-    std::stringstream ss;
-    ss << BackendOptions::get_localhost() << "_";
-    std::string group_id = ss.str() + UniqueId::gen_uid().to_string();
-    LOG(INFO) << "init kafka consumer with group id: " << group_id;
 
     std::string errstr;
     auto set_conf = [&conf, &errstr](const std::string& conf_key, const std::string& conf_val) {
@@ -74,7 +71,6 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
     };
 
     RETURN_IF_ERROR(set_conf("metadata.broker.list", ctx->kafka_info->brokers));
-    RETURN_IF_ERROR(set_conf("group.id", group_id));
     RETURN_IF_ERROR(set_conf("enable.partition.eof", "false"));
     RETURN_IF_ERROR(set_conf("enable.auto.offset.store", "false"));
     // TODO: set it larger than 0 after we set rd_kafka_conf_set_stats_cb()
@@ -108,6 +104,18 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
         _custom_properties.emplace(item.first, item.second);
     }
+
+    // if no group id is specified, generate a random one.
+    // ATTN: in the new version, the FE side assigns a group.id to jobs that did not set one,
+    // but to ensure compatibility, we still do a check here.
+    if (_custom_properties.find(PROP_GROUP_ID) == _custom_properties.end()) {
+        std::stringstream ss;
+        ss << BackendOptions::get_localhost() << "_";
+        std::string group_id = ss.str() + UniqueId::gen_uid().to_string();
+        RETURN_IF_ERROR(set_conf(PROP_GROUP_ID, group_id));
+        _custom_properties.emplace(PROP_GROUP_ID, group_id);
+    }
+    LOG(INFO) << "init kafka consumer with group id: " << _custom_properties[PROP_GROUP_ID];
 
     if (conf->set("event_cb", &_k_event_cb, errstr) != RdKafka::Conf::CONF_OK) {
         std::stringstream ss;
         ss << "PAUSE: failed to set 'event_cb'";
@@ -354,7 +362,9 @@ Status KafkaDataConsumer::reset() {
 }
 
 Status KafkaDataConsumer::commit(std::vector<RdKafka::TopicPartition*>& offset) {
-    RdKafka::ErrorCode err = _k_consumer->commitSync(offset);
+    // Use async commit so that it will not block for a long time.
+    // A commit failure has no effect on Doris; subsequent tasks will continue to commit the new offset.
+    RdKafka::ErrorCode err = _k_consumer->commitAsync(offset);
     if (err != RdKafka::ERR_NO_ERROR) {
         std::stringstream ss;
         ss << "failed to commit kafka offset : " << RdKafka::err2str(err);
@@ -325,9 +325,10 @@ void RoutineLoadTaskExecutor::exec_task(StreamLoadContext* ctx, DataConsumerPool
             std::for_each(topic_partitions.begin(), topic_partitions.end(),
                           [](RdKafka::TopicPartition* tp1) { delete tp1; });
         }};
-    } break;
+        break;
+    }
     default:
-        return;
+        break;
     }
     cb(ctx);
 }
@@ -76,6 +76,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     private static final Logger LOG = LogManager.getLogger(KafkaRoutineLoadJob.class);
 
     public static final String KAFKA_FILE_CATALOG = "kafka";
+    public static final String PROP_GROUP_ID = "group.id";
 
     private String brokerList;
     private String topic;
@@ -446,6 +447,10 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         if (!stmt.getCustomKafkaProperties().isEmpty()) {
             setCustomKafkaProperties(stmt.getCustomKafkaProperties());
         }
+        // set group id if not specified
+        if (!this.customProperties.containsKey(PROP_GROUP_ID)) {
+            this.customProperties.put(PROP_GROUP_ID, name + "_" + UUID.randomUUID().toString());
+        }
     }
 
     // this is an unprotected method which is called in the initialization function
@@ -307,6 +307,7 @@ public class RoutineLoadManager implements Writable {
     // get the BE id with minimum running task on it
     // return -1 if no BE is available.
     // throw exception if unrecoverable errors happen.
+    // ATTN: this is only used for unit test now.
     public long getMinTaskBeId(String clusterName) throws LoadException {
         List<Long> beIdsInCluster = Catalog.getCurrentSystemInfo().getClusterBackendIds(clusterName, true);
         if (beIdsInCluster == null) {
@@ -344,30 +345,54 @@ public class RoutineLoadManager implements Writable {
     // check if the specified BE is available for running task
     // return true if it is available. return false if otherwise.
     // throw exception if unrecoverable errors happen.
-    public boolean checkBeToTask(long beId, String clusterName) throws LoadException {
+    public long getAvailableBeForTask(long previoudBeId, String clusterName) throws LoadException {
         List<Long> beIdsInCluster = Catalog.getCurrentSystemInfo().getClusterBackendIds(clusterName, true);
         if (beIdsInCluster == null) {
             throw new LoadException("The " + clusterName + " has been deleted");
         }
 
-        if (!beIdsInCluster.contains(beId)) {
-            return false;
+        if (previoudBeId != -1L && !beIdsInCluster.contains(previoudBeId)) {
+            return -1L;
         }
 
         // check if be has idle slot
         readLock();
         try {
-            int idleTaskNum = 0;
             Map<Long, Integer> beIdToConcurrentTasks = getBeCurrentTasksNumMap();
-            if (beIdToConcurrentTasks.containsKey(beId)) {
-                idleTaskNum = beIdToMaxConcurrentTasks.get(beId) - beIdToConcurrentTasks.get(beId);
-            } else {
-                idleTaskNum = Config.max_routine_load_task_num_per_be;
-            }
-            if (idleTaskNum > 0) {
-                return true;
+
+            // 1. Find if the given BE id has available slots
+            if (previoudBeId != -1L) {
+                int idleTaskNum = 0;
+                if (beIdToConcurrentTasks.containsKey(previoudBeId)) {
+                    idleTaskNum = beIdToMaxConcurrentTasks.get(previoudBeId) - beIdToConcurrentTasks.get(previoudBeId);
+                } else {
+                    idleTaskNum = Config.max_routine_load_task_num_per_be;
+                }
+                if (idleTaskNum > 0) {
+                    return previoudBeId;
+                }
             }
-            return false;
+
+            // 2. The given BE id does not have available slots, find a BE with min tasks
+            updateBeIdToMaxConcurrentTasks();
+            int idleTaskNum = 0;
+            long resultBeId = -1L;
+            int maxIdleSlotNum = 0;
+            for (Long beId : beIdsInCluster) {
+                if (beIdToMaxConcurrentTasks.containsKey(beId)) {
+                    if (beIdToConcurrentTasks.containsKey(beId)) {
+                        idleTaskNum = beIdToMaxConcurrentTasks.get(beId) - beIdToConcurrentTasks.get(beId);
+                    } else {
+                        idleTaskNum = Config.max_routine_load_task_num_per_be;
+                    }
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("be {} has idle {}, concurrent task {}, max concurrent task {}", beId, idleTaskNum,
+                                beIdToConcurrentTasks.get(beId), beIdToMaxConcurrentTasks.get(beId));
+                    }
+                    resultBeId = maxIdleSlotNum < idleTaskNum ? beId : resultBeId;
+                    maxIdleSlotNum = Math.max(maxIdleSlotNum, idleTaskNum);
+                }
            }
+            return resultBeId;
         } finally {
             readUnlock();
         }
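This selection policy is the heart of the TOO_MANY_TASKS fix: a task first tries to stay on its previous BE, and only if that BE has no idle slot does it fall back to the BE with the most idle slots, so a task is never assigned to a saturated BE. A simplified, self-contained sketch of the policy follows; the names are illustrative, and the real method additionally takes a read lock and checks cluster membership.

import java.util.Map;

public class BePickerSketch {
    // Returns the previous BE if it still has an idle slot, otherwise the BE
    // with the most idle slots, or -1 if every BE is saturated.
    static long pickBe(long previousBeId, Map<Long, Integer> currentTasks,
                       Map<Long, Integer> maxTasks, int defaultMax) {
        if (previousBeId != -1L) {
            int idle = maxTasks.getOrDefault(previousBeId, defaultMax)
                    - currentTasks.getOrDefault(previousBeId, 0);
            if (idle > 0) {
                return previousBeId;
            }
        }
        long result = -1L;
        int maxIdle = 0;
        for (Long beId : maxTasks.keySet()) {
            int idle = maxTasks.get(beId) - currentTasks.getOrDefault(beId, 0);
            if (idle > maxIdle) {
                maxIdle = idle;
                result = beId;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Long, Integer> current = Map.of(1L, 5, 2L, 2);
        Map<Long, Integer> max = Map.of(1L, 5, 2L, 5);
        // Prints 2: BE 1 is full, BE 2 still has 3 idle slots.
        System.out.println(pickBe(1L, current, max, 5));
    }
}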
@@ -36,13 +36,13 @@ import org.apache.doris.thrift.TRoutineLoadTask;
 import org.apache.doris.thrift.TStatus;
 import org.apache.doris.thrift.TStatusCode;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -278,34 +278,19 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
     // return true if allocate successfully. return false if failed.
     // throw exception if unrecoverable errors happen.
     private boolean allocateTaskToBe(RoutineLoadTaskInfo routineLoadTaskInfo) throws LoadException {
-        if (routineLoadTaskInfo.getPreviousBeId() != -1L) {
-            if (routineLoadManager.checkBeToTask(routineLoadTaskInfo.getPreviousBeId(), routineLoadTaskInfo.getClusterName())) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, routineLoadTaskInfo.getId())
-                            .add("job_id", routineLoadTaskInfo.getJobId())
-                            .add("previous_be_id", routineLoadTaskInfo.getPreviousBeId())
-                            .add("msg", "task use the previous be id")
-                            .build());
-                }
-                routineLoadTaskInfo.setBeId(routineLoadTaskInfo.getPreviousBeId());
-                return true;
-            }
-        }
-
-        // the previous BE is not available, try to find a better one
-        long beId = routineLoadManager.getMinTaskBeId(routineLoadTaskInfo.getClusterName());
-        if (beId < 0) {
+        long beId = routineLoadManager.getAvailableBeForTask(routineLoadTaskInfo.getPreviousBeId(), routineLoadTaskInfo.getClusterName());
+        if (beId == -1L) {
             return false;
         }
 
-        routineLoadTaskInfo.setBeId(beId);
         if (LOG.isDebugEnabled()) {
             LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, routineLoadTaskInfo.getId())
-                    .add("job_id", routineLoadTaskInfo.getJobId())
-                    .add("be_id", routineLoadTaskInfo.getBeId())
-                    .add("msg", "task has been allocated to be")
-                    .build());
+                    .add("job_id", routineLoadTaskInfo.getJobId())
+                    .add("previous_be_id", routineLoadTaskInfo.getPreviousBeId())
+                    .add("assigned_be_id", beId)
+                    .build());
         }
+        routineLoadTaskInfo.setBeId(beId);
         return true;
     }
@@ -17,12 +17,12 @@
 
 package org.apache.doris.load.routineload;
 
-import org.apache.doris.analysis.Separator;
 import org.apache.doris.analysis.CreateRoutineLoadStmt;
 import org.apache.doris.analysis.LabelName;
 import org.apache.doris.analysis.ParseNode;
 import org.apache.doris.analysis.PauseRoutineLoadStmt;
 import org.apache.doris.analysis.ResumeRoutineLoadStmt;
+import org.apache.doris.analysis.Separator;
 import org.apache.doris.analysis.StopRoutineLoadStmt;
 import org.apache.doris.analysis.TableName;
 import org.apache.doris.catalog.Catalog;
@@ -739,7 +739,7 @@ public class RoutineLoadManagerTest {
         RoutineLoadManager routineLoadManager = new RoutineLoadManager();
         Config.max_routine_load_task_num_per_be = 10;
         Deencapsulation.setField(routineLoadManager, "beIdToMaxConcurrentTasks", beIdToMaxConcurrentTasks);
-        Assert.assertEquals(true, routineLoadManager.checkBeToTask(1L, "default"));
+        Assert.assertEquals(1L, routineLoadManager.getAvailableBeForTask(1L, "default"));
     }
 
     @Test