From 9dc35ab5346d3373a6f97e6d514f535abdfd73a5 Mon Sep 17 00:00:00 2001 From: Yongqiang YANG <98214048+dataroaring@users.noreply.github.com> Date: Fri, 23 Sep 2022 20:23:19 +0800 Subject: [PATCH] [fix](streamload) set coord for streamLoad (#12744) When a stream load is canceled, status is reported to coord. --- be/src/common/daemon.cpp | 2 ++ be/src/runtime/fragment_mgr.cpp | 6 +++++- .../java/org/apache/doris/planner/StreamLoadPlanner.java | 3 +++ 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index 64f1e509fb..0492400e21 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -77,6 +77,8 @@ void Daemon::tcmalloc_gc_thread() { &used_size); MallocExtension::instance()->GetNumericProperty("tcmalloc.pageheap_free_bytes", &free_size); size_t alloc_size = used_size + free_size; + LOG(INFO) << "tcmalloc.pageheap_free_bytes " << free_size + << ", generic.current_allocated_bytes " << used_size; if (alloc_size > config::tc_use_memory_min) { size_t max_free_size = alloc_size * config::tc_free_memory_rate / 100; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 1b86f57a94..214581f696 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -297,8 +297,9 @@ void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfil FrontendServiceConnection coord(_exec_env->frontend_client_cache(), _coord_addr, &coord_status); if (!coord_status.ok()) { std::stringstream ss; + UniqueId uid(_query_id.hi, _query_id.lo); ss << "couldn't get a client for " << _coord_addr << ", reason: " << coord_status; - LOG(WARNING) << "query_id: " << _query_id << ", " << ss.str(); + LOG(WARNING) << "query_id: " << uid << ", " << ss.str(); update_status(Status::InternalError(ss.str())); return; } @@ -623,6 +624,9 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi RETURN_IF_ERROR(DescriptorTbl::create(&(fragments_ctx->obj_pool), params.desc_tbl, &(fragments_ctx->desc_tbl))); fragments_ctx->coord_addr = params.coord; + LOG(INFO) << "query_id: " + << UniqueId(fragments_ctx->query_id.hi, fragments_ctx->query_id.lo) + << " coord_addr " << fragments_ctx->coord_addr; fragments_ctx->query_globals = params.query_globals; if (params.__isset.resource_info) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index aa1e43ac5c..61214af84e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -43,10 +43,12 @@ import org.apache.doris.common.UserException; import org.apache.doris.load.LoadErrorHub; import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.load.routineload.RoutineLoadJob; +import org.apache.doris.service.FrontendOptions; import org.apache.doris.task.LoadTaskInfo; import org.apache.doris.thrift.PaloInternalServiceVersion; import org.apache.doris.thrift.TExecPlanFragmentParams; import org.apache.doris.thrift.TLoadErrorHubInfo; +import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPlanFragmentExecParams; import org.apache.doris.thrift.TQueryGlobals; import org.apache.doris.thrift.TQueryOptions; @@ -200,6 +202,7 @@ public class StreamLoadPlanner { params.setFragment(fragment.toThrift()); params.setDescTbl(analyzer.getDescTbl().toThrift()); + params.setCoord(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port)); TPlanFragmentExecParams execParams = new TPlanFragmentExecParams(); // user load id (streamLoadTask.id) as query id