add virtual table ignore error whitelist

This commit is contained in:
obdev
2023-02-24 13:06:22 +00:00
committed by ob-robot
parent 3e39a6476e
commit 8ebdd029e4
15 changed files with 105 additions and 150 deletions

View File

@ -385,7 +385,6 @@ public:
px_bf_id_(OB_INVALID_ID),
use_filter_ch_map_(),
total_task_cnt_(0),
ignore_vtable_error_(false),
pkey_table_loc_id_(0),
tsc_op_cnt_(0)
{
@ -551,8 +550,6 @@ public:
return (dfo_id >= 0 && dfo_id <= MAX_DFO_ID) ||
(dfo_id == MAX_DFO_ID);
}
void set_ignore_vtable_error(bool flag) { ignore_vtable_error_ = flag; }
bool is_ignore_vtable_error() { return ignore_vtable_error_; }
void set_pkey_table_loc_id(int64_t id) { pkey_table_loc_id_ = id; }
int64_t get_pkey_table_loc_id() { return pkey_table_loc_id_; };
void inc_tsc_op_cnt() { tsc_op_cnt_++; }
@ -647,7 +644,6 @@ private:
int64_t px_bf_id_; //记录px_bloom_filter_id
ObPxBloomFilterChInfo use_filter_ch_map_; // use and create channel info is same
int64_t total_task_cnt_; // the task total count of dfo start worker
bool ignore_vtable_error_;
int64_t pkey_table_loc_id_; // record pkey table loc id for child dfo
int64_t tsc_op_cnt_;
};

View File

