[fix](http stream) http stream support memtable_on_sink_node header (#31866)

This commit is contained in:
meiyi
2024-03-07 11:13:31 +08:00
committed by yiguolei
parent 9d5c51f5c3
commit da5a40077f
2 changed files with 21 additions and 5 deletions

View File

@ -294,6 +294,10 @@ Status HttpStreamAction::process_put(HttpRequest* http_req,
TStreamLoadPutRequest request;
if (http_req != nullptr) {
request.__set_load_sql(http_req->header(HTTP_SQL));
if (!http_req->header(HTTP_MEMTABLE_ON_SINKNODE).empty()) {
bool value = iequal(http_req->header(HTTP_MEMTABLE_ON_SINKNODE), "true");
request.__set_memtable_on_sink_node(value);
}
} else {
request.__set_token(ctx->auth.token);
request.__set_load_sql(ctx->sql_str);

View File

@ -2036,6 +2036,11 @@ public class FrontendServiceImpl implements FrontendService.Iface {
request.getTbl(),
request.getUserIp(), PrivPredicate.LOAD);
}
if (request.isSetMemtableOnSinkNode()) {
ctx.getSessionVariable().enableMemtableOnSinkNode = request.isMemtableOnSinkNode();
} else {
ctx.getSessionVariable().enableMemtableOnSinkNode = Config.stream_load_default_memtable_on_sink_node;
}
SqlScanner input = new SqlScanner(new StringReader(originStmt), ctx.getSessionVariable().getSqlMode());
SqlParser parser = new SqlParser(input);
try {
@ -2057,6 +2062,13 @@ public class FrontendServiceImpl implements FrontendService.Iface {
Coordinator coord = new Coordinator(ctx, analyzer, executor.planner());
coord.setLoadMemLimit(request.getExecMemLimit());
coord.setQueryType(TQueryType.LOAD);
Table table = parsedStmt.getTargetTable();
if (table instanceof OlapTable) {
boolean isEnableMemtableOnSinkNode =
((OlapTable) table).getTableProperty().getUseSchemaLightChange()
? coord.getQueryOptions().isEnableMemtableOnSinkNode() : false;
coord.getQueryOptions().setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
}
TExecPlanFragmentParams plan = coord.getStreamLoadPlan();
int loadStreamPerNode = 20;
@ -2073,11 +2085,11 @@ public class FrontendServiceImpl implements FrontendService.Iface {
// The txn_id here is obtained from the NativeInsertStmt
result.getParams().setTxnConf(new TTxnParams().setTxnId(txn_id));
result.getParams().setImportLabel(parsedStmt.getLabel());
result.setDbId(parsedStmt.getTargetTable().getDatabase().getId());
result.setTableId(parsedStmt.getTargetTable().getId());
result.setBaseSchemaVersion(((OlapTable) parsedStmt.getTargetTable()).getBaseSchemaVersion());
result.setGroupCommitIntervalMs(((OlapTable) parsedStmt.getTargetTable()).getGroupCommitIntervalMs());
result.setGroupCommitDataBytes(((OlapTable) parsedStmt.getTargetTable()).getGroupCommitDataBytes());
result.setDbId(table.getDatabase().getId());
result.setTableId(table.getId());
result.setBaseSchemaVersion(((OlapTable) table).getBaseSchemaVersion());
result.setGroupCommitIntervalMs(((OlapTable) table).getGroupCommitIntervalMs());
result.setGroupCommitDataBytes(((OlapTable) table).getGroupCommitDataBytes());
result.setWaitInternalGroupCommitFinish(Config.wait_internal_group_commit_finish);
} catch (UserException e) {
LOG.warn("exec sql error", e);