diff --git a/deps/oblib/src/lib/utility/ob_tracepoint.h b/deps/oblib/src/lib/utility/ob_tracepoint.h index a225ca570e..ab52660ae3 100644 --- a/deps/oblib/src/lib/utility/ob_tracepoint.h +++ b/deps/oblib/src/lib/utility/ob_tracepoint.h @@ -603,6 +603,7 @@ class EventTable // 600-700 For PX use EN_PX_SQC_EXECUTE_FAILED = 600, EN_PX_SQC_INIT_FAILED = 601, + EN_PX_SQC_INIT_PROCESS_FAILED = 602, // please add new trace point after 700 or before 600 // Compaction Related 700-750 diff --git a/src/share/interrupt/ob_global_interrupt_call.cpp b/src/share/interrupt/ob_global_interrupt_call.cpp index c6e74102c1..a8548a3053 100644 --- a/src/share/interrupt/ob_global_interrupt_call.cpp +++ b/src/share/interrupt/ob_global_interrupt_call.cpp @@ -65,7 +65,10 @@ void ObInterruptChecker::clear_status() interrupted_ = false; array_pos_ = 0; ref_count_ = 0; - MEMSET(interrupt_code_array_, 0, T_ARRAY_SIZE); + for (int idx = 0; idx < T_ARRAY_SIZE; ++idx) + { + interrupt_code_array_[idx].reset(); + } } void ObInterruptChecker::clear_interrupt_status() diff --git a/src/share/interrupt/ob_interrupt_rpc_proxy.h b/src/share/interrupt/ob_interrupt_rpc_proxy.h index a5686c8377..7be93635b9 100644 --- a/src/share/interrupt/ob_interrupt_rpc_proxy.h +++ b/src/share/interrupt/ob_interrupt_rpc_proxy.h @@ -94,6 +94,11 @@ public: : code_(code), info_(from_tid, from_svr_addr, extra_msg) { } + void reset() + { + code_ = 0; + info_.reset(); + } int code_; // Interrupt number obrpc::ObInterruptStackInfo info_; TO_STRING_KV(K_(code), K_(info)); diff --git a/src/sql/engine/px/ob_dfo.h b/src/sql/engine/px/ob_dfo.h index 3de9f5f773..8c2dc6e5b6 100644 --- a/src/sql/engine/px/ob_dfo.h +++ b/src/sql/engine/px/ob_dfo.h @@ -385,7 +385,6 @@ public: px_bf_id_(OB_INVALID_ID), use_filter_ch_map_(), total_task_cnt_(0), - ignore_vtable_error_(false), pkey_table_loc_id_(0), tsc_op_cnt_(0) { @@ -551,8 +550,6 @@ public: return (dfo_id >= 0 && dfo_id <= MAX_DFO_ID) || (dfo_id == MAX_DFO_ID); } - void set_ignore_vtable_error(bool flag) { ignore_vtable_error_ = flag; } - bool is_ignore_vtable_error() { return ignore_vtable_error_; } void set_pkey_table_loc_id(int64_t id) { pkey_table_loc_id_ = id; } int64_t get_pkey_table_loc_id() { return pkey_table_loc_id_; }; void inc_tsc_op_cnt() { tsc_op_cnt_++; } @@ -647,7 +644,6 @@ private: int64_t px_bf_id_; //记录px_bloom_filter_id ObPxBloomFilterChInfo use_filter_ch_map_; // use and create channel info is same int64_t total_task_cnt_; // the task total count of dfo start worker - bool ignore_vtable_error_; int64_t pkey_table_loc_id_; // record pkey table loc id for child dfo int64_t tsc_op_cnt_; }; diff --git a/src/sql/engine/px/ob_dfo_mgr.cpp b/src/sql/engine/px/ob_dfo_mgr.cpp index a6cf8e1e2a..33207f4351 100644 --- a/src/sql/engine/px/ob_dfo_mgr.cpp +++ b/src/sql/engine/px/ob_dfo_mgr.cpp @@ -22,6 +22,7 @@ #include "sql/engine/join/ob_join_filter_op.h" #include "sql/engine/px/exchange/ob_px_repart_transmit_op.h" #include "sql/optimizer/ob_px_resource_analyzer.h" +#include "sql/engine/px/ob_px_scheduler.h" using namespace oceanbase::common; using namespace oceanbase::sql; @@ -317,7 +318,8 @@ int ObDfoMgr::init(ObExecContext &exec_ctx, const ObOpSpec &root_op_spec, int64_t expected_worker_count, int64_t admited_worker_count, - const ObDfoInterruptIdGen &dfo_int_gen) + const ObDfoInterruptIdGen &dfo_int_gen, + ObPxCoordInfo &px_coord_info) { int ret = OB_SUCCESS; root_dfo_ = NULL; @@ -325,7 +327,7 @@ int ObDfoMgr::init(ObExecContext &exec_ctx, if (inited_) { ret = OB_INIT_TWICE; LOG_WARN("dfo mgr init twice", K(ret)); - } else if (OB_FAIL(do_split(exec_ctx, allocator_, &root_op_spec, root_dfo_, dfo_int_gen))) { + } else if (OB_FAIL(do_split(exec_ctx, allocator_, &root_op_spec, root_dfo_, dfo_int_gen, px_coord_info))) { LOG_WARN("fail split ops into dfo", K(ret)); } else if (OB_ISNULL(root_dfo_)) { ret = OB_ERR_UNEXPECTED; @@ -350,7 +352,8 @@ int ObDfoMgr::do_split(ObExecContext &exec_ctx, ObIAllocator &allocator, const ObOpSpec *phy_op, ObDfo *&parent_dfo, - const ObDfoInterruptIdGen &dfo_int_gen) const + const ObDfoInterruptIdGen &dfo_int_gen, + ObPxCoordInfo &px_coord_info) const { int ret = OB_SUCCESS; bool top_px = (nullptr == parent_dfo); @@ -371,6 +374,14 @@ int ObDfoMgr::do_split(ObExecContext &exec_ctx, } else if (phy_op->is_table_scan() && NULL != parent_dfo) { parent_dfo->set_scan(true); parent_dfo->inc_tsc_op_cnt(); + auto tsc_op = static_cast(phy_op); + if (TableAccessType::HAS_USER_TABLE == px_coord_info.table_access_type_){ + // nop + } else if (!is_virtual_table(tsc_op->get_ref_table_id())) { + px_coord_info.table_access_type_ = TableAccessType::HAS_USER_TABLE; + } else { + px_coord_info.table_access_type_ = TableAccessType::PURE_VIRTUAL_TABLE; + } } else if (phy_op->is_dml_operator() && NULL != parent_dfo) { // 当前op是一个dml算子,需要设置dfo的属性 parent_dfo->set_dml_op(true); @@ -494,7 +505,7 @@ int ObDfoMgr::do_split(ObExecContext &exec_ctx, for (int32_t i = 0; OB_SUCC(ret) && i < phy_op->get_child_cnt(); ++i) { ObDfo *tmp_parent_dfo = parent_dfo; if (OB_FAIL(do_split(exec_ctx, allocator, phy_op->get_child(i), - tmp_parent_dfo, dfo_int_gen))) { + tmp_parent_dfo, dfo_int_gen, px_coord_info))) { LOG_WARN("fail split op into dfo", K(ret)); } } diff --git a/src/sql/engine/px/ob_dfo_mgr.h b/src/sql/engine/px/ob_dfo_mgr.h index 22aefac0ca..80e6dc4bce 100644 --- a/src/sql/engine/px/ob_dfo_mgr.h +++ b/src/sql/engine/px/ob_dfo_mgr.h @@ -21,7 +21,7 @@ namespace oceanbase namespace sql { - +class ObPxCoordInfo; class ObDfoMgr { public: @@ -36,7 +36,8 @@ public: const ObOpSpec &root_op_spec, int64_t expected_worker_count, int64_t admited_worker_count, - const ObDfoInterruptIdGen &dfo_int_gen); + const ObDfoInterruptIdGen &dfo_int_gen, + ObPxCoordInfo &px_coord_info); ObDfo *get_root_dfo() { return root_dfo_; } virtual int get_ready_dfo(ObDfo *&dfo) const; // 仅用于单层dfo调度 @@ -60,7 +61,8 @@ private: common::ObIAllocator &allocator, const ObOpSpec *phy_op, ObDfo *&parent_dfo, - const ObDfoInterruptIdGen &dfo_id_gen) const; + const ObDfoInterruptIdGen &dfo_id_gen, + ObPxCoordInfo &px_coord_info) const; int create_dfo(common::ObIAllocator &allocator, const ObOpSpec *dfo_root_op, ObDfo *&dfo) const; diff --git a/src/sql/engine/px/ob_dfo_scheduler.cpp b/src/sql/engine/px/ob_dfo_scheduler.cpp index 2d68847463..7e14f0a933 100644 --- a/src/sql/engine/px/ob_dfo_scheduler.cpp +++ b/src/sql/engine/px/ob_dfo_scheduler.cpp @@ -483,18 +483,7 @@ int ObSerialDfoScheduler::dispatch_sqcs(ObExecContext &exec_ctx, LOG_WARN("no memory", K(ret)); } } - bool ignore_vtable_error = dfo.is_ignore_vtable_error(); - if (OB_SUCC(ret)) { - ObDfo *child_dfo = nullptr; - for (int i = 0; i < dfo.get_child_count() && OB_SUCC(ret); ++i) { - if (OB_FAIL(dfo.get_child_dfo(i, child_dfo))) { - LOG_WARN("fail to get child dfo", K(ret)); - } else if (!child_dfo->is_ignore_vtable_error()) { - ignore_vtable_error = false; - break; - } - } - } + bool ignore_vtable_error = coord_info_.should_ignore_vtable_error(); int64_t cluster_id = GCONF.cluster_id; ARRAY_FOREACH_X(sqcs, idx, cnt, OB_SUCC(ret)) { ObPxSqcMeta &sqc = *sqcs.at(idx); @@ -532,7 +521,7 @@ int ObSerialDfoScheduler::dispatch_sqcs(ObExecContext &exec_ctx, if (dfo.has_child_dfo()) { sqc.set_recieve_use_interm_result(true); } - if (ignore_vtable_error && dfo.get_child_count() > 0) { + if (ignore_vtable_error) { sqc.set_ignore_vtable_error(true); } if (coord_info_.enable_px_batch_rescan()) { @@ -712,30 +701,22 @@ int ObParallelDfoScheduler::do_schedule_dfo(ObExecContext &exec_ctx, ObDfo &dfo) } if (OB_SUCC(ret)) { - //if (dfo.is_prealloc_transmit_channel() || dfo.is_prealloc_receive_channel()) { - // 下面的逻辑处理简单 DFO 调用的情况 - // - 目的: 大部分分布式查询的并发度为1,并且只有一个 DFO - // 这种情况下无需为 task 建立 worker 线程, - // 直接在 SQC 的工作线程中完成所有执行即可 - //ret = fast_dispatch_sqc(exec_ctx, dfo, sqcs); - //} else { - // 下面的逻辑处理握手阶段超时的情况 - // - 目的: 为了防止死锁 - // - 方式: 一旦超时,则终止掉全部 sqc,等待一段事件后,整个 dfo 重试 - // - 问题: init sqc 是异步的,其中部分 sqc 已经汇报了获取 task 的信息 - // 突然被终止,QC 方面的状态需要重新维护。但是存在下面的问题: - // 场景举例: - // 1. sqc1 成功,sqc2 超时 - // 2. dfo abort, clean sqc state - // 3. sqc1 汇报已经分配好 task (old news) - // 4. sqc1, sqc2 收到中断信息 - // 5. sqc1 重新调度 - // 6. sqc2 汇报已经分配好 task (latest news) - // 7. qc 认为 dfo 都已全部调度成功 (实际上没有) - // 8. sqc1 汇报分配好的 task (too late msg) - // - ret = dispatch_sqc(exec_ctx, dfo, sqcs); - //} + // 下面的逻辑处理握手阶段超时的情况 + // - 目的: 为了防止死锁 + // - 方式: 一旦超时,则终止掉全部 sqc,等待一段事件后,整个 dfo 重试 + // - 问题: init sqc 是异步的,其中部分 sqc 已经汇报了获取 task 的信息 + // 突然被终止,QC 方面的状态需要重新维护。但是存在下面的问题: + // 场景举例: + // 1. sqc1 成功,sqc2 超时 + // 2. dfo abort, clean sqc state + // 3. sqc1 汇报已经分配好 task (old news) + // 4. sqc1, sqc2 收到中断信息 + // 5. sqc1 重新调度 + // 6. sqc2 汇报已经分配好 task (latest news) + // 7. qc 认为 dfo 都已全部调度成功 (实际上没有) + // 8. sqc1 汇报分配好的 task (too late msg) + // + ret = dispatch_sqc(exec_ctx, dfo, sqcs); } return ret; } @@ -1211,74 +1192,6 @@ int ObParallelDfoScheduler::deal_with_init_sqc_error(ObExecContext &exec_ctx, return ret; } -// 将lightweight SQC 分发到各个 server,无需在远端申请 px 线程,直接 -// 由工作线程执行 -int ObParallelDfoScheduler::fast_dispatch_sqc(ObExecContext &exec_ctx, - ObDfo &dfo, - ObArray &sqcs) const -{ - int ret = OB_SUCCESS; - int64_t timeout_us = 0; - const ObPhysicalPlan *phy_plan = NULL; - ObPhysicalPlanCtx *phy_plan_ctx = NULL; - ObSQLSessionInfo *session = NULL; - - if (OB_UNLIKELY(NULL == (phy_plan = dfo.get_phy_plan()))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("NULL plan ptr unexpected", K(ret)); - } else if (OB_ISNULL(phy_plan_ctx = GET_PHY_PLAN_CTX(exec_ctx))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("phy plan ctx NULL", K(ret)); - } else if (OB_ISNULL(session = exec_ctx.get_my_session())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("session is NULL", K(ret)); - } - - - // 分发 sqc 可能需要重试,主要针对两种情况: - // 1. 分发 sqc 的 rpc 超时 - // 2. 分发 sqc 的 rpc 成功,但 sqc 上无法分配任何 worker 线程 - // 发生上述情况后,整个 dfo 需要重置状态,稍等片刻然后重试 - int64_t cluster_id = GCONF.cluster_id; - ARRAY_FOREACH_X(sqcs, idx, cnt, OB_SUCC(ret)) { - ObPxSqcMeta &sqc = *sqcs.at(idx); - const ObAddr &addr = sqc.get_exec_addr(); - auto proxy = coord_info_.rpc_proxy_.to(addr); - if (OB_UNLIKELY(share::ObServerBlacklist::get_instance().is_in_blacklist( - share::ObCascadMember(addr, cluster_id), true /* add_server */, - session->get_process_query_time()))) { - if (!sqc.is_ignore_vtable_error()) { - ret = OB_RPC_CONNECT_ERROR; - LOG_WARN("peer no in communication, maybe crashed", K(ret), K(sqc), K(cluster_id), - K(session->get_process_query_time())); - } else { - LOG_WARN("ignore the black server list with virtual table", K(ret)); - } - } - if (OB_FAIL(ret)) { - } else { - SMART_VAR(ObPxRpcInitSqcArgs, args) { - ObPxRpcInitSqcResponse resp; - timeout_us = phy_plan_ctx->get_timeout_timestamp() - ObTimeUtility::current_time(); - args.set_serialize_param(exec_ctx, const_cast(*dfo.get_root_op_spec()), *phy_plan); - if (timeout_us <= 0) { - ret = OB_TIMEOUT; - } else if (OB_FAIL(args.sqc_.assign(sqc))) { - LOG_WARN("fail assign sqc", K(ret)); - } else if (OB_FAIL(proxy - .by(THIS_WORKER.get_rpc_tenant()?: session->get_effective_tenant_id()) - .timeout(timeout_us) - .init_sqc(args, resp))) { - LOG_WARN("fail dispatch dfo rpc", K(sqc), K(ret)); - } - LOG_TRACE("Sent lw dfo to addr", K(dfo), K(addr), K(args), K(resp)); - } - } - } - return ret; -} - - /* 当发送 sqc 超时时,可能是遇到了死锁。 * 应对策略是:终止 dfo 下所有 sqc,清空 qc-sqc 通道, * 等待一段时间,然后重新调度整个 dfo diff --git a/src/sql/engine/px/ob_dfo_scheduler.h b/src/sql/engine/px/ob_dfo_scheduler.h index 1a19bc860d..99d055ce37 100644 --- a/src/sql/engine/px/ob_dfo_scheduler.h +++ b/src/sql/engine/px/ob_dfo_scheduler.h @@ -112,9 +112,6 @@ private: int deal_with_init_sqc_error(ObExecContext &exec_ctx, const ObPxSqcMeta &sqc, int rc) const; - int fast_dispatch_sqc(ObExecContext &exec_ctx, - ObDfo &dfo, - ObArray &sqcs) const; int schedule_pair(ObExecContext &exec_ctx, ObDfo &child, ObDfo &parent) const; diff --git a/src/sql/engine/px/ob_px_coord_op.cpp b/src/sql/engine/px/ob_px_coord_op.cpp index 52fba63140..e350a401b9 100644 --- a/src/sql/engine/px/ob_px_coord_op.cpp +++ b/src/sql/engine/px/ob_px_coord_op.cpp @@ -439,7 +439,8 @@ int ObPxCoordOp::init_dfo_mgr(const ObDfoInterruptIdGen &dfo_id_gen, ObDfoMgr &d get_spec(), px_expected, px_admited_worker_count, - dfo_id_gen))) { + dfo_id_gen, + coord_info_))) { LOG_WARN("fail init dfo mgr", K(px_expected), K(query_expected), diff --git a/src/sql/engine/px/ob_px_rpc_processor.cpp b/src/sql/engine/px/ob_px_rpc_processor.cpp index 3138840677..a8e93966b5 100644 --- a/src/sql/engine/px/ob_px_rpc_processor.cpp +++ b/src/sql/engine/px/ob_px_rpc_processor.cpp @@ -93,6 +93,13 @@ int ObInitSqcP::process() } else { /*do nothing*/ } + +#ifdef ERRSIM + if (OB_FAIL(OB_E(EventTable::EN_PX_SQC_INIT_PROCESS_FAILED) OB_SUCCESS)) { + LOG_WARN("match sqc execute errism", K(ret)); + } +#endif + if (OB_FAIL(ret) && OB_NOT_NULL(sqc_handler)) { if (unregister_interrupt_) { ObPxRpcInitSqcArgs &arg = sqc_handler->get_sqc_init_arg(); @@ -265,7 +272,8 @@ void ObFastInitSqcReportQCMessageCall::operator()(hash::HashMapPairis_ignore_vtable_error() && err_ != OB_SUCCESS && err_ != OB_TIMEOUT) { + if (sqc_->is_ignore_vtable_error() && err_ != OB_SUCCESS + && ObVirtualTableErrorWhitelist::should_ignore_vtable_error(err_)) { // 当该SQC是虚拟表查询时, 调度RPC失败时需要忽略错误结果. // 并mock一个sqc finsh msg发送给正在轮询消息的PX算子 // 此操作已确认是线程安全的. diff --git a/src/sql/engine/px/ob_px_scheduler.cpp b/src/sql/engine/px/ob_px_scheduler.cpp index 27987e575b..c44a9efc25 100644 --- a/src/sql/engine/px/ob_px_scheduler.cpp +++ b/src/sql/engine/px/ob_px_scheduler.cpp @@ -300,7 +300,8 @@ int ObPxMsgProc::on_sqc_finish_msg(ObExecContext &ctx, } else { /*do nothing.*/ } if (OB_SUCC(ret)) { sqc->set_thread_finish(true); - if (sqc->is_ignore_vtable_error() && OB_SUCCESS != pkt.rc_) { + if (sqc->is_ignore_vtable_error() && OB_SUCCESS != pkt.rc_ + && ObVirtualTableErrorWhitelist::should_ignore_vtable_error(pkt.rc_)) { // 如果收到一个sqc finish消息, 如果该sqc涉及虚拟表, 需要忽略所有错误码 // 如果该dfo是root_dfo的child_dfo, 为了让px走出数据channel的消息循环 // 需要mock一个eof dtl buffer本地发送至px(实际未经过rpc, attach即可) diff --git a/src/sql/engine/px/ob_px_scheduler.h b/src/sql/engine/px/ob_px_scheduler.h index a3e425eccc..1edbaaae44 100644 --- a/src/sql/engine/px/ob_px_scheduler.h +++ b/src/sql/engine/px/ob_px_scheduler.h @@ -49,6 +49,11 @@ public: }; +enum class TableAccessType { + NO_TABLE, + PURE_VIRTUAL_TABLE, + HAS_USER_TABLE +}; // 这些信息是调度时候需要用的变量,暂时统一叫做CoordInfo class ObPxCoordInfo { @@ -65,7 +70,8 @@ public: interrupt_id_(interrupt_id), coord_(coord), batch_rescan_ctl_(NULL), - pruning_table_location_(NULL) + pruning_table_location_(NULL), + table_access_type_(TableAccessType::NO_TABLE) {} virtual ~ObPxCoordInfo() {} virtual void destroy() @@ -89,6 +95,11 @@ public: { return NULL == batch_rescan_ctl_ ? 0 : batch_rescan_ctl_->cur_idx_; } + // if there is no physical op visits user table and at least one physical op visits virtual table, ignore error + OB_INLINE bool should_ignore_vtable_error() + { + return TableAccessType::PURE_VIRTUAL_TABLE == table_access_type_; + } public: ObDfoMgr dfo_mgr_; ObPieceMsgCtxMgr piece_msg_ctx_mgr_; @@ -100,6 +111,7 @@ public: ObPxCoordOp &coord_; ObBatchRescanCtl *batch_rescan_ctl_; const common::ObIArray *pruning_table_location_; + TableAccessType table_access_type_; }; class ObDfoSchedulerBasic; diff --git a/src/sql/engine/px/ob_px_task_process.cpp b/src/sql/engine/px/ob_px_task_process.cpp index ad476091ee..2d8b28aef1 100644 --- a/src/sql/engine/px/ob_px_task_process.cpp +++ b/src/sql/engine/px/ob_px_task_process.cpp @@ -463,8 +463,8 @@ int ObPxTaskProcess::do_process() // nop } else if (IS_INTERRUPTED()) { //当前是被QC中断的,不再向QC发送中断,退出即可。 - } else if (ret != OB_TIMEOUT && - arg_.get_sqc_handler()->get_sqc_init_arg().sqc_.is_ignore_vtable_error()) { + } else if (arg_.get_sqc_handler()->get_sqc_init_arg().sqc_.is_ignore_vtable_error() + && ObVirtualTableErrorWhitelist::should_ignore_vtable_error(ret)) { // 忽略虚拟表错误 } else { (void) ObInterruptUtil::interrupt_qc(arg_.task_, ret); diff --git a/src/sql/engine/px/ob_px_util.cpp b/src/sql/engine/px/ob_px_util.cpp index 1290e357c2..7bd73f5ffd 100644 --- a/src/sql/engine/px/ob_px_util.cpp +++ b/src/sql/engine/px/ob_px_util.cpp @@ -58,25 +58,6 @@ int ObPXServerAddrUtil::alloc_by_data_distribution(const ObIArray &scan_ops, - ObDfo &dfo) -{ - int ret = OB_SUCCESS; - bool is_vtable = true; - for (int i = 0; i < scan_ops.count() && OB_SUCC(ret); ++i) { - if (OB_ISNULL(scan_ops.at(i))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("scan ops is null", K(ret)); - } else if (!is_virtual_table(scan_ops.at(i)->get_ref_table_id())) { - is_vtable = false; - break; - } - } - if (OB_SUCC(ret)) { - dfo.set_ignore_vtable_error(is_vtable && scan_ops.count() > 0); - } - return ret; -} int ObPXServerAddrUtil::build_dynamic_partition_table_location(common::ObIArray &scan_ops, const ObIArray *table_locations, ObDfo &dfo) @@ -172,9 +153,6 @@ int ObPXServerAddrUtil::alloc_by_data_distribution_inner( ref_table_id, table_loc)); } else { - if (OB_FAIL(mark_virtual_table_dfo(scan_ops, dfo))) { - LOG_WARN("fail to mark virtual table dfo", K(ret)); - } else // 通过TSC或者DML获得当前的DFO的partition对应的location信息 // 后续利用location信息构建对应的SQC meta if (OB_ISNULL(table_loc = DAS_CTX(ctx).get_table_loc_by_id(table_location_key, ref_table_id))) { @@ -311,7 +289,6 @@ int ObPXServerAddrUtil::build_dfo_sqc(ObExecContext &ctx, sqc.set_fulltree(dfo.is_fulltree()); sqc.set_qc_server_id(dfo.get_qc_server_id()); sqc.set_parent_dfo_id(dfo.get_parent_dfo_id()); - sqc.set_ignore_vtable_error(dfo.is_ignore_vtable_error()); sqc.set_single_tsc_leaf_dfo(dfo.is_single_tsc_leaf_dfo()); for (auto iter = locations.begin(); OB_SUCC(ret) && iter != locations.end(); ++iter) { if (addrs.at(i) == (*iter)->server_) { @@ -3535,3 +3512,26 @@ int ObExtraServerAliveCheck::do_check() const LOG_DEBUG("server alive do check", K(ret), K(qc_addr_), K(cluster_id_), K(dfo_mgr_)); return ret; } + +bool ObVirtualTableErrorWhitelist::should_ignore_vtable_error(int error_code) +{ + bool should_ignore = false; + switch (error_code) { + case OB_ALLOCATE_MEMORY_FAILED: { + should_ignore = true; + break; + } + case OB_RPC_CONNECT_ERROR: { + should_ignore = true; + break; + } + case OB_RPC_SEND_ERROR: { + should_ignore = true; + break; + } + default: { + break; + } + } + return should_ignore; +} diff --git a/src/sql/engine/px/ob_px_util.h b/src/sql/engine/px/ob_px_util.h index 7f9d6568f5..7d2c3869e4 100644 --- a/src/sql/engine/px/ob_px_util.h +++ b/src/sql/engine/px/ob_px_util.h @@ -172,7 +172,6 @@ private: bool asc, ObExecContext &exec_ctx, ObIArray &base_order); static int build_dynamic_partition_table_location(common::ObIArray &scan_ops, const ObIArray *table_locations, ObDfo &dfo); - static int mark_virtual_table_dfo(common::ObIArray &scan_ops, ObDfo &dfo); static int build_dfo_sqc(ObExecContext &ctx, const DASTabletLocList &locations, @@ -543,6 +542,12 @@ private: int64_t query_start_time_; }; +class ObVirtualTableErrorWhitelist +{ +public: + static bool should_ignore_vtable_error(int error_code); +}; + } }