branch-2.1: [fix](load) fix the error msg of task submission failure for memory back pressure #51078 (#51131)
Cherry-picked from #51078
Co-authored-by: hui lai <laihui@selectdb.com>
parent 8cca998ce6
commit df464f84b1
@@ -205,13 +205,14 @@ Status RoutineLoadTaskExecutor::get_kafka_real_offsets_for_partitions(
 Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
     std::unique_lock<std::mutex> l(_lock);
     // check if already submitted
     if (_task_map.find(task.id) != _task_map.end()) {
         // already submitted
         LOG(INFO) << "routine load task " << UniqueId(task.id) << " has already been submitted";
         return Status::OK();
     }
 
-    if (_task_map.size() >= config::max_routine_load_thread_pool_size || _reach_memory_limit()) {
+    // check task num limit
+    if (_task_map.size() >= config::max_routine_load_thread_pool_size) {
         LOG(INFO) << "too many tasks in thread pool. reject task: " << UniqueId(task.id)
                   << ", job id: " << task.job_id
                   << ", queue size: " << _thread_pool->get_queue_size()
@@ -220,6 +221,18 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
                                     BackendOptions::get_localhost());
     }
 
+    // check memory limit
+    std::string reason;
+    DBUG_EXECUTE_IF("RoutineLoadTaskExecutor.submit_task.memory_limit", {
+        _reach_memory_limit(reason);
+        return Status::MemoryLimitExceeded("fake reason: " + reason);
+    });
+    if (_reach_memory_limit(reason)) {
+        LOG(INFO) << "reach memory limit. reject task: " << UniqueId(task.id)
+                  << ", job id: " << task.job_id << ", reason: " << reason;
+        return Status::MemoryLimitExceeded(reason);
+    }
+
     // create the context
     std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env);
     ctx->load_type = TLoadType::ROUTINE_LOAD;
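Note: the core of the change above is that submit_task now checks thread-pool capacity and memory back pressure separately, and a memory rejection returns Status::MemoryLimitExceeded carrying a human-readable reason instead of the generic TooManyTasks error. Below is a minimal standalone sketch of that out-parameter pattern; it is illustrative only, and MockStatus plus the plain integer counters are stand-ins, not Doris' Status, GlobalMemoryArbitrator, or MemTrackerLimiter.

// sketch_memory_reject.cpp (hypothetical, illustrative only)
#include <cstdint>
#include <iostream>
#include <string>
#include <utility>

struct MockStatus {
    bool ok;
    std::string msg;
    static MockStatus OK() { return {true, ""}; }
    static MockStatus MemoryLimitExceeded(std::string m) { return {false, std::move(m)}; }
};

// Mirrors the new contract: return whether the limit is hit and, if so,
// describe why in `reason` so the caller can attach it to the rejection.
bool reach_memory_limit(bool exceed_soft_limit, int64_t current_load_mem,
                        int64_t load_mem_limit, std::string& reason) {
    if (exceed_soft_limit || current_load_mem > load_mem_limit) {
        reason = "is_exceed_soft_mem_limit: " + std::to_string(exceed_soft_limit) +
                 " current_load_mem_value: " + std::to_string(current_load_mem) +
                 " _load_mem_limit: " + std::to_string(load_mem_limit);
        return true;
    }
    return false;
}

MockStatus submit_task(bool exceed_soft_limit, int64_t current_load_mem,
                       int64_t load_mem_limit) {
    std::string reason;
    if (reach_memory_limit(exceed_soft_limit, current_load_mem, load_mem_limit, reason)) {
        // The detailed reason travels back to the caller instead of a generic error.
        return MockStatus::MemoryLimitExceeded(reason);
    }
    return MockStatus::OK();
}

int main() {
    MockStatus st = submit_task(/*exceed_soft_limit=*/false,
                                /*current_load_mem=*/2048, /*load_mem_limit=*/1024);
    std::cout << (st.ok ? "OK" : st.msg) << std::endl;  // prints the detailed reason
}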
@@ -306,14 +319,18 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
     }
 }
 
-bool RoutineLoadTaskExecutor::_reach_memory_limit() {
+bool RoutineLoadTaskExecutor::_reach_memory_limit(std::string& reason) {
+    DBUG_EXECUTE_IF("RoutineLoadTaskExecutor.submit_task.memory_limit", {
+        reason = "reach memory limit";
+        return true;
+    });
     bool is_exceed_soft_mem_limit = GlobalMemoryArbitrator::is_exceed_soft_mem_limit();
     auto current_load_mem_value =
             MemTrackerLimiter::TypeMemSum[MemTrackerLimiter::Type::LOAD]->current_value();
     if (is_exceed_soft_mem_limit || current_load_mem_value > _load_mem_limit) {
-        LOG(INFO) << "is_exceed_soft_mem_limit: " << is_exceed_soft_mem_limit
-                  << " current_load_mem_value: " << current_load_mem_value
-                  << " _load_mem_limit: " << _load_mem_limit;
+        reason = "is_exceed_soft_mem_limit: " + std::to_string(is_exceed_soft_mem_limit) +
+                 " current_load_mem_value: " + std::to_string(current_load_mem_value) +
+                 " _load_mem_limit: " + std::to_string(_load_mem_limit);
         return true;
     }
     return false;
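Note: both DBUG_EXECUTE_IF blocks above are keyed on the same debug point name, "RoutineLoadTaskExecutor.submit_task.memory_limit", which the regression test further down enables with GetDebugPoint().enableDebugPointForAllBEs(...) so the rejection path can be forced without actually exhausting memory. Below is a rough standalone sketch of that kind of name-keyed hook; the macro and registry are simplified stand-ins, not Doris' DBUG_EXECUTE_IF / debug-point implementation.

// sketch_debug_point.cpp (hypothetical, illustrative only)
#include <iostream>
#include <string>
#include <unordered_set>

// Hypothetical registry of enabled debug point names.
static std::unordered_set<std::string> g_enabled_debug_points;

// Runs `body` only when the named point has been enabled (e.g. by a test).
#define DEBUG_EXECUTE_IF(name, body)                       \
    do {                                                   \
        if (g_enabled_debug_points.count(name) > 0) {      \
            body                                           \
        }                                                  \
    } while (0)

bool reach_memory_limit(std::string& reason) {
    DEBUG_EXECUTE_IF("RoutineLoadTaskExecutor.submit_task.memory_limit", {
        reason = "reach memory limit";
        return true;  // short-circuit: pretend back pressure is active
    });
    // ... the real soft-limit / load-memory checks would go here ...
    return false;
}

int main() {
    std::string reason;
    std::cout << std::boolalpha << reach_memory_limit(reason) << "\n";  // false

    // What enableDebugPointForAllBEs achieves, in miniature: flip the switch by name.
    g_enabled_debug_points.insert("RoutineLoadTaskExecutor.submit_task.memory_limit");
    std::cout << reach_memory_limit(reason) << " " << reason << "\n";  // true reach memory limit
}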
@@ -88,7 +88,7 @@ private:
     // create a dummy StreamLoadContext for PKafkaMetaProxyRequest
     Status _prepare_ctx(const PKafkaMetaProxyRequest& request,
                         std::shared_ptr<StreamLoadContext> ctx);
-    bool _reach_memory_limit();
+    bool _reach_memory_limit(std::string& reason);
 
 private:
     ExecEnv* _exec_env = nullptr;
@@ -48,13 +48,11 @@ suite("test_routine_load_error_info","nonConcurrent") {
         }
     }
 
-    // case 1: task failed
-    if (enabled != null && enabled.equalsIgnoreCase("true")) {
-        // create table
-        def jobName = "test_error_info"
-        def tableName = "test_routine_error_info"
-        try {
-            sql """
+    def createTable = {tableName ->
+        sql """
+            DROP TABLE IF EXISTS ${tableName}
+        """
+        sql """
             CREATE TABLE IF NOT EXISTS ${tableName}
             (
                 k00 INT NOT NULL,
@@ -120,13 +118,12 @@ suite("test_routine_load_error_info","nonConcurrent") {
                 "bloom_filter_columns"="k05",
                 "replication_num" = "1"
             );
-            """
-            sql "sync"
+        """
+    }
 
-            // create job
-            GetDebugPoint().enableDebugPointForAllBEs("BetaRowsetWriter._check_segment_number_limit_too_many_segments")
-            sql """
-                CREATE ROUTINE LOAD ${jobName} on ${tableName}
+    def createJob = {jobName, tableName, kafkaTopic ->
+        sql """
+            CREATE ROUTINE LOAD ${jobName} on ${tableName}
             COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18),
             COLUMNS TERMINATED BY "|"
             PROPERTIES
@@ -138,10 +135,22 @@ suite("test_routine_load_error_info","nonConcurrent") {
             FROM KAFKA
             (
                 "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
-                "kafka_topic" = "${kafkaCsvTpoics[0]}",
+                "kafka_topic" = "${kafkaTopic}",
                 "property.kafka_default_offsets" = "OFFSET_BEGINNING"
             );
-            """
+        """
+    }
+
+    // case 1: task failed
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // create table
+        def jobName = "test_error_info"
+        def tableName = "test_routine_error_info"
+        try {
+            createTable(tableName)
             sql "sync"
+            GetDebugPoint().enableDebugPointForAllBEs("BetaRowsetWriter._check_segment_number_limit_too_many_segments")
+            createJob(jobName, tableName, kafkaCsvTpoics[0])
+            sql "sync"
 
             // check error info
@@ -158,10 +167,8 @@ suite("test_routine_load_error_info","nonConcurrent") {
                 if (count > 60) {
                     assertEquals(1, 2)
                     break;
-                } else {
-                    sleep(1000)
-                    continue;
                 }
+                sleep(1000)
             }
         } finally {
             GetDebugPoint().disableDebugPointForAllBEs("BetaRowsetWriter._check_segment_number_limit_too_many_segments")
@@ -175,93 +182,45 @@ suite("test_routine_load_error_info","nonConcurrent") {
         def jobName = "test_error_info"
         def tableName = "test_routine_error_info"
         try {
-            sql """
-                CREATE TABLE IF NOT EXISTS ${tableName}
-                (
-                    k00 INT NOT NULL,
-                    k01 DATE NOT NULL,
-                    k02 BOOLEAN NULL,
-                    k03 TINYINT NULL,
-                    k04 SMALLINT NULL,
-                    k05 INT NULL,
-                    k06 BIGINT NULL,
-                    k07 LARGEINT NULL,
-                    k08 FLOAT NULL,
-                    k09 DOUBLE NULL,
-                    k10 DECIMAL(9,1) NULL,
-                    k11 DECIMALV3(9,1) NULL,
-                    k12 DATETIME NULL,
-                    k13 DATEV2 NULL,
-                    k14 DATETIMEV2 NULL,
-                    k15 CHAR NULL,
-                    k16 VARCHAR NULL,
-                    k17 STRING NULL,
-                    k18 JSON NULL,
-                    kd01 BOOLEAN NOT NULL DEFAULT "TRUE",
-                    kd02 TINYINT NOT NULL DEFAULT "1",
-                    kd03 SMALLINT NOT NULL DEFAULT "2",
-                    kd04 INT NOT NULL DEFAULT "3",
-                    kd05 BIGINT NOT NULL DEFAULT "4",
-                    kd06 LARGEINT NOT NULL DEFAULT "5",
-                    kd07 FLOAT NOT NULL DEFAULT "6.0",
-                    kd08 DOUBLE NOT NULL DEFAULT "7.0",
-                    kd09 DECIMAL NOT NULL DEFAULT "888888888",
-                    kd10 DECIMALV3 NOT NULL DEFAULT "999999999",
-                    kd11 DATE NOT NULL DEFAULT "2023-08-24",
-                    kd12 DATETIME NOT NULL DEFAULT "2023-08-24 12:00:00",
-                    kd13 DATEV2 NOT NULL DEFAULT "2023-08-24",
-                    kd14 DATETIMEV2 NOT NULL DEFAULT "2023-08-24 12:00:00",
-                    kd15 CHAR(255) NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
-                    kd16 VARCHAR(300) NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
-                    kd17 STRING NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
-                    kd18 JSON NULL,
-
-                    INDEX idx_inverted_k104 (`k05`) USING INVERTED,
-                    INDEX idx_inverted_k110 (`k11`) USING INVERTED,
-                    INDEX idx_inverted_k113 (`k13`) USING INVERTED,
-                    INDEX idx_inverted_k114 (`k14`) USING INVERTED,
-                    INDEX idx_inverted_k117 (`k17`) USING INVERTED PROPERTIES("parser" = "english"),
-                    INDEX idx_ngrambf_k115 (`k15`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
-                    INDEX idx_ngrambf_k116 (`k16`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
-                    INDEX idx_ngrambf_k117 (`k17`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
-
-                    INDEX idx_bitmap_k104 (`k02`) USING BITMAP,
-                    INDEX idx_bitmap_k110 (`kd01`) USING BITMAP
-
-                )
-                DUPLICATE KEY(k00)
-                PARTITION BY RANGE(k01)
-                (
-                    PARTITION p1 VALUES [('2023-08-01'), ('2023-08-11')),
-                    PARTITION p2 VALUES [('2023-08-11'), ('2023-08-21')),
-                    PARTITION p3 VALUES [('2023-08-21'), ('2023-09-01'))
-                )
-                DISTRIBUTED BY HASH(k00) BUCKETS 32
-                PROPERTIES (
-                    "bloom_filter_columns"="k05",
-                    "replication_num" = "1"
-                );
-            """
+            createTable(tableName)
             sql "sync"
+            createJob(jobName, tableName, "invalid_job")
+            sql "sync"
 
-            // create job
-            sql """
-                CREATE ROUTINE LOAD ${jobName} on ${tableName}
-                COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18),
-                COLUMNS TERMINATED BY "|"
-                PROPERTIES
-                (
-                    "max_batch_interval" = "5",
-                    "max_batch_rows" = "300000",
-                    "max_batch_size" = "209715200"
-                )
-                FROM KAFKA
-                (
-                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
-                    "kafka_topic" = "invalid_job",
-                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
-                );
-            """
+            // check error info
+            def count = 0
+            while (true) {
+                def res = sql "show routine load for ${jobName}"
+                log.info("show routine load: ${res[0].toString()}".toString())
+                log.info("reason: ${res[0][17].toString()}".toString())
+                if (res[0][17].toString() != "") {
+                    assertTrue(res[0][17].toString().contains("may be Kafka properties set in job is error or no partition in this topic that should check Kafka"))
+                    break;
+                }
+                count++
+                if (count > 60) {
+                    assertEquals(1, 2)
+                    break;
+                }
+                sleep(1000)
+            }
+        } finally {
+            sql "stop routine load for ${jobName}"
+            sql "DROP TABLE IF EXISTS ${tableName}"
+        }
+    }
+
+    // case 3: memory limit
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        def jobName = "test_memory_limit_error_info"
+        def tableName = "test_routine_memory_limit_error_info"
+
+        try {
+            createTable(tableName)
+            sql "sync"
+            GetDebugPoint().enableDebugPointForAllBEs("RoutineLoadTaskExecutor.submit_task.memory_limit")
+            createJob(jobName, tableName, kafkaCsvTpoics[0])
+            sql "sync"
 
             // check error info
             def count = 0
@@ -269,20 +228,19 @@ suite("test_routine_load_error_info","nonConcurrent") {
                 def res = sql "show routine load for ${jobName}"
                 log.info("show routine load: ${res[0].toString()}".toString())
                 log.info("other msg: ${res[0][19].toString()}".toString())
-                if (res[0][19].toString() != "" && res[0][8].toString() == "NEED_SCHEDULE") {
-                    assertTrue(res[0][19].toString().contains("may be Kafka properties set in job is error or no partition in this topic that should check Kafka"))
+                if (res[0][19].toString() != "") {
+                    assertTrue(res[0][19].toString().contains("reach memory limit"))
                     break;
                 }
                 count++
                 if (count > 60) {
                     assertEquals(1, 2)
                     break;
-                } else {
-                    sleep(1000)
-                    continue;
                 }
+                sleep(1000)
             }
         } finally {
             GetDebugPoint().disableDebugPointForAllBEs("RoutineLoadTaskExecutor.submit_task.memory_limit")
             sql "stop routine load for ${jobName}"
             sql "DROP TABLE IF EXISTS ${tableName}"
         }