[fix](streamload) set coord for streamLoad (#12744)

When a stream load is canceled, status is reported to coord.
This commit is contained in:
Yongqiang YANG
2022-09-23 20:23:19 +08:00
committed by GitHub
parent 7f5970d62f
commit 9dc35ab534
3 changed files with 10 additions and 1 deletions

View File

@ -77,6 +77,8 @@ void Daemon::tcmalloc_gc_thread() {
&used_size);
MallocExtension::instance()->GetNumericProperty("tcmalloc.pageheap_free_bytes", &free_size);
size_t alloc_size = used_size + free_size;
LOG(INFO) << "tcmalloc.pageheap_free_bytes " << free_size
<< ", generic.current_allocated_bytes " << used_size;
if (alloc_size > config::tc_use_memory_min) {
size_t max_free_size = alloc_size * config::tc_free_memory_rate / 100;

View File

@ -297,8 +297,9 @@ void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfil
FrontendServiceConnection coord(_exec_env->frontend_client_cache(), _coord_addr, &coord_status);
if (!coord_status.ok()) {
std::stringstream ss;
UniqueId uid(_query_id.hi, _query_id.lo);
ss << "couldn't get a client for " << _coord_addr << ", reason: " << coord_status;
LOG(WARNING) << "query_id: " << _query_id << ", " << ss.str();
LOG(WARNING) << "query_id: " << uid << ", " << ss.str();
update_status(Status::InternalError(ss.str()));
return;
}
@ -623,6 +624,9 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
RETURN_IF_ERROR(DescriptorTbl::create(&(fragments_ctx->obj_pool), params.desc_tbl,
&(fragments_ctx->desc_tbl)));
fragments_ctx->coord_addr = params.coord;
LOG(INFO) << "query_id: "
<< UniqueId(fragments_ctx->query_id.hi, fragments_ctx->query_id.lo)
<< " coord_addr " << fragments_ctx->coord_addr;
fragments_ctx->query_globals = params.query_globals;
if (params.__isset.resource_info) {

View File

@ -43,10 +43,12 @@ import org.apache.doris.common.UserException;
import org.apache.doris.load.LoadErrorHub;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.task.LoadTaskInfo;
import org.apache.doris.thrift.PaloInternalServiceVersion;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TLoadErrorHubInfo;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPlanFragmentExecParams;
import org.apache.doris.thrift.TQueryGlobals;
import org.apache.doris.thrift.TQueryOptions;
@ -200,6 +202,7 @@ public class StreamLoadPlanner {
params.setFragment(fragment.toThrift());
params.setDescTbl(analyzer.getDescTbl().toThrift());
params.setCoord(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
TPlanFragmentExecParams execParams = new TPlanFragmentExecParams();
// user load id (streamLoadTask.id) as query id