diff --git a/be/src/exec/blocking_join_node.cpp b/be/src/exec/blocking_join_node.cpp index b4b2f057c3..a82e5f7ce4 100644 --- a/be/src/exec/blocking_join_node.cpp +++ b/be/src/exec/blocking_join_node.cpp @@ -82,10 +82,6 @@ Status BlockingJoinNode::close(RuntimeState* state) { void BlockingJoinNode::build_side_thread(RuntimeState* state, boost::promise* status) { status->set_value(construct_build_side(state)); - // Release the thread token as soon as possible (before the main thread joins - // on it). This way, if we had a chain of 10 joins using 1 additional thread, - // we'd keep the additional thread busy the whole time. - state->resource_pool()->release_thread_token(false); } Status BlockingJoinNode::open(RuntimeState* state) { @@ -105,19 +101,8 @@ Status BlockingJoinNode::open(RuntimeState* state) { // main thread boost::promise build_side_status; - if (state->resource_pool()->try_acquire_thread_token()) { - add_runtime_exec_option("Join Build-Side Prepared Asynchronously"); - // Thread build_thread(_node_name, "build thread", - // bind(&BlockingJoinNode::BuildSideThread, this, state, &build_side_status)); - // if (!state->cgroup().empty()) { - // RETURN_IF_ERROR( - // state->exec_env()->cgroups_mgr()->assign_thread_to_cgroup( - // build_thread, state->cgroup())); - // } - boost::thread(bind(&BlockingJoinNode::build_side_thread, this, state, &build_side_status)); - } else { - build_side_status.set_value(construct_build_side(state)); - } + add_runtime_exec_option("Join Build-Side Prepared Asynchronously"); + boost::thread(bind(&BlockingJoinNode::build_side_thread, this, state, &build_side_status)); // Open the left child so that it may perform any initialisation in parallel. // Don't exit even if we see an error, we still need to wait for the build thread diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp index fff59ef41b..3fc0dfecf2 100644 --- a/be/src/exec/hash_join_node.cpp +++ b/be/src/exec/hash_join_node.cpp @@ -181,10 +181,6 @@ Status HashJoinNode::close(RuntimeState* state) { void HashJoinNode::build_side_thread(RuntimeState* state, boost::promise* status) { status->set_value(construct_hash_table(state)); - // Release the thread token as soon as possible (before the main thread joins - // on it). This way, if we had a chain of 10 joins using 1 additional thread, - // we'd keep the additional thread busy the whole time. - state->resource_pool()->release_thread_token(false); } Status HashJoinNode::construct_hash_table(RuntimeState* state) { @@ -238,13 +234,8 @@ Status HashJoinNode::open(RuntimeState* state) { // Only do this if we can get a thread token. Otherwise, do this in the // main thread boost::promise thread_status; - - if (state->resource_pool()->try_acquire_thread_token()) { - add_runtime_exec_option("Hash Table Built Asynchronously"); - boost::thread(bind(&HashJoinNode::build_side_thread, this, state, &thread_status)); - } else { - thread_status.set_value(construct_hash_table(state)); - } + add_runtime_exec_option("Hash Table Built Asynchronously"); + boost::thread(bind(&HashJoinNode::build_side_thread, this, state, &thread_status)); if (!_runtime_filter_descs.empty()) { RuntimeFilterSlots runtime_filter_slots(_probe_expr_ctxs, _build_expr_ctxs, diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index 45341dfff5..6584100195 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -1327,7 +1327,6 @@ Status OlapScanNode::normalize_bloom_filter_predicate(SlotDescriptor* slot) { void OlapScanNode::transfer_thread(RuntimeState* state) { // scanner open pushdown to scanThread - state->resource_pool()->acquire_thread_token(); Status status = Status::OK(); for (auto scanner : _olap_scanners) { status = Expr::clone_if_not_exists(_conjunct_ctxs, state, scanner->conjunct_ctxs()); @@ -1483,7 +1482,6 @@ void OlapScanNode::transfer_thread(RuntimeState* state) { } } // end of transfer while - state->resource_pool()->release_thread_token(true); VLOG_CRITICAL << "TransferThread finish."; { std::unique_lock l(_row_batches_lock); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 038e9f0199..834529c7dd 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -187,8 +187,8 @@ FragmentExecState::FragmentExecState(const TUniqueId& query_id, _executor(exec_env, std::bind(std::mem_fn(&FragmentExecState::coordinator_callback), this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)), - _timeout_second(-1), _set_rsc_info(false), + _timeout_second(-1), _fragments_ctx(std::move(fragments_ctx)) { _start_time = DateTimeValue::local_time(); _coord_addr = _fragments_ctx->coord_addr; diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 72c2f6024b..70233e1e44 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -83,8 +83,6 @@ public: const TUniqueId& fragment_instance_id, std::vector* selected_columns); - RuntimeFilterMergeController& runtimefilter_controller() { return _runtimefilter_controller; } - Status apply_filter(const PPublishFilterRequest* request, const char* attach_data); Status merge_filter(const PMergeFilterRequest* request, const char* attach_data); diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 37d162e3ce..d0f2cb2b5e 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -53,7 +53,6 @@ PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env, _done(false), _prepared(false), _closed(false), - _has_thread_token(false), _is_report_success(true), _is_report_on_cancel(true), _collect_query_statistics_with_every_batch(false) {} @@ -100,20 +99,6 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request, _is_report_success = request.query_options.is_report_success; } - // Reserve one main thread from the pool - _runtime_state->resource_pool()->acquire_thread_token(); - _has_thread_token = true; - - _average_thread_tokens = profile()->add_sampling_counter( - "AverageThreadTokens", - std::bind(std::mem_fn(&ThreadResourceMgr::ResourcePool::num_threads), - _runtime_state->resource_pool())); - - // if (_exec_env->process_mem_tracker() != NULL) { - // // we have a global limit - // _runtime_state->mem_trackers()->push_back(_exec_env->process_mem_tracker()); - // } - int64_t bytes_limit = request.query_options.mem_limit; if (bytes_limit <= 0) { // sometimes the request does not set the query mem limit, we use default one. @@ -346,8 +331,6 @@ Status PlanFragmentExecutor::open_internal() { _sink.reset(NULL); _done = true; - release_thread_token(); - stop_report_thread(); send_report(true); @@ -465,7 +448,6 @@ Status PlanFragmentExecutor::get_next(RowBatch** batch) { LOG(INFO) << "Finished executing fragment query_id=" << print_id(_query_id) << " instance_id=" << print_id(_runtime_state->fragment_instance_id()); // Query is done, return the thread token - release_thread_token(); stop_report_thread(); send_report(true); } @@ -542,14 +524,6 @@ RuntimeProfile* PlanFragmentExecutor::profile() { return _runtime_state->runtime_profile(); } -void PlanFragmentExecutor::release_thread_token() { - if (_has_thread_token) { - _has_thread_token = false; - _runtime_state->resource_pool()->release_thread_token(true); - profile()->stop_sampling_counters_updates(_average_thread_tokens); - } -} - void PlanFragmentExecutor::close() { if (_closed) { return; diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h index d3fb2d58a5..f2d1f2ae96 100644 --- a/be/src/runtime/plan_fragment_executor.h +++ b/be/src/runtime/plan_fragment_executor.h @@ -130,9 +130,6 @@ public: // Initiate cancellation. Must not be called until after prepare() returned. void cancel(); - // Releases the thread token for this fragment executor. - void release_thread_token(); - // call these only after prepare() RuntimeState* runtime_state() { return _runtime_state.get(); } const RowDescriptor& row_desc(); @@ -175,9 +172,6 @@ private: // true if close() has been called bool _closed; - // true if this fragment has not returned the thread token to the thread resource mgr - bool _has_thread_token; - bool _is_report_success; // If this is set to false, and '_is_report_success' is false as well, @@ -208,15 +202,6 @@ private: RuntimeProfile::Counter* _fragment_cpu_timer; - // Average number of thread tokens for the duration of the plan fragment execution. - // Fragments that do a lot of cpu work (non-coordinator fragment) will have at - // least 1 token. Fragments that contain a hdfs scan node will have 1+ tokens - // depending on system load. Other nodes (e.g. hash join node) can also reserve - // additional tokens. - // This is a measure of how much CPU resources this fragment used during the course - // of the execution. - RuntimeProfile::Counter* _average_thread_tokens; - // It is shared with BufferControlBlock and will be called in two different // threads. But their calls are all at different time, there is no problem of // multithreaded access. diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 441046efc2..ceedbc86d7 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -155,10 +155,6 @@ RuntimeState::~RuntimeState() { if (_buffer_reservation != nullptr) { _buffer_reservation->Close(); } - - if (_exec_env != nullptr && _exec_env->thread_mgr() != nullptr) { - _exec_env->thread_mgr()->unregister_pool(_resource_pool); - } } Status RuntimeState::init(const TUniqueId& fragment_instance_id, const TQueryOptions& query_options, @@ -194,11 +190,6 @@ Status RuntimeState::init(const TUniqueId& fragment_instance_id, const TQueryOpt _query_options.batch_size = DEFAULT_BATCH_SIZE; } - // Register with the thread mgr - if (exec_env != NULL) { - _resource_pool = exec_env->thread_mgr()->register_pool(); - DCHECK(_resource_pool != NULL); - } _db_name = "insert_stmt"; _import_label = print_id(fragment_instance_id); diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp index 5cf9c2027f..34f1864799 100644 --- a/be/src/util/runtime_profile.cpp +++ b/be/src/util/runtime_profile.cpp @@ -43,8 +43,6 @@ static const std::string THREAD_INVOLUNTARY_CONTEXT_SWITCHES = "InvoluntaryConte // The root counter name for all top level counters. static const std::string ROOT_COUNTER = ""; -RuntimeProfile::PeriodicCounterUpdateState RuntimeProfile::_s_periodic_counter_update_state; - RuntimeProfile::RuntimeProfile(const std::string& name, bool is_averaged_profile) : _pool(new ObjectPool()), _own_pool(false), @@ -57,22 +55,6 @@ RuntimeProfile::RuntimeProfile(const std::string& name, bool is_averaged_profile } RuntimeProfile::~RuntimeProfile() { - for (auto iter = _rate_counters.begin(); iter != _rate_counters.end(); ++iter) { - stop_rate_counters_updates(*iter); - } - - for (auto iter = _sampling_counters.begin(); iter != _sampling_counters.end(); ++iter) { - stop_sampling_counters_updates(*iter); - } - - std::set*>::const_iterator buckets_iter; - - for (buckets_iter = _bucketing_counters.begin(); buckets_iter != _bucketing_counters.end(); - ++buckets_iter) { - // This is just a clean up. No need to perform conversion. Also, the underlying - // counters might be gone already. - stop_bucketing_counters_updates(*buckets_iter, false); - } } void RuntimeProfile::merge(RuntimeProfile* other) { @@ -654,61 +636,23 @@ RuntimeProfile::Counter* RuntimeProfile::add_rate_counter(const std::string& nam } Counter* dst_counter = add_counter(name, dst_type); - _rate_counters.push_back(dst_counter); - register_periodic_counter(src_counter, NULL, dst_counter, RATE_COUNTER); return dst_counter; } RuntimeProfile::Counter* RuntimeProfile::add_rate_counter(const std::string& name, SampleFn fn, TUnit::type dst_type) { - Counter* dst_counter = add_counter(name, dst_type); - register_periodic_counter(NULL, fn, dst_counter, RATE_COUNTER); - return dst_counter; + return add_counter(name, dst_type); } RuntimeProfile::Counter* RuntimeProfile::add_sampling_counter(const std::string& name, Counter* src_counter) { DCHECK(src_counter->type() == TUnit::UNIT); - Counter* dst_counter = add_counter(name, TUnit::DOUBLE_VALUE); - register_periodic_counter(src_counter, NULL, dst_counter, SAMPLING_COUNTER); - return dst_counter; + return add_counter(name, TUnit::DOUBLE_VALUE); } RuntimeProfile::Counter* RuntimeProfile::add_sampling_counter(const std::string& name, SampleFn sample_fn) { - Counter* dst_counter = add_counter(name, TUnit::DOUBLE_VALUE); - _sampling_counters.push_back(dst_counter); - register_periodic_counter(NULL, sample_fn, dst_counter, SAMPLING_COUNTER); - return dst_counter; -} - -void RuntimeProfile::add_bucketing_counters(const std::string& name, - const std::string& parent_counter_name, - Counter* src_counter, int num_buckets, - std::vector* buckets) { - { - std::lock_guard l(_counter_map_lock); - _bucketing_counters.insert(buckets); - } - - for (int i = 0; i < num_buckets; ++i) { - std::stringstream counter_name; - counter_name << name << "=" << i; - buckets->push_back( - add_counter(counter_name.str(), TUnit::DOUBLE_VALUE, parent_counter_name)); - } - - std::lock_guard l(_s_periodic_counter_update_state.lock); - - if (_s_periodic_counter_update_state.update_thread.get() == NULL) { - _s_periodic_counter_update_state.update_thread.reset( - new boost::thread(&RuntimeProfile::periodic_counter_update_loop)); - } - - BucketCountersInfo info; - info.src_counter = src_counter; - info.num_sampled = 0; - _s_periodic_counter_update_state.bucketing_counters[buckets] = info; + return add_counter(name, TUnit::DOUBLE_VALUE); } RuntimeProfile::EventSequence* RuntimeProfile::add_event_sequence(const std::string& name) { @@ -724,145 +668,6 @@ RuntimeProfile::EventSequence* RuntimeProfile::add_event_sequence(const std::str return timer; } -void RuntimeProfile::register_periodic_counter(Counter* src_counter, SampleFn sample_fn, - Counter* dst_counter, PeriodicCounterType type) { - DCHECK(src_counter == NULL || sample_fn == NULL); - - std::lock_guard l(_s_periodic_counter_update_state.lock); - - if (_s_periodic_counter_update_state.update_thread.get() == NULL) { - _s_periodic_counter_update_state.update_thread.reset( - new boost::thread(&RuntimeProfile::periodic_counter_update_loop)); - } - - switch (type) { - case RATE_COUNTER: { - RateCounterInfo counter; - counter.src_counter = src_counter; - counter.sample_fn = sample_fn; - counter.elapsed_ms = 0; - _s_periodic_counter_update_state.rate_counters[dst_counter] = counter; - break; - } - - case SAMPLING_COUNTER: { - SamplingCounterInfo counter; - counter.src_counter = src_counter; - counter.sample_fn = sample_fn; - counter.num_sampled = 0; - counter.total_sampled_value = 0; - _s_periodic_counter_update_state.sampling_counters[dst_counter] = counter; - break; - } - - default: - DCHECK(false) << "Unsupported PeriodicCounterType:" << type; - } -} - -void RuntimeProfile::stop_rate_counters_updates(Counter* rate_counter) { - std::lock_guard l(_s_periodic_counter_update_state.lock); - _s_periodic_counter_update_state.rate_counters.erase(rate_counter); -} - -void RuntimeProfile::stop_sampling_counters_updates(Counter* sampling_counter) { - std::lock_guard l(_s_periodic_counter_update_state.lock); - _s_periodic_counter_update_state.sampling_counters.erase(sampling_counter); -} - -void RuntimeProfile::stop_bucketing_counters_updates(std::vector* buckets, bool convert) { - int64_t num_sampled = 0; - { - std::lock_guard l(_s_periodic_counter_update_state.lock); - PeriodicCounterUpdateState::BucketCountersMap::const_iterator itr = - _s_periodic_counter_update_state.bucketing_counters.find(buckets); - - if (itr != _s_periodic_counter_update_state.bucketing_counters.end()) { - num_sampled = itr->second.num_sampled; - _s_periodic_counter_update_state.bucketing_counters.erase(buckets); - } - } - - if (convert && num_sampled > 0) { - for (Counter* counter : *buckets) { - double perc = 100 * counter->value() / (double)num_sampled; - counter->set(perc); - } - } -} - -RuntimeProfile::PeriodicCounterUpdateState::PeriodicCounterUpdateState() : _done(false) {} - -RuntimeProfile::PeriodicCounterUpdateState::~PeriodicCounterUpdateState() { - if (_s_periodic_counter_update_state.update_thread.get() != NULL) { - { - // Lock to ensure the update thread will see the update to _done - std::lock_guard l(_s_periodic_counter_update_state.lock); - _done = true; - } - _s_periodic_counter_update_state.update_thread->join(); - } -} - -void RuntimeProfile::periodic_counter_update_loop() { - while (!_s_periodic_counter_update_state._done) { - boost::system_time before_time = boost::get_system_time(); - SleepFor(MonoDelta::FromMilliseconds(config::periodic_counter_update_period_ms)); - boost::posix_time::time_duration elapsed = boost::get_system_time() - before_time; - int elapsed_ms = elapsed.total_milliseconds(); - - std::lock_guard l(_s_periodic_counter_update_state.lock); - - for (PeriodicCounterUpdateState::RateCounterMap::iterator it = - _s_periodic_counter_update_state.rate_counters.begin(); - it != _s_periodic_counter_update_state.rate_counters.end(); ++it) { - it->second.elapsed_ms += elapsed_ms; - int64_t value; - - if (it->second.src_counter != NULL) { - value = it->second.src_counter->value(); - } else { - DCHECK(it->second.sample_fn != NULL); - value = it->second.sample_fn(); - } - - int64_t rate = value * 1000 / (it->second.elapsed_ms); - it->first->set(rate); - } - - for (PeriodicCounterUpdateState::SamplingCounterMap::iterator it = - _s_periodic_counter_update_state.sampling_counters.begin(); - it != _s_periodic_counter_update_state.sampling_counters.end(); ++it) { - ++it->second.num_sampled; - int64_t value; - - if (it->second.src_counter != NULL) { - value = it->second.src_counter->value(); - } else { - DCHECK(it->second.sample_fn != NULL); - value = it->second.sample_fn(); - } - - it->second.total_sampled_value += value; - double average = - static_cast(it->second.total_sampled_value) / it->second.num_sampled; - it->first->set(average); - } - - for (PeriodicCounterUpdateState::BucketCountersMap::iterator it = - _s_periodic_counter_update_state.bucketing_counters.begin(); - it != _s_periodic_counter_update_state.bucketing_counters.end(); ++it) { - int64_t val = it->second.src_counter->value(); - - if (val >= it->first->size()) { - val = it->first->size() - 1; - } - - it->first->at(val)->update(1); - ++it->second.num_sampled; - } - } -} void RuntimeProfile::print_child_counters(const std::string& prefix, const std::string& counter_name, diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h index a0b01d0d88..f5edf61c0b 100644 --- a/be/src/util/runtime_profile.h +++ b/be/src/util/runtime_profile.h @@ -414,12 +414,6 @@ public: // Same as 'add_sampling_counter' above except the samples are taken by calling fn. Counter* add_sampling_counter(const std::string& name, SampleFn fn); - // Add a bucket of counters to store the sampled value of src_counter. - // The src_counter is sampled periodically and the buckets are updated. - void add_bucketing_counters(const std::string& name, const std::string& parent_counter_name, - Counter* src_counter, int max_buckets, - std::vector* buckets); - /// Adds a high water mark counter to the runtime profile. Otherwise, same behavior /// as AddCounter(). HighWaterMarkCounter* AddHighWaterMarkCounter(const std::string& name, TUnit::type unit, @@ -429,22 +423,6 @@ public: std::shared_ptr AddSharedHighWaterMarkCounter( const std::string& name, TUnit::type unit, const std::string& parent_counter_name = ""); - // stops updating the value of 'rate_counter'. Rate counters are updated - // periodically so should be removed as soon as the underlying counter is - // no longer going to change. - void stop_rate_counters_updates(Counter* rate_counter); - - // stops updating the value of 'sampling_counter'. Sampling counters are updated - // periodically so should be removed as soon as the underlying counter is - // no longer going to change. - void stop_sampling_counters_updates(Counter* sampling_counter); - - // stops updating the bucket counter. - // If convert is true, convert the buckets from count to percentage. - // Sampling counters are updated periodically so should be removed as soon as the - // underlying counter is no longer going to change. - void stop_bucketing_counters_updates(std::vector* buckets, bool convert); - // Recursively compute the fraction of the 'total_time' spent in this profile and // its children. // This function updates _local_time_percent for each profile. @@ -516,10 +494,6 @@ private: // of the total time in the entire profile tree. double _local_time_percent; - std::vector _rate_counters; - - std::vector _sampling_counters; - enum PeriodicCounterType { RATE_COUNTER = 0, SAMPLING_COUNTER, @@ -544,41 +518,6 @@ private: // TODO: customize bucketing }; - // This is a static singleton object that is used to update all rate counters and - // sampling counters. - struct PeriodicCounterUpdateState { - PeriodicCounterUpdateState(); - - // Tears down the update thread. - ~PeriodicCounterUpdateState(); - - // Lock protecting state below - std::mutex lock; - - // If true, tear down the update thread. - volatile bool _done; - - // Thread performing asynchronous updates. - boost::scoped_ptr update_thread; - - // A map of the dst (rate) counter to the src counter and elapsed time. - typedef std::map RateCounterMap; - RateCounterMap rate_counters; - - // A map of the dst (averages over samples) counter to the src counter (to be sampled) - // and number of samples taken. - typedef std::map SamplingCounterMap; - SamplingCounterMap sampling_counters; - - // Map from a bucket of counters to the src counter - typedef std::map*, BucketCountersInfo> BucketCountersMap; - BucketCountersMap bucketing_counters; - }; - - // Singleton object that keeps track of all rate counters and the thread - // for updating them. - static PeriodicCounterUpdateState _s_periodic_counter_update_state; - // update a subtree of profiles from nodes, rooted at *idx. // On return, *idx points to the node immediately following this subtree. void update(const std::vector& nodes, int* idx); @@ -588,18 +527,6 @@ private: // Called recusively. void compute_time_in_profile(int64_t total_time); - // Registers a periodic counter to be updated by the update thread. - // Either sample_fn or dst_counter must be non-NULL. When the periodic counter - // is updated, it either gets the value from the dst_counter or calls the sample - // function to get the value. - // dst_counter/sample fn is assumed to be compatible types with src_counter. - static void register_periodic_counter(Counter* src_counter, SampleFn sample_fn, - Counter* dst_counter, PeriodicCounterType type); - - // Loop for periodic counter update thread. This thread wakes up once in a while - // and updates all the added rate counters and sampling counters. - static void periodic_counter_update_loop(); - // Print the child counters of the given counter name static void print_child_counters(const std::string& prefix, const std::string& counter_name, const CounterMap& counter_map, diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java index 44cd71cd39..e204155288 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java @@ -209,9 +209,9 @@ public class RoutineLoadTaskScheduler extends MasterDaemon { // And this task will then be aborted because of a timeout. // In this way, we can prevent the entire job from being paused due to submit errors, // and we can also relieve the pressure on BE by waiting for the timeout period. - LOG.warn("failed to submit routine load task {} to BE: {}", + LOG.warn("failed to submit routine load task {} to BE: {}, error: {}", DebugUtil.printId(routineLoadTaskInfo.getId()), - routineLoadTaskInfo.getBeId()); + routineLoadTaskInfo.getBeId(), e.getMessage()); routineLoadManager.getJob(routineLoadTaskInfo.getJobId()).setOtherMsg(e.getMessage()); // fall through to set ExecuteStartTime }