@ -22,6 +22,7 @@
#include "sql/engine/join/ob_join_filter_op.h"
#include "sql/engine/px/exchange/ob_px_repart_transmit_op.h"
#include "sql/optimizer/ob_px_resource_analyzer.h"
#include "sql/engine/px/ob_px_scheduler.h"
using namespace oceanbase::common;
using namespace oceanbase::sql;
@ -317,7 +318,8 @@ int ObDfoMgr::init(ObExecContext &exec_ctx,
const ObOpSpec &root_op_spec,
int64_t expected_worker_count,
int64_t admited_worker_count,
const ObDfoInterruptIdGen &dfo_int_gen)
const ObDfoInterruptIdGen &dfo_int_gen,
ObPxCoordInfo &px_coord_info)
{
int ret = OB_SUCCESS;
root_dfo_ = NULL;
@ -325,7 +327,7 @@ int ObDfoMgr::init(ObExecContext &exec_ctx,
if (inited_) {
ret = OB_INIT_TWICE;
LOG_WARN("dfo mgr init twice", K(ret));
} else if (OB_FAIL(do_split(exec_ctx, allocator_, &root_op_spec, root_dfo_, dfo_int_gen))) {
} else if (OB_FAIL(do_split(exec_ctx, allocator_, &root_op_spec, root_dfo_, dfo_int_gen, px_coord_info))) {
LOG_WARN("fail split ops into dfo", K(ret));
} else if (OB_ISNULL(root_dfo_)) {
ret = OB_ERR_UNEXPECTED;
@ -350,7 +352,8 @@ int ObDfoMgr::do_split(ObExecContext &exec_ctx,
ObIAllocator &allocator,
const ObOpSpec *phy_op,
ObDfo *&parent_dfo,
const ObDfoInterruptIdGen &dfo_int_gen) const
const ObDfoInterruptIdGen &dfo_int_gen,
ObPxCoordInfo &px_coord_info) const
{
int ret = OB_SUCCESS;
bool top_px = (nullptr == parent_dfo);
@ -371,6 +374,14 @@ int ObDfoMgr::do_split(ObExecContext &exec_ctx,
} else if (phy_op->is_table_scan() && NULL != parent_dfo) {
parent_dfo->set_scan(true);
parent_dfo->inc_tsc_op_cnt();
auto tsc_op = static_cast<const ObTableScanSpec *>(phy_op);
if (TableAccessType::HAS_USER_TABLE == px_coord_info.table_access_type_){
// nop
} else if (!is_virtual_table(tsc_op->get_ref_table_id())) {
px_coord_info.table_access_type_ = TableAccessType::HAS_USER_TABLE;
} else {
px_coord_info.table_access_type_ = TableAccessType::PURE_VIRTUAL_TABLE;
}
} else if (phy_op->is_dml_operator() && NULL != parent_dfo) {
// 当前op是一个dml算子,需要设置dfo的属性
parent_dfo->set_dml_op(true);
@ -494,7 +505,7 @@ int ObDfoMgr::do_split(ObExecContext &exec_ctx,
for (int32_t i = 0; OB_SUCC(ret) && i < phy_op->get_child_cnt(); ++i) {
ObDfo *tmp_parent_dfo = parent_dfo;
if (OB_FAIL(do_split(exec_ctx, allocator, phy_op->get_child(i),
tmp_parent_dfo, dfo_int_gen))) {
tmp_parent_dfo, dfo_int_gen, px_coord_info))) {
LOG_WARN("fail split op into dfo", K(ret));
}
}

View File

@ -21,7 +21,7 @@ namespace oceanbase
namespace sql
{
class ObPxCoordInfo;
class ObDfoMgr
{
public:
@ -36,7 +36,8 @@ public:
const ObOpSpec &root_op_spec,
int64_t expected_worker_count,
int64_t admited_worker_count,
const ObDfoInterruptIdGen &dfo_int_gen);
const ObDfoInterruptIdGen &dfo_int_gen,
ObPxCoordInfo &px_coord_info);
ObDfo *get_root_dfo() { return root_dfo_; }
virtual int get_ready_dfo(ObDfo *&dfo) const; // 仅用于单层dfo调度
@ -60,7 +61,8 @@ private:
common::ObIAllocator &allocator,
const ObOpSpec *phy_op,
ObDfo *&parent_dfo,
const ObDfoInterruptIdGen &dfo_id_gen) const;
const ObDfoInterruptIdGen &dfo_id_gen,
ObPxCoordInfo &px_coord_info) const;
int create_dfo(common::ObIAllocator &allocator,
const ObOpSpec *dfo_root_op,
ObDfo *&dfo) const;

View File

@ -483,18 +483,7 @@ int ObSerialDfoScheduler::dispatch_sqcs(ObExecContext &exec_ctx,
LOG_WARN("no memory", K(ret));
}
}
bool ignore_vtable_error = dfo.is_ignore_vtable_error();
if (OB_SUCC(ret)) {
ObDfo *child_dfo = nullptr;
for (int i = 0; i < dfo.get_child_count() && OB_SUCC(ret); ++i) {
if (OB_FAIL(dfo.get_child_dfo(i, child_dfo))) {
LOG_WARN("fail to get child dfo", K(ret));
} else if (!child_dfo->is_ignore_vtable_error()) {
ignore_vtable_error = false;
break;
}
}
}
bool ignore_vtable_error = coord_info_.should_ignore_vtable_error();
int64_t cluster_id = GCONF.cluster_id;
ARRAY_FOREACH_X(sqcs, idx, cnt, OB_SUCC(ret)) {
ObPxSqcMeta &sqc = *sqcs.at(idx);
@ -532,7 +521,7 @@ int ObSerialDfoScheduler::dispatch_sqcs(ObExecContext &exec_ctx,
if (dfo.has_child_dfo()) {
sqc.set_recieve_use_interm_result(true);
}
if (ignore_vtable_error && dfo.get_child_count() > 0) {
if (ignore_vtable_error) {
sqc.set_ignore_vtable_error(true);
}
if (coord_info_.enable_px_batch_rescan()) {
@ -712,30 +701,22 @@ int ObParallelDfoScheduler::do_schedule_dfo(ObExecContext &exec_ctx, ObDfo &dfo)
}
if (OB_SUCC(ret)) {
//if (dfo.is_prealloc_transmit_channel() || dfo.is_prealloc_receive_channel()) {
// 下面的逻辑处理简单 DFO 调用的情况
// - 目的: 大部分分布式查询的并发度为1,并且只有一个 DFO
// 这种情况下无需为 task 建立 worker 线程,
// 直接在 SQC 的工作线程中完成所有执行即可
//ret = fast_dispatch_sqc(exec_ctx, dfo, sqcs);
//} else {
// 下面的逻辑处理握手阶段超时的情况
// - 目的: 为了防止死锁
// - 方式: 一旦超时,则终止掉全部 sqc,等待一段事件后,整个 dfo 重试
// - 问题: init sqc 是异步的,其中部分 sqc 已经汇报了获取 task 的信息
// 突然被终止,QC 方面的状态需要重新维护。但是存在下面的问题:
// 场景举例:
// 1. sqc1 成功,sqc2 超时
// 2. dfo abort, clean sqc state
// 3. sqc1 汇报已经分配好 task (old news)
// 4. sqc1, sqc2 收到中断信息
// 5. sqc1 重新调度
// 6. sqc2 汇报已经分配好 task (latest news)
// 7. qc 认为 dfo 都已全部调度成功 (实际上没有)
// 8. sqc1 汇报分配好的 task (too late msg)
//
ret = dispatch_sqc(exec_ctx, dfo, sqcs);
//}
// 下面的逻辑处理握手阶段超时的情况
// - 目的: 为了防止死锁
// - 方式: 一旦超时,则终止掉全部 sqc,等待一段事件后,整个 dfo 重试
// - 问题: init sqc 是异步的,其中部分 sqc 已经汇报了获取 task 的信息
// 突然被终止,QC 方面的状态需要重新维护。但是存在下面的问题:
// 场景举例:
// 1. sqc1 成功,sqc2 超时
// 2. dfo abort, clean sqc state
// 3. sqc1 汇报已经分配好 task (old news)
// 4. sqc1, sqc2 收到中断信息
// 5. sqc1 重新调度
// 6. sqc2 汇报已经分配好 task (latest news)
// 7. qc 认为 dfo 都已全部调度成功 (实际上没有)
// 8. sqc1 汇报分配好的 task (too late msg)
//
ret = dispatch_sqc(exec_ctx, dfo, sqcs);
}
return ret;
}
@ -1211,74 +1192,6 @@ int ObParallelDfoScheduler::deal_with_init_sqc_error(ObExecContext &exec_ctx,
return ret;
}
// 将lightweight SQC 分发到各个 server,无需在远端申请 px 线程,直接
// 由工作线程执行
int ObParallelDfoScheduler::fast_dispatch_sqc(ObExecContext &exec_ctx,
ObDfo &dfo,
ObArray<ObPxSqcMeta *> &sqcs) const
{
int ret = OB_SUCCESS;
int64_t timeout_us = 0;
const ObPhysicalPlan *phy_plan = NULL;
ObPhysicalPlanCtx *phy_plan_ctx = NULL;
ObSQLSessionInfo *session = NULL;
if (OB_UNLIKELY(NULL == (phy_plan = dfo.get_phy_plan()))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("NULL plan ptr unexpected", K(ret));
} else if (OB_ISNULL(phy_plan_ctx = GET_PHY_PLAN_CTX(exec_ctx))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("phy plan ctx NULL", K(ret));
} else if (OB_ISNULL(session = exec_ctx.get_my_session())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("session is NULL", K(ret));
}
// 分发 sqc 可能需要重试,主要针对两种情况:
// 1. 分发 sqc 的 rpc 超时
// 2. 分发 sqc 的 rpc 成功,但 sqc 上无法分配任何 worker 线程
// 发生上述情况后,整个 dfo 需要重置状态,稍等片刻然后重试
int64_t cluster_id = GCONF.cluster_id;
ARRAY_FOREACH_X(sqcs, idx, cnt, OB_SUCC(ret)) {
ObPxSqcMeta &sqc = *sqcs.at(idx);
const ObAddr &addr = sqc.get_exec_addr();
auto proxy = coord_info_.rpc_proxy_.to(addr);
if (OB_UNLIKELY(share::ObServerBlacklist::get_instance().is_in_blacklist(
share::ObCascadMember(addr, cluster_id), true /* add_server */,
session->get_process_query_time()))) {
if (!sqc.is_ignore_vtable_error()) {
ret = OB_RPC_CONNECT_ERROR;
LOG_WARN("peer no in communication, maybe crashed", K(ret), K(sqc), K(cluster_id),
K(session->get_process_query_time()));
} else {
LOG_WARN("ignore the black server list with virtual table", K(ret));
}
}
if (OB_FAIL(ret)) {
} else {
SMART_VAR(ObPxRpcInitSqcArgs, args) {
ObPxRpcInitSqcResponse resp;
timeout_us = phy_plan_ctx->get_timeout_timestamp() - ObTimeUtility::current_time();
args.set_serialize_param(exec_ctx, const_cast<ObOpSpec &>(*dfo.get_root_op_spec()), *phy_plan);
if (timeout_us <= 0) {
ret = OB_TIMEOUT;
} else if (OB_FAIL(args.sqc_.assign(sqc))) {
LOG_WARN("fail assign sqc", K(ret));
} else if (OB_FAIL(proxy
.by(THIS_WORKER.get_rpc_tenant()?: session->get_effective_tenant_id())
.timeout(timeout_us)
.init_sqc(args, resp))) {
LOG_WARN("fail dispatch dfo rpc", K(sqc), K(ret));
}
LOG_TRACE("Sent lw dfo to addr", K(dfo), K(addr), K(args), K(resp));
}
}
}
return ret;
}
/* 当发送 sqc 超时时,可能是遇到了死锁。
* 应对策略是:终止 dfo 下所有 sqc,清空 qc-sqc 通道,
* 等待一段时间,然后重新调度整个 dfo

View File

@ -112,9 +112,6 @@ private:
int deal_with_init_sqc_error(ObExecContext &exec_ctx,
const ObPxSqcMeta &sqc,
int rc) const;
int fast_dispatch_sqc(ObExecContext &exec_ctx,
ObDfo &dfo,
ObArray<ObPxSqcMeta *> &sqcs) const;
int schedule_pair(ObExecContext &exec_ctx,
ObDfo &child,
ObDfo &parent) const;

View File

@ -439,7 +439,8 @@ int ObPxCoordOp::init_dfo_mgr(const ObDfoInterruptIdGen &dfo_id_gen, ObDfoMgr &d
get_spec(),
px_expected,
px_admited_worker_count,
dfo_id_gen))) {
dfo_id_gen,
coord_info_))) {
LOG_WARN("fail init dfo mgr",
K(px_expected),
K(query_expected),

View File

@ -93,6 +93,13 @@ int ObInitSqcP::process()
} else {
/*do nothing*/
}
#ifdef ERRSIM
if (OB_FAIL(OB_E(EventTable::EN_PX_SQC_INIT_PROCESS_FAILED) OB_SUCCESS)) {
LOG_WARN("match sqc execute errism", K(ret));
}
#endif
if (OB_FAIL(ret) && OB_NOT_NULL(sqc_handler)) {
if (unregister_interrupt_) {
ObPxRpcInitSqcArgs &arg = sqc_handler->get_sqc_init_arg();
@ -265,7 +272,8 @@ void ObFastInitSqcReportQCMessageCall::operator()(hash::HashMapPair<ObInterrupti
{
UNUSED(entry);
if (OB_NOT_NULL(sqc_)) {
if (sqc_->is_ignore_vtable_error() && err_ != OB_SUCCESS && err_ != OB_TIMEOUT) {
if (sqc_->is_ignore_vtable_error() && err_ != OB_SUCCESS
&& ObVirtualTableErrorWhitelist::should_ignore_vtable_error(err_)) {
// 当该SQC是虚拟表查询时, 调度RPC失败时需要忽略错误结果.
// 并mock一个sqc finsh msg发送给正在轮询消息的PX算子
// 此操作已确认是线程安全的.

View File

@ -300,7 +300,8 @@ int ObPxMsgProc::on_sqc_finish_msg(ObExecContext &ctx,
} else { /*do nothing.*/ }
if (OB_SUCC(ret)) {
sqc->set_thread_finish(true);
if (sqc->is_ignore_vtable_error() && OB_SUCCESS != pkt.rc_) {
if (sqc->is_ignore_vtable_error() && OB_SUCCESS != pkt.rc_
&& ObVirtualTableErrorWhitelist::should_ignore_vtable_error(pkt.rc_)) {
// 如果收到一个sqc finish消息, 如果该sqc涉及虚拟表, 需要忽略所有错误码
// 如果该dfo是root_dfo的child_dfo, 为了让px走出数据channel的消息循环
// 需要mock一个eof dtl buffer本地发送至px(实际未经过rpc, attach即可)

View File

@ -49,6 +49,11 @@ public:
};
enum class TableAccessType {
NO_TABLE,
PURE_VIRTUAL_TABLE,
HAS_USER_TABLE
};
// 这些信息是调度时候需要用的变量,暂时统一叫做CoordInfo
class ObPxCoordInfo
{
@ -65,7 +70,8 @@ public:
interrupt_id_(interrupt_id),
coord_(coord),
batch_rescan_ctl_(NULL),
pruning_table_location_(NULL)
pruning_table_location_(NULL),
table_access_type_(TableAccessType::NO_TABLE)
{}
virtual ~ObPxCoordInfo() {}
virtual void destroy()
@ -89,6 +95,11 @@ public:
{
return NULL == batch_rescan_ctl_ ? 0 : batch_rescan_ctl_->cur_idx_;
}
// if there is no physical op visits user table and at least one physical op visits virtual table, ignore error
OB_INLINE bool should_ignore_vtable_error()
{
return TableAccessType::PURE_VIRTUAL_TABLE == table_access_type_;
}
public:
ObDfoMgr dfo_mgr_;
ObPieceMsgCtxMgr piece_msg_ctx_mgr_;
@ -100,6 +111,7 @@ public:
ObPxCoordOp &coord_;
ObBatchRescanCtl *batch_rescan_ctl_;
const common::ObIArray<ObTableLocation> *pruning_table_location_;
TableAccessType table_access_type_;
};
class ObDfoSchedulerBasic;

View File

@ -463,8 +463,8 @@ int ObPxTaskProcess::do_process()
// nop
} else if (IS_INTERRUPTED()) {
//当前是被QC中断的,不再向QC发送中断,退出即可。
} else if (ret != OB_TIMEOUT &&
arg_.get_sqc_handler()->get_sqc_init_arg().sqc_.is_ignore_vtable_error()) {
} else if (arg_.get_sqc_handler()->get_sqc_init_arg().sqc_.is_ignore_vtable_error()
&& ObVirtualTableErrorWhitelist::should_ignore_vtable_error(ret)) {
// 忽略虚拟表错误
} else {
(void) ObInterruptUtil::interrupt_qc(arg_.task_, ret);

View File

@ -58,25 +58,6 @@ int ObPXServerAddrUtil::alloc_by_data_distribution(const ObIArray<ObTableLocatio
return ret;
}
int ObPXServerAddrUtil::mark_virtual_table_dfo(common::ObIArray<const ObTableScanSpec*> &scan_ops,
ObDfo &dfo)
{
int ret = OB_SUCCESS;
bool is_vtable = true;
for (int i = 0; i < scan_ops.count() && OB_SUCC(ret); ++i) {
if (OB_ISNULL(scan_ops.at(i))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("scan ops is null", K(ret));
} else if (!is_virtual_table(scan_ops.at(i)->get_ref_table_id())) {
is_vtable = false;
break;
}
}
if (OB_SUCC(ret)) {
dfo.set_ignore_vtable_error(is_vtable && scan_ops.count() > 0);
}
return ret;
}
int ObPXServerAddrUtil::build_dynamic_partition_table_location(common::ObIArray<const ObTableScanSpec *> &scan_ops,
const ObIArray<ObTableLocation> *table_locations, ObDfo &dfo)
@ -172,9 +153,6 @@ int ObPXServerAddrUtil::alloc_by_data_distribution_inner(
ref_table_id,
table_loc));
} else {
if (OB_FAIL(mark_virtual_table_dfo(scan_ops, dfo))) {
LOG_WARN("fail to mark virtual table dfo", K(ret));
} else
// 通过TSC或者DML获得当前的DFO的partition对应的location信息
// 后续利用location信息构建对应的SQC meta
if (OB_ISNULL(table_loc = DAS_CTX(ctx).get_table_loc_by_id(table_location_key, ref_table_id))) {
@ -311,7 +289,6 @@ int ObPXServerAddrUtil::build_dfo_sqc(ObExecContext &ctx,
sqc.set_fulltree(dfo.is_fulltree());
sqc.set_qc_server_id(dfo.get_qc_server_id());
sqc.set_parent_dfo_id(dfo.get_parent_dfo_id());
sqc.set_ignore_vtable_error(dfo.is_ignore_vtable_error());
sqc.set_single_tsc_leaf_dfo(dfo.is_single_tsc_leaf_dfo());
for (auto iter = locations.begin(); OB_SUCC(ret) && iter != locations.end(); ++iter) {
if (addrs.at(i) == (*iter)->server_) {
@ -3535,3 +3512,26 @@ int ObExtraServerAliveCheck::do_check() const
LOG_DEBUG("server alive do check", K(ret), K(qc_addr_), K(cluster_id_), K(dfo_mgr_));
return ret;
}
bool ObVirtualTableErrorWhitelist::should_ignore_vtable_error(int error_code)
{
bool should_ignore = false;
switch (error_code) {
case OB_ALLOCATE_MEMORY_FAILED: {
should_ignore = true;
break;
}
case OB_RPC_CONNECT_ERROR: {
should_ignore = true;
break;
}
case OB_RPC_SEND_ERROR: {
should_ignore = true;
break;
}
default: {
break;
}
}
return should_ignore;
}

View File

@ -172,7 +172,6 @@ private:
bool asc, ObExecContext &exec_ctx, ObIArray<int64_t> &base_order);
static int build_dynamic_partition_table_location(common::ObIArray<const ObTableScanSpec*> &scan_ops,
const ObIArray<ObTableLocation> *table_locations, ObDfo &dfo);
static int mark_virtual_table_dfo(common::ObIArray<const ObTableScanSpec*> &scan_ops, ObDfo &dfo);
static int build_dfo_sqc(ObExecContext &ctx,
const DASTabletLocList &locations,
@ -543,6 +542,12 @@ private:
int64_t query_start_time_;
};
class ObVirtualTableErrorWhitelist
{
public:
static bool should_ignore_vtable_error(int error_code);
};
}
}