From 33672fdd50edf5ae21064ea2e669d0c9618a30b9 Mon Sep 17 00:00:00 2001 From: obdev Date: Thu, 3 Nov 2022 05:38:34 +0000 Subject: [PATCH] das task fast fail when got OB_MAPPING_BETWEEN_TABLET_AND_LS_NOT_EXIST error and table not exist. --- src/sql/das/ob_das_define.cpp | 1 + src/sql/das/ob_das_define.h | 8 ++++ src/sql/das/ob_das_task.h | 2 +- src/sql/das/ob_data_access_service.cpp | 52 +++++++++++++++++++++----- src/sql/das/ob_data_access_service.h | 1 + 5 files changed, 53 insertions(+), 11 deletions(-) diff --git a/src/sql/das/ob_das_define.cpp b/src/sql/das/ob_das_define.cpp index a21712da66..54568948be 100644 --- a/src/sql/das/ob_das_define.cpp +++ b/src/sql/das/ob_das_define.cpp @@ -29,6 +29,7 @@ using namespace share; using namespace share::schema; namespace sql { + //not assign array member void ObDASTableLocMeta::light_assign(const ObDASTableLocMeta &other) { diff --git a/src/sql/das/ob_das_define.h b/src/sql/das/ob_das_define.h index 788d77f6f0..d4ec844d68 100644 --- a/src/sql/das/ob_das_define.h +++ b/src/sql/das/ob_das_define.h @@ -27,6 +27,14 @@ (::oceanbase::sql::DAS_OP_TABLE_BATCH_SCAN != (_task_op)->get_type() ? \ nullptr : static_cast<::oceanbase::sql::ObDASGroupScanOp*>(_task_op)) +#define IS_DAS_DML_OP(_task_op) \ + ({ \ + DAS_OP_TABLE_INSERT == (_task_op).get_type() || \ + DAS_OP_TABLE_UPDATE == (_task_op).get_type() || \ + DAS_OP_TABLE_LOCK == (_task_op).get_type() || \ + DAS_OP_TABLE_DELETE == (_task_op).get_type(); \ + }) + namespace oceanbase { namespace sql diff --git a/src/sql/das/ob_das_task.h b/src/sql/das/ob_das_task.h index 5a9b710298..7945be256e 100644 --- a/src/sql/das/ob_das_task.h +++ b/src/sql/das/ob_das_task.h @@ -153,7 +153,7 @@ public: void set_tenant_id(uint64_t tenant_id) { tenant_id_ = tenant_id; } uint64_t get_tenant_id() const { return tenant_id_; } void set_type(ObDASOpType op_type) { op_type_ = op_type; } - ObDASOpType get_type() { return op_type_; } + ObDASOpType get_type() const { return op_type_; } void set_trans_desc(transaction::ObTxDesc *trans_desc) { trans_desc_ = trans_desc; } transaction::ObTxDesc *get_trans_desc() { return trans_desc_; } void set_snapshot(transaction::ObTxReadSnapshot *snapshot) { snapshot_ = snapshot; } diff --git a/src/sql/das/ob_data_access_service.cpp b/src/sql/das/ob_data_access_service.cpp index c9623fadf5..2f0e9b2779 100644 --- a/src/sql/das/ob_data_access_service.cpp +++ b/src/sql/das/ob_data_access_service.cpp @@ -179,21 +179,53 @@ int ObDataAccessService::retry_das_task(ObDASRef &das_ref, ObIDASTaskOp &task_op while (is_master_changed_error(ret) || is_partition_change_error(ret) || OB_REPLICA_NOT_READABLE == ret) { - task_op.in_part_retry_ = true; - das_ref.get_exec_ctx().get_my_session()->set_session_in_retry(true, ret); - if (OB_FAIL(clear_task_exec_env(das_ref, task_op))) { - LOG_WARN("clear task execution environment", K(ret)); - } else if (OB_FAIL(das_ref.get_exec_ctx().check_status())) { - LOG_WARN("query is timeout, terminate retry", K(ret)); - } else if (OB_FAIL(refresh_partition_location(das_ref, task_op))) { - LOG_WARN("refresh partition location failed", K(ret)); - } else if (OB_FAIL(execute_dist_das_task(das_ref, task_op))) { - LOG_WARN("execute dist das task failed", K(ret)); + if (!can_fast_fail(task_op)) { + task_op.in_part_retry_ = true; + das_ref.get_exec_ctx().get_my_session()->set_session_in_retry(true, ret); + if (OB_FAIL(clear_task_exec_env(das_ref, task_op))) { + LOG_WARN("clear task execution environment", K(ret)); + } else if (OB_FAIL(das_ref.get_exec_ctx().check_status())) { + LOG_WARN("query is timeout, terminate retry", K(ret)); + } else if (OB_FAIL(refresh_partition_location(das_ref, task_op))) { + LOG_WARN("refresh partition location failed", K(ret)); + } else if (OB_FAIL(execute_dist_das_task(das_ref, task_op))) { + LOG_WARN("execute dist das task failed", K(ret)); + } } } return ret; } + +bool ObDataAccessService::can_fast_fail(const ObIDASTaskOp &task_op) const +{ + bool bret = false; + int ret = OB_SUCCESS; // no need to pass ret outside. + const common::ObTableID &table_id = IS_DAS_DML_OP(task_op) + ? static_cast(task_op.get_ctdef())->table_id_ + : static_cast(task_op.get_ctdef())->ref_table_id_; + int64_t schema_version = IS_DAS_DML_OP(task_op) + ? static_cast(task_op.get_ctdef())->schema_version_ + : static_cast(task_op.get_ctdef())->schema_version_; + schema::ObSchemaGetterGuard schema_guard; + const schema::ObTableSchema *table_schema = nullptr; + if (OB_ISNULL(GCTX.schema_service_)) { + LOG_ERROR("invalid schema service", KR(ret)); + } else if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(MTL_ID(), schema_guard))) { + LOG_WARN("get tenant schema guard fail", KR(ret), K(MTL_ID())); + } else if (OB_FAIL(schema_guard.get_table_schema(MTL_ID(), table_id, table_schema))) { + LOG_WARN("failed to get table schema", KR(ret)); + } else if (OB_ISNULL(table_schema)) { + bret = true; + LOG_WARN("table not exist, fast fail das task"); + } else if (table_schema->get_schema_version() != schema_version) { + bret = true; + LOG_WARN("schema version changed, fast fail das task", "current schema version", + table_schema->get_schema_version(), "query schema version", schema_version); + } + return bret; +} + int ObDataAccessService::end_das_task(ObDASRef &das_ref, ObIDASTaskOp &task_op) { int ret = OB_SUCCESS; diff --git a/src/sql/das/ob_data_access_service.h b/src/sql/das/ob_data_access_service.h index 10774853aa..001ef1685d 100644 --- a/src/sql/das/ob_data_access_service.h +++ b/src/sql/das/ob_data_access_service.h @@ -58,6 +58,7 @@ private: ObIDASTaskOp *task_op, ObDASExtraData *&extra_result); int collect_das_task_info(ObDASTaskArg &task_arg, ObDASRemoteInfo &remote_info); + bool can_fast_fail(const ObIDASTaskOp &task_op) const; private: obrpc::ObDASRpcProxy das_rpc_proxy_; common::ObAddr ctrl_addr_;