diff --git a/deps/easy/src/io/easy_negotiation.c b/deps/easy/src/io/easy_negotiation.c index 389ac1ca8..7cd2a0199 100644 --- a/deps/easy/src/io/easy_negotiation.c +++ b/deps/easy/src/io/easy_negotiation.c @@ -85,9 +85,9 @@ static int easy_decode_uint64(char *buf, const int64_t data_len, int64_t *pos, u *val |= (((uint64_t)((*(buf + (*pos)++))) & 0xff)) << 24; *val |= (((uint64_t)((*(buf + (*pos)++))) & 0xff)) << 16; *val |= (((uint64_t)((*(buf + (*pos)++))) & 0xff)) << 8; - *val |= (((uint64_t)((*(buf + (*pos)++))) & 0xff)); - } - + *val |= (((uint64_t)((*(buf + (*pos)++))) & 0xff)); + } + return ret; } @@ -143,9 +143,9 @@ static int easy_decode_negotiation_msg(easy_negotiation_msg_t *ne_msg, char *rec int64_t pos = 0; if (NULL == ne_msg || NULL == recv_buf || NULL == decode_len) { - easy_error_log("easy_decode_negotiation_msg, invalid param!"); - return EASY_ERROR; - } + easy_error_log("easy_decode_negotiation_msg, invalid param!"); + return EASY_ERROR; + } ret = easy_decode_uint64(recv_buf, recv_buf_len, &pos, &(ne_msg->msg_header.header_magic)); if (ret != EASY_OK) { @@ -297,6 +297,6 @@ int easy_send_negotiate_message(easy_connection_t *c) } void easy_consume_negotiation_msg(int fd, easy_io_t *eio) -{ - net_consume_negotiation_msg(fd, eio->magic); -} +{ + net_consume_negotiation_msg(fd, eio->magic); +} diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h index af8316fb9..f392372b2 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h @@ -518,6 +518,8 @@ PCODE_DEF(OB_DAS_ASYNC_ERASE_RESULT, 0x529) //erase das result with async rpc PCODE_DEF(OB_RECOMPILE_ALL_VIEWS_BATCH, 0x52A) //reset status for view during upgrade PCODE_DEF(OB_DAS_ASYNC_ACCESS, 0x52B) //das async rpc PCODE_DEF(OB_TRY_ADD_DEP_INFOS_FOR_SYNONYM_BATCH, 0x52C) //add dependency for synonym during upgrade +PCODE_DEF(OB_CLEAN_DTL_INTERM_RESULT, 0x52D) //add dependency for synonym during upgrade + PCODE_DEF(OB_SQL_PCODE_END, 0x54F) // as a guardian // for test schema diff --git a/src/observer/ob_srv_xlator_partition.cpp b/src/observer/ob_srv_xlator_partition.cpp index 269615e66..51c6009ea 100644 --- a/src/observer/ob_srv_xlator_partition.cpp +++ b/src/observer/ob_srv_xlator_partition.cpp @@ -162,6 +162,7 @@ void oceanbase::observer::init_srv_xlator_for_others(ObSrvRpcXlator *xlator) { RPC_PROCESSOR(ObInitTaskP, gctx_); RPC_PROCESSOR(ObInitFastSqcP, gctx_); RPC_PROCESSOR(ObPxTenantTargetMonitorP, gctx_); + RPC_PROCESSOR(ObPxCleanDtlIntermResP, gctx_); // SQL Estimate RPC_PROCESSOR(ObEstimatePartitionRowsP, gctx_); diff --git a/src/sql/code_generator/ob_static_engine_cg.cpp b/src/sql/code_generator/ob_static_engine_cg.cpp index d4955e257..450ae7864 100644 --- a/src/sql/code_generator/ob_static_engine_cg.cpp +++ b/src/sql/code_generator/ob_static_engine_cg.cpp @@ -4289,7 +4289,7 @@ int ObStaticEngineCG::generate_join_spec(ObLogJoin &op, ObJoinSpec &spec) ObNestedLoopJoinSpec &nlj = static_cast(spec); if (op.enable_px_batch_rescan()) { nlj.enable_px_batch_rescan_ = true; - nlj.group_size_ = ObNestedLoopJoinOp::PX_RESCAN_BATCH_ROW_COUNT; + nlj.group_size_ = PX_RESCAN_BATCH_ROW_COUNT; } else { nlj.enable_px_batch_rescan_ = false; } diff --git a/src/sql/dtl/ob_dtl_task.h b/src/sql/dtl/ob_dtl_task.h index b7e3e09d6..11480f05f 100644 --- a/src/sql/dtl/ob_dtl_task.h +++ b/src/sql/dtl/ob_dtl_task.h @@ -51,7 +51,7 @@ public: // no need to serialize DTL_CHAN_STATE state_; - TO_STRING_KV(KP_(chid), K_(type), K_(peer), K_(role), K_(tenant_id), K(state_)); + TO_STRING_KV(K_(chid), K_(type), K_(peer), K_(role), K_(tenant_id), K(state_)); }; class ObDtlChSet @@ -70,6 +70,7 @@ public: int64_t count() const { return ch_info_set_.count(); } int assign(const ObDtlChSet &other); void reset() { ch_info_set_.reset(); } + common::ObIArray &get_ch_info_set() { return ch_info_set_; } TO_STRING_KV(K_(exec_addr), K_(ch_info_set)); protected: common::ObAddr exec_addr_; diff --git a/src/sql/engine/join/ob_nested_loop_join_op.h b/src/sql/engine/join/ob_nested_loop_join_op.h index 6ed6ca87e..21480bce2 100644 --- a/src/sql/engine/join/ob_nested_loop_join_op.h +++ b/src/sql/engine/join/ob_nested_loop_join_op.h @@ -122,8 +122,6 @@ public: int fill_cur_row_rescan_param(); int calc_other_conds(bool &is_match); -public: - static const int64_t PX_RESCAN_BATCH_ROW_COUNT = 8192; private: // state operation and transfer function type. typedef int (ObNestedLoopJoinOp::*state_operation_func_type)(); diff --git a/src/sql/engine/px/exchange/ob_px_fifo_coord_op.h b/src/sql/engine/px/exchange/ob_px_fifo_coord_op.h index 5958db5cb..50c1d8935 100644 --- a/src/sql/engine/px/exchange/ob_px_fifo_coord_op.h +++ b/src/sql/engine/px/exchange/ob_px_fifo_coord_op.h @@ -90,6 +90,10 @@ private: int fetch_rows(const int64_t row_cnt); int setup_loop_proc() override; + virtual void clean_dfos_dtl_interm_result() override + { + msg_proc_.clean_dtl_interm_result(ctx_); + } private: ObPxFifoCoordOpEventListener listener_; ObSerialDfoScheduler serial_scheduler_; diff --git a/src/sql/engine/px/exchange/ob_px_ms_coord_op.h b/src/sql/engine/px/exchange/ob_px_ms_coord_op.h index 953e28597..80b3bfd63 100644 --- a/src/sql/engine/px/exchange/ob_px_ms_coord_op.h +++ b/src/sql/engine/px/exchange/ob_px_ms_coord_op.h @@ -145,6 +145,10 @@ private: int init_store_rows(int64_t n_ways); int setup_readers(); void destroy_readers(); + virtual void clean_dfos_dtl_interm_result() override + { + msg_proc_.clean_dtl_interm_result(ctx_); + } private: ObPxMSCoordOpEventListener listener_; ObSerialDfoScheduler serial_scheduler_; diff --git a/src/sql/engine/px/exchange/ob_px_ordered_coord_op.h b/src/sql/engine/px/exchange/ob_px_ordered_coord_op.h index 02d4561e9..6e82ebd7d 100644 --- a/src/sql/engine/px/exchange/ob_px_ordered_coord_op.h +++ b/src/sql/engine/px/exchange/ob_px_ordered_coord_op.h @@ -128,6 +128,10 @@ private: int setup_readers(); void destroy_readers(); int next_row(ObReceiveRowReader &reader, bool &wait_next_msg); + virtual void clean_dfos_dtl_interm_result() override + { + msg_proc_.clean_dtl_interm_result(ctx_); + } private: ObPxOrderedCoordOpEventListener listener_; ObSerialDfoScheduler serial_scheduler_; diff --git a/src/sql/engine/px/exchange/ob_px_receive_op.cpp b/src/sql/engine/px/exchange/ob_px_receive_op.cpp index a5d8833b2..328faf13f 100644 --- a/src/sql/engine/px/exchange/ob_px_receive_op.cpp +++ b/src/sql/engine/px/exchange/ob_px_receive_op.cpp @@ -545,10 +545,19 @@ int ObPxReceiveOp::erase_dtl_interm_result() LOG_WARN("fail get channel info", K(ret)); } else { key.channel_id_ = ci.chid_; - key.batch_id_ = ctx_.get_px_batch_id(); - if (OB_FAIL(ObDTLIntermResultManager::getInstance().erase_interm_result_info(key))) { - LOG_TRACE("fail to release recieve internal result", K(ret)); + for (int64_t batch_id = ctx_.get_px_batch_id(); + batch_id < PX_RESCAN_BATCH_ROW_COUNT && OB_SUCC(ret); batch_id++) { + key.batch_id_ = batch_id; + if (OB_FAIL(ObDTLIntermResultManager::getInstance().erase_interm_result_info(key))) { + if (OB_HASH_NOT_EXIST == ret) { + ret = OB_SUCCESS; + break; + } else { + LOG_WARN("fail to release recieve internal result", K(ret), K(key)); + } + } } + LOG_TRACE("receive erase dtl interm res", K(i), K(get_spec().get_id()), K(ci), K(ctx_.get_px_batch_id())); } } return ret; diff --git a/src/sql/engine/px/ob_dfo.cpp b/src/sql/engine/px/ob_dfo.cpp index bae64073b..8c7c810ed 100644 --- a/src/sql/engine/px/ob_dfo.cpp +++ b/src/sql/engine/px/ob_dfo.cpp @@ -77,6 +77,8 @@ OB_SERIALIZE_MEMBER(ObSqcTableLocationKey, tablet_id_, is_dml_, is_loc_uncertain_); +OB_SERIALIZE_MEMBER(ObPxCleanDtlIntermResInfo, ch_total_info_, sqc_id_, task_count_); +OB_SERIALIZE_MEMBER(ObPxCleanDtlIntermResArgs, info_, batch_size_); int ObPxSqcMeta::assign(const ObPxSqcMeta &other) { diff --git a/src/sql/engine/px/ob_dfo.h b/src/sql/engine/px/ob_dfo.h index 2f5d7e1be..fbdb1299a 100644 --- a/src/sql/engine/px/ob_dfo.h +++ b/src/sql/engine/px/ob_dfo.h @@ -273,9 +273,8 @@ public: K_(qc_ch_info), K_(sqc_ch_info), K_(task_count), K_(max_task_count), K_(min_task_count), K_(thread_inited), K_(thread_finish), K_(px_int_id), - K_(is_fulltree), K_(is_rpc_worker), K_(transmit_use_interm_result), - K_(recieve_use_interm_result), K(temp_table_ctx_), K_(server_not_alive), - K_(adjoining_root_dfo), K_(is_single_tsc_leaf_dfo)); + K_(transmit_use_interm_result), + K_(recieve_use_interm_result), K_(serial_receive_channels)); private: uint64_t execution_id_; uint64_t qc_id_; @@ -579,7 +578,9 @@ public: K_(px_bloom_filter_mode), K_(px_bf_id), K_(pkey_table_loc_id), - K_(tsc_op_cnt)); + K_(tsc_op_cnt), + K_(transmit_ch_sets), + K_(receive_ch_sets_map)); private: DISALLOW_COPY_AND_ASSIGN(ObDfo); @@ -717,6 +718,47 @@ public: ObSqcSerializeCache ser_cache_; }; +struct ObPxCleanDtlIntermResInfo +{ + OB_UNIS_VERSION(1); +public: + ObPxCleanDtlIntermResInfo() : ch_total_info_(), sqc_id_(common::OB_INVALID_ID), task_count_(0) {} + ObPxCleanDtlIntermResInfo(dtl::ObDtlChTotalInfo &ch_info, int64_t sqc_id, int64_t task_count) : + ch_total_info_(ch_info), sqc_id_(sqc_id), task_count_(task_count) + {} + + ~ObPxCleanDtlIntermResInfo() { } + void reset() + { + ch_total_info_.reset(); + sqc_id_ = common::OB_INVALID_ID; + task_count_ = 0; + } + + TO_STRING_KV(K_(ch_total_info), K_(sqc_id), K_(task_count)); +public: + dtl::ObDtlChTotalInfo ch_total_info_; + int64_t sqc_id_; + int64_t task_count_; +}; + +class ObPxCleanDtlIntermResArgs +{ + OB_UNIS_VERSION(1); +public: + ObPxCleanDtlIntermResArgs() : info_(), batch_size_(0) {} + ~ObPxCleanDtlIntermResArgs() { } + void reset() + { + info_.reset(); + batch_size_ = 0; + } + + TO_STRING_KV(K_(info), K_(batch_size)); +public: + ObSEArray info_; + uint64_t batch_size_; +}; class ObPxTask { diff --git a/src/sql/engine/px/ob_dfo_scheduler.cpp b/src/sql/engine/px/ob_dfo_scheduler.cpp index 4133a9526..b1195912f 100644 --- a/src/sql/engine/px/ob_dfo_scheduler.cpp +++ b/src/sql/engine/px/ob_dfo_scheduler.cpp @@ -627,6 +627,84 @@ int ObSerialDfoScheduler::do_schedule_dfo(ObExecContext &ctx, ObDfo &dfo) const } +void ObSerialDfoScheduler::clean_dtl_interm_result(ObExecContext &exec_ctx) +{ + int ret = OB_SUCCESS; + const ObIArray &all_dfos = coord_info_.dfo_mgr_.get_all_dfos(); + ObDfo *last_dfo = all_dfos.at(all_dfos.count() - 1); + if (OB_NOT_NULL(last_dfo) && last_dfo->is_scheduled() && OB_NOT_NULL(last_dfo->parent()) + && last_dfo->parent()->is_root_dfo()) { + // all dfo scheduled, do nothing. + LOG_TRACE("all dfo scheduled."); + } else { + const ObDfo *root = coord_info_.dfo_mgr_.get_root_dfo(); + int64_t batch_size = coord_info_.get_rescan_param_count(); + ObSEArray dfos; + common::hash::ObHashMap map; + ObIAllocator &allocator = exec_ctx.get_allocator(); + for (int64_t i = 0; i < all_dfos.count(); i++) { + ObDfo *dfo = all_dfos.at(i); + ObDfo *parent = NULL; + if (OB_NOT_NULL(dfo) && dfo->is_scheduled() && NULL != (parent = dfo->parent()) + && !parent->is_root_dfo() && !parent->is_scheduled()) { + // if current dfo is scheduled but parent dfo is not scheduled. + for (int64_t j = 0; j < parent->get_sqcs_count(); j++) { + ObPxSqcMeta &sqc = parent->get_sqcs().at(j); + int64_t msg_idx = 0; + for (; msg_idx < sqc.get_serial_receive_channels().count(); msg_idx++) { + if (sqc.get_serial_receive_channels().at(msg_idx).get_child_dfo_id() == dfo->get_dfo_id()) { + break; + } + } + if (OB_LIKELY(msg_idx < sqc.get_serial_receive_channels().count())) { + ObPxCleanDtlIntermResArgs *arg = NULL; + if (!map.created() && OB_FAIL(map.create(8, "CleanDtlRes"))) { + LOG_WARN("create map failed", K(ret)); + } else if (OB_FAIL(map.get_refactored(sqc.get_exec_addr(), arg))) { + if (OB_HASH_NOT_EXIST == ret) { + void *buf = NULL; + if (OB_ISNULL(buf = allocator.alloc(sizeof(ObPxCleanDtlIntermResArgs)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("alloc failed", K(ret)); + } else { + arg = new(buf) ObPxCleanDtlIntermResArgs(); + arg->batch_size_ = coord_info_.get_rescan_param_count(); + if (OB_FAIL(map.set_refactored(sqc.get_exec_addr(), arg))) { + LOG_WARN("set refactored failed", K(ret)); + } + } + } else { + LOG_WARN("get refactored failed", K(ret)); + } + } + if (OB_SUCC(ret) && OB_NOT_NULL(arg)) { + ObPxReceiveDataChannelMsg &msg = sqc.get_serial_receive_channels().at(msg_idx); + if (OB_FAIL(arg->info_.push_back(ObPxCleanDtlIntermResInfo(msg.get_ch_total_info(), + sqc.get_sqc_id(), sqc.get_task_count())))) { + LOG_WARN("push back failed", K(ret)); + } + } + } + } + } + } + // ignore allocate, set_refactored and push_back failure. + // send rpc to addrs inserted into the map successfully. + if (OB_UNLIKELY(!map.empty())) { + LOG_TRACE("clean dtl res map", K(map.size())); + ObSQLSessionInfo *session = exec_ctx.get_my_session(); + uint64_t tenant_id = OB_NOT_NULL(session) ? session->get_effective_tenant_id() : OB_SYS_TENANT_ID; + auto iter = map.begin(); + for (; iter != map.end(); iter++) { + if (OB_FAIL(coord_info_.rpc_proxy_.to(iter->first).by(tenant_id).clean_dtl_interm_result(*iter->second, NULL))) { + LOG_WARN("send clean dtl interm result rpc failed", K(ret), K(iter->first), KPC(iter->second)); + } + LOG_TRACE("clean dtl res map", K(iter->first), K(*(iter->second))); + } + } + } +} + // -------------分割线----------- // 启动 DFO 的 SQC 线程 diff --git a/src/sql/engine/px/ob_dfo_scheduler.h b/src/sql/engine/px/ob_dfo_scheduler.h index 99d055ce3..d3bf742de 100644 --- a/src/sql/engine/px/ob_dfo_scheduler.h +++ b/src/sql/engine/px/ob_dfo_scheduler.h @@ -41,6 +41,7 @@ public: ObDfo &child, ObDfo &parent, bool is_parallel_scheduler = true) const; + virtual void clean_dtl_interm_result(ObExecContext &ctx) = 0; int build_data_xchg_ch(ObExecContext &ctx, ObDfo &child, ObDfo &parent) const; int build_data_mn_xchg_ch(ObExecContext &ctx, ObDfo &child, ObDfo &parent) const; virtual int init_all_dfo_channel(ObExecContext &ctx) const; @@ -69,6 +70,8 @@ public: virtual int init_all_dfo_channel(ObExecContext &ctx) const; virtual int dispatch_dtl_data_channel_info(ObExecContext &ctx, ObDfo &child, ObDfo &parent) const; virtual int try_schedule_next_dfo(ObExecContext &ctx) const; + virtual void clean_dtl_interm_result(ObExecContext &ctx) override; + private: int build_transmit_recieve_channel(ObExecContext &ctx, ObDfo *dfo) const; int init_dfo_channel(ObExecContext &ctx, ObDfo *child, ObDfo *parent) const; @@ -91,6 +94,7 @@ public: {} virtual int dispatch_dtl_data_channel_info(ObExecContext &ctx, ObDfo &child, ObDfo &parent) const; virtual int try_schedule_next_dfo(ObExecContext &ctx) const; + virtual void clean_dtl_interm_result(ObExecContext &ctx) override { UNUSED(ctx); } private: int dispatch_transmit_channel_info(ObExecContext &ctx, ObDfo &child, ObDfo &parent) const; int dispatch_receive_channel_info(ObExecContext &ctx, ObDfo &child, ObDfo &parent) const; diff --git a/src/sql/engine/px/ob_px_coord_op.cpp b/src/sql/engine/px/ob_px_coord_op.cpp index 9bf6c5fe2..b216fb495 100644 --- a/src/sql/engine/px/ob_px_coord_op.cpp +++ b/src/sql/engine/px/ob_px_coord_op.cpp @@ -567,6 +567,7 @@ int ObPxCoordOp::inner_close() LOG_WARN("release dtl channel failed", K(release_channel_ret)); } ctx_.del_extra_check(server_alive_checker_); + clean_dfos_dtl_interm_result(); LOG_TRACE("byebye. exit QC Coord"); return ret; } diff --git a/src/sql/engine/px/ob_px_coord_op.h b/src/sql/engine/px/ob_px_coord_op.h index d2d1f8711..2a01f1ce3 100644 --- a/src/sql/engine/px/ob_px_coord_op.h +++ b/src/sql/engine/px/ob_px_coord_op.h @@ -135,6 +135,8 @@ protected: int init_batch_info(); int batch_rescan(); int erase_dtl_interm_result(); + // send rpc to clean dtl interm result of not scheduled dfos. + virtual void clean_dfos_dtl_interm_result() = 0; protected: common::ObArenaAllocator allocator_; common::ObArenaAllocator row_allocator_; diff --git a/src/sql/engine/px/ob_px_rpc_processor.cpp b/src/sql/engine/px/ob_px_rpc_processor.cpp index 211483f83..48b674247 100644 --- a/src/sql/engine/px/ob_px_rpc_processor.cpp +++ b/src/sql/engine/px/ob_px_rpc_processor.cpp @@ -650,3 +650,40 @@ int ObPxTenantTargetMonitorP::process() } return ret; } + +int ObPxCleanDtlIntermResP::process() +{ + int ret = OB_SUCCESS; + dtl::ObDTLIntermResultKey key; + int64_t batch_size = 0 == arg_.batch_size_ ? 1 : arg_.batch_size_; + for (int64_t i = 0; i < arg_.info_.count(); i++) { + ObPxCleanDtlIntermResInfo &info = arg_.info_.at(i); + for (int64_t task_id = 0; task_id < info.task_count_; task_id++) { + ObPxTaskChSet ch_set; + if (OB_FAIL(ObDtlChannelUtil::get_receive_dtl_channel_set(info.sqc_id_, task_id, + info.ch_total_info_, ch_set))) { + LOG_WARN("get receive dtl channel set failed", K(ret)); + } else { + LOG_TRACE("ObPxCleanDtlIntermResP process", K(i), K(arg_.batch_size_), K(info), K(task_id), K(ch_set)); + for (int64_t ch_idx = 0; ch_idx < ch_set.count(); ch_idx++) { + key.channel_id_ = ch_set.get_ch_info_set().at(ch_idx).chid_; + for (int64_t batch_id = 0; batch_id < batch_size && OB_SUCC(ret); batch_id++) { + key.batch_id_= batch_id; + if (OB_FAIL(dtl::ObDTLIntermResultManager::getInstance().erase_interm_result_info(key))) { + if (OB_HASH_NOT_EXIST == ret) { + // interm result is written from batch_id = 0 to batch_size, + // if some errors happen when batch_id = i, no interm result of batch_id > i will be written. + // so if erase failed, just break and continue to erase interm result of next channel. + ret = OB_SUCCESS; + break; + } else { + LOG_WARN("fail to release recieve internal result", K(ret), K(ret)); + } + } + } + } + } + } + } + return ret; +} \ No newline at end of file diff --git a/src/sql/engine/px/ob_px_rpc_processor.h b/src/sql/engine/px/ob_px_rpc_processor.h index cc57aaf41..c73f4b9c7 100644 --- a/src/sql/engine/px/ob_px_rpc_processor.h +++ b/src/sql/engine/px/ob_px_rpc_processor.h @@ -193,6 +193,18 @@ private: const observer::ObGlobalContext &global_ctx_; }; +class ObPxCleanDtlIntermResP + : public obrpc::ObRpcProcessor > +{ +public: + ObPxCleanDtlIntermResP(const observer::ObGlobalContext &gctx) + {} + virtual ~ObPxCleanDtlIntermResP() = default; + virtual int init() final { return OB_SUCCESS; } + virtual void destroy() final {} + virtual int process() final; +}; + } // sql } // oceanbase diff --git a/src/sql/engine/px/ob_px_rpc_proxy.h b/src/sql/engine/px/ob_px_rpc_proxy.h index 5e2ef673d..ec05b2c80 100644 --- a/src/sql/engine/px/ob_px_rpc_proxy.h +++ b/src/sql/engine/px/ob_px_rpc_proxy.h @@ -38,6 +38,7 @@ public: RPC_AP(PR5 fast_init_sqc, OB_PX_FAST_INIT_SQC, (sql::ObPxRpcInitSqcArgs), sql::ObPxRpcInitSqcResponse); // px资源监控 RPC_S(PR5 fetch_statistics, OB_PX_TARGET_REQUEST, (sql::ObPxRpcFetchStatArgs), sql::ObPxRpcFetchStatResponse); + RPC_AP(PR5 clean_dtl_interm_result, OB_CLEAN_DTL_INTERM_RESULT, (sql::ObPxCleanDtlIntermResArgs)); }; } // obrpc diff --git a/src/sql/engine/px/ob_px_scheduler.cpp b/src/sql/engine/px/ob_px_scheduler.cpp index 4dfb47905..eb7c40485 100644 --- a/src/sql/engine/px/ob_px_scheduler.cpp +++ b/src/sql/engine/px/ob_px_scheduler.cpp @@ -128,6 +128,16 @@ int ObPxMsgProc::startup_msg_loop(ObExecContext &ctx) return ret; } +void ObPxMsgProc::clean_dtl_interm_result(ObExecContext &ctx) +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(scheduler_)) { + LOG_WARN("dfo scheduler is null"); + } else { + scheduler_->clean_dtl_interm_result(ctx); + } +} + // 1. 根据 pkt 信息找到对应 dfo, sqc,标记当前 sqc 线程分配完成 // 2. 判断该 dfo 下是否所有 sqc 都分配线程完成 // 如果完成,则标记 dfo 为 thread_inited, 进入第 3 步,否则结束处理 diff --git a/src/sql/engine/px/ob_px_scheduler.h b/src/sql/engine/px/ob_px_scheduler.h index 1edbaaae4..4535ea338 100644 --- a/src/sql/engine/px/ob_px_scheduler.h +++ b/src/sql/engine/px/ob_px_scheduler.h @@ -178,6 +178,7 @@ public: int on_piece_msg(ObExecContext &ctx, const ObInitChannelPieceMsg &pkt); int on_piece_msg(ObExecContext &ctx, const ObReportingWFPieceMsg &pkt); int on_piece_msg(ObExecContext &ctx, const ObOptStatsGatherPieceMsg &pkt); + void clean_dtl_interm_result(ObExecContext &ctx); // end DATAHUB msg processing private: int do_cleanup_dfo(ObDfo &dfo); diff --git a/src/sql/engine/px/ob_px_util.h b/src/sql/engine/px/ob_px_util.h index 112f51be6..224f85e44 100644 --- a/src/sql/engine/px/ob_px_util.h +++ b/src/sql/engine/px/ob_px_util.h @@ -29,6 +29,7 @@ namespace oceanbase { namespace sql { +const int64_t PX_RESCAN_BATCH_ROW_COUNT = 8192; class ObIExtraStatusCheck; enum ObBcastOptimization { BC_TO_WORKER, diff --git a/src/sql/engine/subquery/ob_subplan_filter_op.h b/src/sql/engine/subquery/ob_subplan_filter_op.h index 9054153b3..fe99e6d52 100644 --- a/src/sql/engine/subquery/ob_subplan_filter_op.h +++ b/src/sql/engine/subquery/ob_subplan_filter_op.h @@ -15,6 +15,7 @@ #include "sql/engine/ob_operator.h" #include "sql/engine/basic/ob_chunk_datum_store.h" +#include "sql/engine/px/ob_px_util.h" namespace oceanbase { @@ -215,7 +216,6 @@ public: public: ObBatchRescanCtl &get_batch_rescan_ctl() { return batch_rescan_ctl_; } - static const int64_t PX_RESCAN_BATCH_ROW_COUNT = 8192; int handle_next_batch_with_px_rescan(const int64_t op_max_batch_size); int handle_next_batch_with_group_rescan(const int64_t op_max_batch_size); private: