das task fast fail when got OB_MAPPING_BETWEEN_TABLET_AND_LS_NOT_EXIST error and table not exist.
This commit is contained in:
@ -29,6 +29,7 @@ using namespace share;
|
|||||||
using namespace share::schema;
|
using namespace share::schema;
|
||||||
namespace sql
|
namespace sql
|
||||||
{
|
{
|
||||||
|
|
||||||
//not assign array member
|
//not assign array member
|
||||||
void ObDASTableLocMeta::light_assign(const ObDASTableLocMeta &other)
|
void ObDASTableLocMeta::light_assign(const ObDASTableLocMeta &other)
|
||||||
{
|
{
|
||||||
|
|||||||
@ -27,6 +27,14 @@
|
|||||||
(::oceanbase::sql::DAS_OP_TABLE_BATCH_SCAN != (_task_op)->get_type() ? \
|
(::oceanbase::sql::DAS_OP_TABLE_BATCH_SCAN != (_task_op)->get_type() ? \
|
||||||
nullptr : static_cast<::oceanbase::sql::ObDASGroupScanOp*>(_task_op))
|
nullptr : static_cast<::oceanbase::sql::ObDASGroupScanOp*>(_task_op))
|
||||||
|
|
||||||
|
#define IS_DAS_DML_OP(_task_op) \
|
||||||
|
({ \
|
||||||
|
DAS_OP_TABLE_INSERT == (_task_op).get_type() || \
|
||||||
|
DAS_OP_TABLE_UPDATE == (_task_op).get_type() || \
|
||||||
|
DAS_OP_TABLE_LOCK == (_task_op).get_type() || \
|
||||||
|
DAS_OP_TABLE_DELETE == (_task_op).get_type(); \
|
||||||
|
})
|
||||||
|
|
||||||
namespace oceanbase
|
namespace oceanbase
|
||||||
{
|
{
|
||||||
namespace sql
|
namespace sql
|
||||||
|
|||||||
@ -153,7 +153,7 @@ public:
|
|||||||
void set_tenant_id(uint64_t tenant_id) { tenant_id_ = tenant_id; }
|
void set_tenant_id(uint64_t tenant_id) { tenant_id_ = tenant_id; }
|
||||||
uint64_t get_tenant_id() const { return tenant_id_; }
|
uint64_t get_tenant_id() const { return tenant_id_; }
|
||||||
void set_type(ObDASOpType op_type) { op_type_ = op_type; }
|
void set_type(ObDASOpType op_type) { op_type_ = op_type; }
|
||||||
ObDASOpType get_type() { return op_type_; }
|
ObDASOpType get_type() const { return op_type_; }
|
||||||
void set_trans_desc(transaction::ObTxDesc *trans_desc) { trans_desc_ = trans_desc; }
|
void set_trans_desc(transaction::ObTxDesc *trans_desc) { trans_desc_ = trans_desc; }
|
||||||
transaction::ObTxDesc *get_trans_desc() { return trans_desc_; }
|
transaction::ObTxDesc *get_trans_desc() { return trans_desc_; }
|
||||||
void set_snapshot(transaction::ObTxReadSnapshot *snapshot) { snapshot_ = snapshot; }
|
void set_snapshot(transaction::ObTxReadSnapshot *snapshot) { snapshot_ = snapshot; }
|
||||||
|
|||||||
@ -179,6 +179,7 @@ int ObDataAccessService::retry_das_task(ObDASRef &das_ref, ObIDASTaskOp &task_op
|
|||||||
while (is_master_changed_error(ret) ||
|
while (is_master_changed_error(ret) ||
|
||||||
is_partition_change_error(ret) ||
|
is_partition_change_error(ret) ||
|
||||||
OB_REPLICA_NOT_READABLE == ret) {
|
OB_REPLICA_NOT_READABLE == ret) {
|
||||||
|
if (!can_fast_fail(task_op)) {
|
||||||
task_op.in_part_retry_ = true;
|
task_op.in_part_retry_ = true;
|
||||||
das_ref.get_exec_ctx().get_my_session()->set_session_in_retry(true, ret);
|
das_ref.get_exec_ctx().get_my_session()->set_session_in_retry(true, ret);
|
||||||
if (OB_FAIL(clear_task_exec_env(das_ref, task_op))) {
|
if (OB_FAIL(clear_task_exec_env(das_ref, task_op))) {
|
||||||
@ -191,9 +192,40 @@ int ObDataAccessService::retry_das_task(ObDASRef &das_ref, ObIDASTaskOp &task_op
|
|||||||
LOG_WARN("execute dist das task failed", K(ret));
|
LOG_WARN("execute dist das task failed", K(ret));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
bool ObDataAccessService::can_fast_fail(const ObIDASTaskOp &task_op) const
|
||||||
|
{
|
||||||
|
bool bret = false;
|
||||||
|
int ret = OB_SUCCESS; // no need to pass ret outside.
|
||||||
|
const common::ObTableID &table_id = IS_DAS_DML_OP(task_op)
|
||||||
|
? static_cast<const ObDASDMLBaseCtDef *>(task_op.get_ctdef())->table_id_
|
||||||
|
: static_cast<const ObDASScanCtDef *>(task_op.get_ctdef())->ref_table_id_;
|
||||||
|
int64_t schema_version = IS_DAS_DML_OP(task_op)
|
||||||
|
? static_cast<const ObDASDMLBaseCtDef *>(task_op.get_ctdef())->schema_version_
|
||||||
|
: static_cast<const ObDASScanCtDef *>(task_op.get_ctdef())->schema_version_;
|
||||||
|
schema::ObSchemaGetterGuard schema_guard;
|
||||||
|
const schema::ObTableSchema *table_schema = nullptr;
|
||||||
|
if (OB_ISNULL(GCTX.schema_service_)) {
|
||||||
|
LOG_ERROR("invalid schema service", KR(ret));
|
||||||
|
} else if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(MTL_ID(), schema_guard))) {
|
||||||
|
LOG_WARN("get tenant schema guard fail", KR(ret), K(MTL_ID()));
|
||||||
|
} else if (OB_FAIL(schema_guard.get_table_schema(MTL_ID(), table_id, table_schema))) {
|
||||||
|
LOG_WARN("failed to get table schema", KR(ret));
|
||||||
|
} else if (OB_ISNULL(table_schema)) {
|
||||||
|
bret = true;
|
||||||
|
LOG_WARN("table not exist, fast fail das task");
|
||||||
|
} else if (table_schema->get_schema_version() != schema_version) {
|
||||||
|
bret = true;
|
||||||
|
LOG_WARN("schema version changed, fast fail das task", "current schema version",
|
||||||
|
table_schema->get_schema_version(), "query schema version", schema_version);
|
||||||
|
}
|
||||||
|
return bret;
|
||||||
|
}
|
||||||
|
|
||||||
int ObDataAccessService::end_das_task(ObDASRef &das_ref, ObIDASTaskOp &task_op)
|
int ObDataAccessService::end_das_task(ObDASRef &das_ref, ObIDASTaskOp &task_op)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
|||||||
@ -58,6 +58,7 @@ private:
|
|||||||
ObIDASTaskOp *task_op,
|
ObIDASTaskOp *task_op,
|
||||||
ObDASExtraData *&extra_result);
|
ObDASExtraData *&extra_result);
|
||||||
int collect_das_task_info(ObDASTaskArg &task_arg, ObDASRemoteInfo &remote_info);
|
int collect_das_task_info(ObDASTaskArg &task_arg, ObDASRemoteInfo &remote_info);
|
||||||
|
bool can_fast_fail(const ObIDASTaskOp &task_op) const;
|
||||||
private:
|
private:
|
||||||
obrpc::ObDASRpcProxy das_rpc_proxy_;
|
obrpc::ObDASRpcProxy das_rpc_proxy_;
|
||||||
common::ObAddr ctrl_addr_;
|
common::ObAddr ctrl_addr_;
|
||||||
|
|||||||
Reference in New Issue
Block a user