diff --git a/src/share/parameter/ob_parameter_seed.ipp b/src/share/parameter/ob_parameter_seed.ipp index c0bffc6b6..33e9b1d07 100644 --- a/src/share/parameter/ob_parameter_seed.ipp +++ b/src/share/parameter/ob_parameter_seed.ipp @@ -1452,3 +1452,7 @@ DEF_BOOL(ob_proxy_readonly_transaction_routing_policy, OB_TENANT_PARAMETER, "tru DEF_BOOL(_enable_block_file_punch_hole, OB_CLUSTER_PARAMETER, "False", "specifies whether to punch whole when free blocks in block_file", ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); +DEF_BOOL(_ob_enable_px_for_inner_sql, OB_CLUSTER_PARAMETER, "true", + "specifies whether inner sql uses px. " + "The default value is TRUE. Value: TRUE: turned on FALSE: turned off", + ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); diff --git a/src/sql/code_generator/ob_code_generator_impl.cpp b/src/sql/code_generator/ob_code_generator_impl.cpp index 79d730db4..9ffd64ea4 100644 --- a/src/sql/code_generator/ob_code_generator_impl.cpp +++ b/src/sql/code_generator/ob_code_generator_impl.cpp @@ -3574,6 +3574,7 @@ int ObCodeGeneratorImpl::convert_set(ObLogSet& op, const PhyOpsDesc& child_ops, const ObIArray& search_order = op.get_search_ordering(); OZ(r_union->search_by_col_lists_.init(search_order.count())); + OZ(r_union->init_op_schema_obj(search_order.count())); ARRAY_FOREACH(search_order, i) { const ObRawExpr* raw_expr = search_order.at(i).expr_; @@ -3667,6 +3668,8 @@ int ObCodeGeneratorImpl::convert_internal_sort( int ret = OB_SUCCESS; if (OB_FAIL(sort.init_sort_columns(sort_column.count()))) { SQL_CG_LOG(WARN, "fail to init sort columns.", K(ret)); + } else if (OB_FAIL(sort.init_op_schema_obj(sort_column.count()))) { + LOG_WARN("fail to init sort schema obj array", K(ret)); } ARRAY_FOREACH(sort_column, i) { @@ -8715,6 +8718,9 @@ int ObCodeGeneratorImpl::fill_sort_columns(const ColumnIndexProviderImpl& idx_pr int64_t sort_idx = OB_INVALID_INDEX; if (OB_FAIL(merge_receive.init_sort_columns(sort_keys.count()))) { LOG_WARN("fail to init sort column", K(ret)); + } else if (NULL != phy_op && + OB_FAIL(phy_op->init_op_schema_obj(sort_keys.count()))) { + LOG_WARN("fail to get op schema obj array", K(ret)); } ARRAY_FOREACH(sort_keys, i) { diff --git a/src/sql/engine/ob_phy_operator.cpp b/src/sql/engine/ob_phy_operator.cpp index de7a2b741..69eb791d2 100644 --- a/src/sql/engine/ob_phy_operator.cpp +++ b/src/sql/engine/ob_phy_operator.cpp @@ -53,7 +53,7 @@ ObPhyOperator::ObPhyOperator(ObIAllocator& alloc) is_exact_rows_(false), type_(PHY_INVALID), plan_depth_(0), - op_schema_objs_() + op_schema_objs_(alloc) {} ObPhyOperator::~ObPhyOperator() @@ -1169,5 +1169,14 @@ int ObPhyOperator::try_open_and_get_operator_ctx( return ret; } +int ObPhyOperator::init_op_schema_obj(int64_t count) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(op_schema_objs_.init(count))) { + LOG_WARN("fail to init op schema obj", K(ret), K(count)); + } + return ret; +} + } // namespace sql } // namespace oceanbase diff --git a/src/sql/engine/ob_phy_operator.h b/src/sql/engine/ob_phy_operator.h index acf9c1a8c..935439328 100644 --- a/src/sql/engine/ob_phy_operator.h +++ b/src/sql/engine/ob_phy_operator.h @@ -829,6 +829,7 @@ public: return op_schema_objs_; } + int init_op_schema_obj(int64_t count); private: static const int64_t CHECK_STATUS_MASK = 0x3FF; // check_status for each 1024 rows protected: @@ -856,8 +857,7 @@ protected: bool is_exact_rows_; ObPhyOperatorType type_; // for GDB debug purpose, no need to serialize int32_t plan_depth_; // for plan cache explain - common::ObSEArray op_schema_objs_; - + common::ObFixedArray op_schema_objs_; private: DISALLOW_COPY_AND_ASSIGN(ObPhyOperator); }; diff --git a/src/sql/engine/px/exchange/ob_px_merge_sort_receive.cpp b/src/sql/engine/px/exchange/ob_px_merge_sort_receive.cpp index bdd1a1c69..c064a38b5 100644 --- a/src/sql/engine/px/exchange/ob_px_merge_sort_receive.cpp +++ b/src/sql/engine/px/exchange/ob_px_merge_sort_receive.cpp @@ -204,6 +204,12 @@ int ObPxMergeSortReceive::inner_close(ObExecContext& ctx) const if (release_merge_sort_ret != common::OB_SUCCESS) { LOG_WARN("release dtl channel failed", K(release_merge_sort_ret)); } + + // must erase after unlink channel + release_channel_ret = erase_dtl_interm_result(ctx); + if (release_channel_ret != common::OB_SUCCESS) { + LOG_TRACE("release interm result failed", KR(release_channel_ret)); + } } return ret; } diff --git a/src/sql/engine/px/exchange/ob_px_ms_receive_op.cpp b/src/sql/engine/px/exchange/ob_px_ms_receive_op.cpp index 70199a891..e08a8f2a3 100644 --- a/src/sql/engine/px/exchange/ob_px_ms_receive_op.cpp +++ b/src/sql/engine/px/exchange/ob_px_ms_receive_op.cpp @@ -141,6 +141,11 @@ int ObPxMSReceiveOp::inner_close() if (release_merge_sort_ret != common::OB_SUCCESS) { LOG_WARN("release dtl channel failed", K(release_merge_sort_ret)); } + + release_channel_ret = erase_dtl_interm_result(); + if (release_channel_ret != common::OB_SUCCESS) { + LOG_TRACE("release interm result failed", KR(release_channel_ret)); + } return ret; } diff --git a/src/sql/engine/px/exchange/ob_px_receive.cpp b/src/sql/engine/px/exchange/ob_px_receive.cpp index a81b42257..1f4433892 100644 --- a/src/sql/engine/px/exchange/ob_px_receive.cpp +++ b/src/sql/engine/px/exchange/ob_px_receive.cpp @@ -294,6 +294,29 @@ int ObPxReceive::active_all_receive_channel(ObPxReceiveCtx& recv_ctx, ObExecCont return ret; } +int ObPxReceive::erase_dtl_interm_result(ObExecContext &ctx) const +{ + int ret = OB_SUCCESS; + dtl::ObDtlChannelInfo ci; + ObDTLIntermResultKey key; + ObPxReceiveCtx *recv_ctx = NULL; + if (OB_ISNULL(recv_ctx = GET_PHY_OPERATOR_CTX(ObPxReceiveCtx, ctx, get_id()))) { + LOG_DEBUG("The operator has not been opened.", K(ret), K_(id), "op_type", + ob_phy_operator_type_str(get_type())); + } else { + for (int i = 0; i < recv_ctx->get_ch_set().count(); ++i) { + if (OB_FAIL(recv_ctx->get_ch_set().get_channel_info(i, ci))) { + LOG_WARN("fail get channel info", K(ret)); + } else { + key.channel_id_ = ci.chid_; + if (OB_FAIL(ObDTLIntermResultManager::getInstance().erase_interm_result_info(key))) { + LOG_TRACE("fail to release recieve internal result", K(ret)); + } + } + } + } + return ret; +} ////////////////////////////////////////// ObPxFifoReceive::ObPxFifoReceive(common::ObIAllocator& alloc) : ObPxReceive(alloc) @@ -351,26 +374,11 @@ int ObPxFifoReceive::inner_close(ObExecContext& ctx) const /* we must release channel even if there is some error happen before */ if (OB_NOT_NULL(recv_ctx)) { - - int release_channel_ret = ObPxChannelUtil::flush_rows(recv_ctx->task_channels_); - if (release_channel_ret != common::OB_SUCCESS) { - LOG_WARN("release dtl channel failed", K(release_channel_ret)); + int release_channel_ret = ObPxChannelUtil::flush_rows(recv_ctx->task_channels_); + if (release_channel_ret != common::OB_SUCCESS) { + LOG_WARN("release dtl channel failed", K(release_channel_ret)); } - ObDTLIntermResultKey key; - ObDtlBasicChannel* channel = NULL; - ; - for (int i = 0; i < recv_ctx->task_channels_.count(); ++i) { - channel = static_cast(recv_ctx->task_channels_.at(i)); - key.channel_id_ = channel->get_id(); - if (channel->use_interm_result()) { - release_channel_ret = ObDTLIntermResultManager::getInstance().erase_interm_result_info(key); - if (release_channel_ret != common::OB_SUCCESS) { - LOG_WARN("fail to release recieve internal result", KR(release_channel_ret), K(ret)); - } - } - } - - dtl::ObDtlChannelLoop& loop = recv_ctx->msg_loop_; + dtl::ObDtlChannelLoop &loop = recv_ctx->msg_loop_; release_channel_ret = loop.unregister_all_channel(); if (release_channel_ret != common::OB_SUCCESS) { // the following unlink actions is not safe is any unregister failure happened @@ -380,6 +388,11 @@ int ObPxFifoReceive::inner_close(ObExecContext& ctx) const if (release_channel_ret != common::OB_SUCCESS) { LOG_WARN("release dtl channel failed", KR(release_channel_ret)); } + // must erase after unlink channel + release_channel_ret = erase_dtl_interm_result(ctx); + if (release_channel_ret != common::OB_SUCCESS) { + LOG_TRACE("release interm result failed", KR(release_channel_ret)); + } } return ret; } diff --git a/src/sql/engine/px/exchange/ob_px_receive.h b/src/sql/engine/px/exchange/ob_px_receive.h index c41c99c5d..af163678a 100644 --- a/src/sql/engine/px/exchange/ob_px_receive.h +++ b/src/sql/engine/px/exchange/ob_px_receive.h @@ -180,6 +180,7 @@ protected: ObPxTaskChSet& ch_set, common::ObIArray& channels, dtl::ObDtlFlowControl* dfc = nullptr); int get_sqc_id(ObExecContext& ctx, int64_t& sqc_id) const; + int erase_dtl_interm_result(ObExecContext &ctx) const; private: /* functions */ /* variables */ diff --git a/src/sql/engine/px/exchange/ob_px_receive_op.cpp b/src/sql/engine/px/exchange/ob_px_receive_op.cpp index e55978867..9f69ad600 100644 --- a/src/sql/engine/px/exchange/ob_px_receive_op.cpp +++ b/src/sql/engine/px/exchange/ob_px_receive_op.cpp @@ -302,6 +302,24 @@ int ObPxReceiveOp::active_all_receive_channel() } return ret; } + +int ObPxReceiveOp::erase_dtl_interm_result() +{ + int ret = OB_SUCCESS; + dtl::ObDtlChannelInfo ci; + ObDTLIntermResultKey key; + for (int i = 0; i < get_ch_set().count(); ++i) { + if (OB_FAIL(get_ch_set().get_channel_info(i, ci))) { + LOG_WARN("fail get channel info", K(ret)); + } else { + key.channel_id_ = ci.chid_; + if (OB_FAIL(ObDTLIntermResultManager::getInstance().erase_interm_result_info(key))) { + LOG_TRACE("fail to release recieve internal result", K(ret)); + } + } + } + return ret; +} //------------- end ObPxReceiveOp----------------- ObPxFifoReceiveOp::ObPxFifoReceiveOp(ObExecContext& exec_ctx, const ObOpSpec& spec, ObOpInput* input) @@ -316,26 +334,14 @@ int ObPxFifoReceiveOp::inner_open() int ObPxFifoReceiveOp::inner_close() { int ret = OB_SUCCESS; + int release_channel_ret = common::OB_SUCCESS; /* we must release channel even if there is some error happen before */ if (channel_linked_) { - int release_channel_ret = ObPxChannelUtil::flush_rows(task_channels_); + release_channel_ret = ObPxChannelUtil::flush_rows(task_channels_); if (release_channel_ret != common::OB_SUCCESS) { LOG_WARN("release dtl channel failed", K(release_channel_ret)); } - ObDTLIntermResultKey key; - ObDtlBasicChannel *channel = NULL; int64_t recv_cnt = 0; - for (int i = 0; i < task_channels_.count(); ++i) { - channel = static_cast(task_channels_.at(i)); - key.channel_id_ = channel->get_id(); - recv_cnt += channel->get_recv_buffer_cnt(); - if (channel->use_interm_result()) { - release_channel_ret = ObDTLIntermResultManager::getInstance().erase_interm_result_info(key); - if (release_channel_ret != common::OB_SUCCESS) { - LOG_WARN("fail to release recieve internal result", K(ret)); - } - } - } op_monitor_info_.otherstat_3_id_ = ObSqlMonitorStatIds::DTL_SEND_RECV_COUNT; op_monitor_info_.otherstat_3_value_ = recv_cnt; release_channel_ret = msg_loop_.unregister_all_channel(); @@ -348,6 +354,11 @@ int ObPxFifoReceiveOp::inner_close() LOG_WARN("release dtl channel failed", K(release_channel_ret)); } } + // must erase after unlink channel + release_channel_ret = erase_dtl_interm_result(); + if (release_channel_ret != common::OB_SUCCESS) { + LOG_TRACE("release interm result failed", KR(release_channel_ret)); + } return ret; } diff --git a/src/sql/engine/px/exchange/ob_px_receive_op.h b/src/sql/engine/px/exchange/ob_px_receive_op.h index 59d3e6e7f..41bdc7c15 100644 --- a/src/sql/engine/px/exchange/ob_px_receive_op.h +++ b/src/sql/engine/px/exchange/ob_px_receive_op.h @@ -123,7 +123,7 @@ public: } int64_t get_sqc_id(); - + int erase_dtl_interm_result(); public: void reset_for_rescan() { diff --git a/src/sql/engine/px/ob_granule_iterator.cpp b/src/sql/engine/px/ob_granule_iterator.cpp index 7d1c23c7e..a9b71490e 100644 --- a/src/sql/engine/px/ob_granule_iterator.cpp +++ b/src/sql/engine/px/ob_granule_iterator.cpp @@ -207,6 +207,7 @@ void ObGranuleIterator::ObGranuleIteratorCtx::destroy() ranges_.reset(); pkeys_.reset(); rescan_tasks_.reset(); + pwj_rescan_task_infos_.reset(); } int ObGranuleIterator::ObGranuleIteratorCtx::parameters_init(const ObGIInput* input) @@ -323,9 +324,17 @@ int ObGranuleIterator::rescan(ObExecContext& ctx) const gi_ctx->state_ = GI_GET_NEXT_GRANULE_TASK; } } else { - ret = OB_NOT_SUPPORTED; - LOG_WARN("the partition wise join GI rescan not supported", K(ret)); + // 在partition_wise_join的情况, 按woker第一次完整执行所抢占的任务执行. + // 在执行过程中缓存住了自己的任务队列. + if (GI_UNINITIALIZED == gi_ctx->state_ || GI_PREPARED == gi_ctx->state_) { + /*do nothing*/ + } else { + gi_ctx->is_rescan_ = true; + gi_ctx->rescan_task_idx_ = 0; + gi_ctx->state_ = GI_GET_NEXT_GRANULE_TASK; + } } + return ret; } @@ -570,12 +579,24 @@ int ObGranuleIterator::do_get_next_granule_task( } } } - if (OB_FAIL(fetch_full_pw_tasks(exec_ctx, gi_task_infos, op_ids))) { + if (OB_FAIL(ret)) { + } else if (gi_ctx.is_rescan_) { + if (gi_ctx.rescan_task_idx_ >= gi_ctx.pwj_rescan_task_infos_.count()) { + ret = OB_ITER_END; + gi_ctx.state_ = GI_END; + } + } else if (OB_FAIL(fetch_full_pw_tasks(exec_ctx, gi_task_infos, op_ids))) { if (OB_ITER_END != ret) { LOG_WARN("try fetch task failed", K(ret)); } else { gi_ctx.state_ = GI_END; } + } else { + for (int i = 0; i < gi_task_infos.count() && OB_SUCC(ret); ++i) { + if (OB_FAIL(gi_ctx.pwj_rescan_task_infos_.push_back(gi_task_infos.at(i)))) { + LOG_WARN("fail to rescan pwj task info", K(ret)); + } + } } ARRAY_FOREACH_X(op_ids, idx, cnt, OB_SUCC(ret)) { @@ -585,7 +606,16 @@ int ObGranuleIterator::do_get_next_granule_task( } } if (OB_SUCC(ret)) { - if (OB_FAIL(gi_prepare_map->set_refactored(op_ids.at(idx), gi_task_infos.at(idx)))) { + if (gi_ctx.is_rescan_) { + if (gi_ctx.rescan_task_idx_ >= gi_ctx.pwj_rescan_task_infos_.count()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("rescan task idx is unexpected", K(gi_ctx.rescan_task_idx_), + K(gi_ctx.pwj_rescan_task_infos_.count()), K(cnt)); + } else if (OB_FAIL(gi_prepare_map->set_refactored(op_ids.at(idx), + gi_ctx.pwj_rescan_task_infos_.at(gi_ctx.rescan_task_idx_++)))) { + LOG_WARN("reset table scan's ranges failed", K(ret)); + } + } else if (OB_FAIL(gi_prepare_map->set_refactored(op_ids.at(idx), gi_task_infos.at(idx)))) { LOG_WARN("reset table scan's ranges failed", K(ret)); } LOG_DEBUG("produce a gi task(PWJ)", K(op_ids.at(idx)), K(gi_task_infos.at(idx))); diff --git a/src/sql/engine/px/ob_granule_iterator.h b/src/sql/engine/px/ob_granule_iterator.h index b94724a8b..cb97c65f4 100644 --- a/src/sql/engine/px/ob_granule_iterator.h +++ b/src/sql/engine/px/ob_granule_iterator.h @@ -119,7 +119,8 @@ public: all_task_fetched_(false), is_rescan_(false), rescan_taskset_(nullptr), - rescan_task_idx_(0) + rescan_task_idx_(0), + pwj_rescan_task_infos_() {} virtual ~ObGranuleIteratorCtx() { @@ -146,6 +147,7 @@ public: const ObGITaskSet* rescan_taskset_ = NULL; common::ObSEArray rescan_tasks_; int64_t rescan_task_idx_; + common::ObSEArray pwj_rescan_task_infos_; }; public: diff --git a/src/sql/engine/px/ob_granule_iterator_op.cpp b/src/sql/engine/px/ob_granule_iterator_op.cpp index 00d60cbc7..d73d6e0d3 100644 --- a/src/sql/engine/px/ob_granule_iterator_op.cpp +++ b/src/sql/engine/px/ob_granule_iterator_op.cpp @@ -220,7 +220,8 @@ ObGranuleIteratorOp::ObGranuleIteratorOp(ObExecContext& exec_ctx, const ObOpSpec all_task_fetched_(false), is_rescan_(false), rescan_taskset_(nullptr), - rescan_task_idx_(0) + rescan_task_idx_(0), + pwj_rescan_task_infos_() {} void ObGranuleIteratorOp::destroy() @@ -228,6 +229,7 @@ void ObGranuleIteratorOp::destroy() ranges_.reset(); pkeys_.reset(); rescan_tasks_.reset(); + pwj_rescan_task_infos_.reset(); } int ObGranuleIteratorOp::parameters_init() @@ -360,8 +362,15 @@ int ObGranuleIteratorOp::rescan() state_ = GI_GET_NEXT_GRANULE_TASK; } } else { - ret = OB_NOT_SUPPORTED; - LOG_WARN("the partition wise join GI rescan not supported", K(ret)); + // 在partition_wise_join的情况, 按woker第一次完整执行所抢占的任务执行. + // 在执行过程中缓存住了自己的任务队列. + if (GI_UNINITIALIZED == state_ || GI_PREPARED == state_) { + /*do nothing*/ + } else { + is_rescan_ = true; + rescan_task_idx_ = 0; + state_ = GI_GET_NEXT_GRANULE_TASK; + } } return ret; } @@ -512,12 +521,24 @@ int ObGranuleIteratorOp::do_get_next_granule_task(bool prepare /* = false */) } } } - if (OB_FAIL(fetch_full_pw_tasks(gi_task_infos, op_ids))) { + if (OB_FAIL(ret)) { + } else if (is_rescan_) { + if (rescan_task_idx_ >= pwj_rescan_task_infos_.count()) { + ret = OB_ITER_END; + state_ = GI_END; + } + } else if (OB_FAIL(fetch_full_pw_tasks(gi_task_infos, op_ids))) { if (OB_ITER_END != ret) { LOG_WARN("try fetch task failed", K(ret)); } else { state_ = GI_END; } + } else { + for (int i = 0; i < gi_task_infos.count() && OB_SUCC(ret); ++i) { + if (OB_FAIL(pwj_rescan_task_infos_.push_back(gi_task_infos.at(i)))) { + LOG_WARN("fail to rescan pwj task info", K(ret)); + } + } } ARRAY_FOREACH_X(op_ids, idx, cnt, OB_SUCC(ret)) @@ -527,11 +548,21 @@ int ObGranuleIteratorOp::do_get_next_granule_task(bool prepare /* = false */) LOG_WARN("failed to erase task", K(ret)); } } - if (OB_FAIL(ret)) { - } else if (OB_FAIL(gi_prepare_map->set_refactored(op_ids.at(idx), gi_task_infos.at(idx)))) { - LOG_WARN("reset table scan's ranges failed", K(ret)); + if (OB_SUCC(ret)) { + if (is_rescan_) { + if (rescan_task_idx_ >= pwj_rescan_task_infos_.count()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("rescan task idx is unexpected", K(rescan_task_idx_), + K(pwj_rescan_task_infos_.count()), K(cnt)); + } else if (OB_FAIL(gi_prepare_map->set_refactored(op_ids.at(idx), + pwj_rescan_task_infos_.at(rescan_task_idx_++)))) { + LOG_WARN("reset table scan's ranges failed", K(ret)); + } + } else if (OB_FAIL(gi_prepare_map->set_refactored(op_ids.at(idx), gi_task_infos.at(idx)))) { + LOG_WARN("reset table scan's ranges failed", K(ret)); + } + LOG_DEBUG("produce a gi task(PWJ)", K(op_ids.at(idx)), K(gi_task_infos.at(idx))); } - LOG_DEBUG("produce a gi task(PWJ)", K(op_ids.at(idx)), K(gi_task_infos.at(idx))); } if (OB_SUCC(ret)) { diff --git a/src/sql/engine/px/ob_granule_iterator_op.h b/src/sql/engine/px/ob_granule_iterator_op.h index d4bf26d2b..4fdb79766 100644 --- a/src/sql/engine/px/ob_granule_iterator_op.h +++ b/src/sql/engine/px/ob_granule_iterator_op.h @@ -232,6 +232,9 @@ private: const ObGITaskSet* rescan_taskset_ = NULL; common::ObSEArray rescan_tasks_; int64_t rescan_task_idx_; + // full pwj场景下, 在执行过程中缓存住了自己的任务队列. + // 供GI rescan使用 + common::ObSEArray pwj_rescan_task_infos_; }; } // end namespace sql diff --git a/src/sql/engine/px/ob_px_coord.cpp b/src/sql/engine/px/ob_px_coord.cpp index e863d4b0d..b5a9b022e 100644 --- a/src/sql/engine/px/ob_px_coord.cpp +++ b/src/sql/engine/px/ob_px_coord.cpp @@ -710,6 +710,8 @@ int ObPxCoord::wait_all_running_dfos_exit(ObExecContext& ctx) const loop.ignore_interrupt(); ObPxControlChannelProc control_channels; + int64_t times_offset = 0; + int64_t last_timestamp = 0; bool wait_msg = true; while (OB_SUCC(ret) && wait_msg) { ObDtlChannelLoop& loop = px_ctx.msg_loop_; @@ -718,7 +720,7 @@ int ObPxCoord::wait_all_running_dfos_exit(ObExecContext& ctx) const /** * start to get next msg. */ - if (OB_FAIL(check_all_sqc(active_dfos, all_dfo_terminate))) { + if (OB_FAIL(check_all_sqc(active_dfos, times_offset++, all_dfo_terminate, last_timestamp))) { LOG_WARN("fail to check sqc"); } else if (all_dfo_terminate) { wait_msg = false; @@ -762,7 +764,10 @@ int ObPxCoord::wait_all_running_dfos_exit(ObExecContext& ctx) const return ret; } -int ObPxCoord::check_all_sqc(ObIArray& active_dfos, bool& all_dfo_terminate) const +int ObPxCoord::check_all_sqc(ObIArray &active_dfos, + int64_t times_offset, + bool &all_dfo_terminate, + int64_t &last_timestamp) const { int ret = OB_SUCCESS; all_dfo_terminate = true; @@ -781,6 +786,15 @@ int ObPxCoord::check_all_sqc(ObIArray& active_dfos, bool& all_dfo_termin LOG_WARN("NULL unexpected sqc", K(ret)); } else if (sqc->need_report()) { LOG_DEBUG("wait for sqc", K(sqc)); + int64_t cur_timestamp = ObTimeUtility::current_time(); + // > 1s, increase gradually + // In order to get the dfo to propose as soon as possible and + // In order to avoid the interruption that is not received, + // So the interruption needs to be sent repeatedly + if (cur_timestamp - last_timestamp > (1000000 + min(times_offset, 10) * 1000000)) { + last_timestamp = cur_timestamp; + ObInterruptUtil::broadcast_dfo(active_dfos.at(i), OB_GOT_SIGNAL_ABORTING); + } all_dfo_terminate = false; break; } diff --git a/src/sql/engine/px/ob_px_coord.h b/src/sql/engine/px/ob_px_coord.h index c066c71bd..82061b57e 100644 --- a/src/sql/engine/px/ob_px_coord.h +++ b/src/sql/engine/px/ob_px_coord.h @@ -162,11 +162,14 @@ protected: int register_first_buffer_cache(ObExecContext& ctx, ObPxCoordCtx& px_ctx, ObDfo* root_dfo) const; void unregister_first_buffer_cache(ObExecContext& ctx, ObPxCoordCtx& px_ctx) const; - int check_all_sqc(common::ObIArray& active_dfos, bool& all_dfo_terminate) const; - int calc_allocated_worker_count( int64_t px_expected, int64_t query_expected, int64_t query_allocated, int64_t& allocated_worker_count) const; + int check_all_sqc(common::ObIArray &active_dfos, + int64_t time_offset, + bool &all_dfo_terminate, + int64_t &cur_timestamp) const; + int register_interrupt(ObPxCoordCtx* px_ctx) const; void clear_interrupt(ObPxCoordCtx* px_ctx) const; diff --git a/src/sql/engine/px/ob_px_coord_op.cpp b/src/sql/engine/px/ob_px_coord_op.cpp index 6c88185e0..969927bb2 100644 --- a/src/sql/engine/px/ob_px_coord_op.cpp +++ b/src/sql/engine/px/ob_px_coord_op.cpp @@ -649,6 +649,8 @@ int ObPxCoordOp::wait_all_running_dfos_exit() loop.ignore_interrupt(); ObPxControlChannelProc control_channels; + int64_t times_offset = 0; + int64_t last_timestamp = 0; bool wait_msg = true; while (OB_SUCC(ret) && wait_msg) { ObDtlChannelLoop& loop = msg_loop_; @@ -657,7 +659,7 @@ int ObPxCoordOp::wait_all_running_dfos_exit() /** * start to get next msg. */ - if (OB_FAIL(check_all_sqc(active_dfos, all_dfo_terminate))) { + if (OB_FAIL(check_all_sqc(active_dfos, times_offset++, all_dfo_terminate, last_timestamp))) { LOG_WARN("fail to check sqc"); } else if (all_dfo_terminate) { wait_msg = false; @@ -701,7 +703,10 @@ int ObPxCoordOp::wait_all_running_dfos_exit() return ret; } -int ObPxCoordOp::check_all_sqc(ObIArray& active_dfos, bool& all_dfo_terminate) +int ObPxCoordOp::check_all_sqc(ObIArray &active_dfos, + int64_t times_offset, + bool &all_dfo_terminate, + int64_t &last_timestamp) { int ret = OB_SUCCESS; all_dfo_terminate = true; @@ -720,6 +725,15 @@ int ObPxCoordOp::check_all_sqc(ObIArray& active_dfos, bool& all_dfo_term LOG_WARN("NULL unexpected sqc", K(ret)); } else if (sqc->need_report()) { LOG_DEBUG("wait for sqc", K(sqc)); + int64_t cur_timestamp = ObTimeUtility::current_time(); + // > 1s, increase gradually + // In order to get the dfo to propose as soon as possible and + // In order to avoid the interruption that is not received, + // So the interruption needs to be sent repeatedly + if (cur_timestamp - last_timestamp > (1000000 + min(times_offset, 10) * 1000000)) { + last_timestamp = cur_timestamp; + ObInterruptUtil::broadcast_dfo(active_dfos.at(i), OB_GOT_SIGNAL_ABORTING); + } all_dfo_terminate = false; break; } diff --git a/src/sql/engine/px/ob_px_coord_op.h b/src/sql/engine/px/ob_px_coord_op.h index afefacb48..59f498aa3 100644 --- a/src/sql/engine/px/ob_px_coord_op.h +++ b/src/sql/engine/px/ob_px_coord_op.h @@ -119,7 +119,10 @@ protected: int register_first_buffer_cache(ObDfo* root_dfo); void unregister_first_buffer_cache(); - int check_all_sqc(common::ObIArray& active_dfos, bool& all_dfo_terminate); + int check_all_sqc(common::ObIArray &active_dfos, + int64_t time_offset, + bool &all_dfo_terminate, + int64_t &cur_timestamp); int calc_allocated_worker_count( int64_t px_expected, int64_t query_expected, int64_t query_allocated, int64_t& allocated_worker_count); diff --git a/src/sql/engine/px/ob_px_rpc_processor.cpp b/src/sql/engine/px/ob_px_rpc_processor.cpp index a1c4e45d6..792155fb1 100644 --- a/src/sql/engine/px/ob_px_rpc_processor.cpp +++ b/src/sql/engine/px/ob_px_rpc_processor.cpp @@ -167,8 +167,8 @@ int ObInitTaskP::after_process() return OB_NOT_SUPPORTED; } -void ObFastInitSqcReportQCMessageCall::operator()( - hash::HashMapPair& entry) +void ObFastInitSqcReportQCMessageCall::operator()(hash::HashMapPair &entry) { UNUSED(entry); if (OB_NOT_NULL(sqc_)) { @@ -299,7 +299,7 @@ int ObFastInitSqcCB::deal_with_rpc_timeout_err_safely() int ret = OB_SUCCESS; ObDealWithRpcTimeoutCall call(addr_, retry_info_, timeout_ts_, trace_id_); call.ret_ = OB_TIMEOUT; - ObGlobalInterruptManager* manager = ObGlobalInterruptManager::getInstance(); + ObGlobalInterruptManager *manager = ObGlobalInterruptManager::getInstance(); if (OB_NOT_NULL(manager)) { if (OB_FAIL(manager->get_map().atomic_refactored(interrupt_id_, call))) { LOG_WARN("fail to deal with rpc timeout call", K(interrupt_id_)); diff --git a/src/sql/engine/px/ob_px_rpc_processor.h b/src/sql/engine/px/ob_px_rpc_processor.h index 59990e227..1612c227b 100644 --- a/src/sql/engine/px/ob_px_rpc_processor.h +++ b/src/sql/engine/px/ob_px_rpc_processor.h @@ -101,34 +101,43 @@ public: {} ~ObFastInitSqcReportQCMessageCall() = default; void operator()(hash::HashMapPair& entry); - private: ObPxSqcMeta* sqc_; }; -class ObDealWithRpcTimeoutCall { +class ObDealWithRpcTimeoutCall +{ public: - ObDealWithRpcTimeoutCall( - common::ObAddr addr, ObQueryRetryInfo* retry_info, int64_t timeout_ts, common::ObCurTraceId::TraceId& trace_id) - : addr_(addr), retry_info_(retry_info), timeout_ts_(timeout_ts), trace_id_(trace_id), ret_(common::OB_TIMEOUT) - {} + ObDealWithRpcTimeoutCall(common::ObAddr addr, + ObQueryRetryInfo *retry_info, + int64_t timeout_ts, + common::ObCurTraceId::TraceId &trace_id) : addr_(addr), retry_info_(retry_info), + timeout_ts_(timeout_ts), trace_id_(trace_id), ret_(common::OB_TIMEOUT) {} ~ObDealWithRpcTimeoutCall() = default; - void operator()(hash::HashMapPair& entry); + void operator() (hash::HashMapPair &entry); void deal_with_rpc_timeout_err(); - public: common::ObAddr addr_; - ObQueryRetryInfo* retry_info_; + ObQueryRetryInfo *retry_info_; int64_t timeout_ts_; common::ObCurTraceId::TraceId trace_id_; int ret_; }; -class ObFastInitSqcCB : public obrpc::ObPxRpcProxy::AsyncCB { +class ObFastInitSqcCB + : public obrpc::ObPxRpcProxy::AsyncCB +{ public: - ObFastInitSqcCB(const common::ObAddr& server, const common::ObCurTraceId::TraceId& trace_id, - ObQueryRetryInfo* retry_info, int64_t timeout_ts, ObInterruptibleTaskID tid, ObPxSqcMeta* sqc) - : addr_(server), retry_info_(retry_info), timeout_ts_(timeout_ts), interrupt_id_(tid), sqc_(sqc) + ObFastInitSqcCB(const common::ObAddr &server, + const common::ObCurTraceId::TraceId &trace_id, + ObQueryRetryInfo *retry_info, + int64_t timeout_ts, + ObInterruptibleTaskID tid, + ObPxSqcMeta *sqc) + : addr_(server), retry_info_(retry_info), + timeout_ts_(timeout_ts), interrupt_id_(tid), + sqc_(sqc) { trace_id_.set(trace_id); } @@ -149,10 +158,7 @@ public: } return newcb; } - virtual void set_args(const Request& arg) - { - UNUSED(arg); - } + virtual void set_args(const Request &arg) { UNUSED(arg); } int deal_with_rpc_timeout_err_safely(); void interrupt_qc(int err); diff --git a/src/sql/ob_index_sstable_builder.cpp b/src/sql/ob_index_sstable_builder.cpp index 4caaa80ca..ea71c5859 100644 --- a/src/sql/ob_index_sstable_builder.cpp +++ b/src/sql/ob_index_sstable_builder.cpp @@ -931,7 +931,10 @@ int ObIndexSSTableBuilder::gen_build_macro(ObPhysicalPlan& phy_plan, ObPhyOperat } else { sort->set_column_count(columns.count()); if (OB_FAIL(sort->init_sort_columns(index_table_->get_rowkey_column_num()))) { - LOG_WARN("init sort column failed", K(ret), "rowkey_cnt", index_table_->get_rowkey_column_num()); + LOG_WARN("init sort column failed", + K(ret), "rowkey_cnt", index_table_->get_rowkey_column_num()); + } else if (OB_FAIL(sort->init_op_schema_obj(index_table_->get_rowkey_column_num()))) { + LOG_WARN("fail to init op schema obj", K(ret)); } else { for (int64_t i = 0; OB_SUCC(ret) && i < index_table_->get_rowkey_column_num(); i++) { const bool ascending = true; diff --git a/src/sql/optimizer/ob_log_window_function.cpp b/src/sql/optimizer/ob_log_window_function.cpp index 40fa454aa..1a074f842 100644 --- a/src/sql/optimizer/ob_log_window_function.cpp +++ b/src/sql/optimizer/ob_log_window_function.cpp @@ -514,9 +514,6 @@ int ObLogWindowFunction::get_win_partition_intersect_exprs( bool ObLogWindowFunction::is_block_op() const { bool is_block_op = true; - // 对于window function算子, 在没有partition by以及完整窗口情况下, - // 所有数据作为一个窗口, 认为是block算子 - // 在其他情况下, 认为是非block算子 ObWinFunRawExpr* win_expr = NULL; for (int64_t i = 0; i < win_exprs_.count(); ++i) { if (OB_ISNULL(win_expr = win_exprs_.at(i))) { diff --git a/src/sql/resolver/dml/ob_dml_resolver.cpp b/src/sql/resolver/dml/ob_dml_resolver.cpp index f9958206c..e0fd73dc0 100644 --- a/src/sql/resolver/dml/ob_dml_resolver.cpp +++ b/src/sql/resolver/dml/ob_dml_resolver.cpp @@ -2006,6 +2006,9 @@ int ObDMLResolver::resolve_base_or_alias_table_item_normal(uint64_t tenant_id, c K(params_.contain_dml_), K(is_inner_table(tschema->get_table_id())), K(session_info_->is_inner())); + } else if (!GCONF._ob_enable_px_for_inner_sql && + ObSQLSessionInfo::USER_SESSION != session_info_->get_session_type()) { + stmt->get_query_ctx()->forbid_use_px_ = true; } else { // use px, including PL, inner sql, inner connection sql triggered by CMD }