From ad1c2616f68b6259a0df46ee538f9be4561b8c19 Mon Sep 17 00:00:00 2001 From: obdev Date: Thu, 10 Nov 2022 01:05:53 +0000 Subject: [PATCH] fix complemend data dag hold mem unexpectedly. --- src/observer/ob_service.cpp | 16 ++++----- src/storage/ddl/ob_complement_data_task.cpp | 40 ++++++++++++++------- src/storage/ddl/ob_complement_data_task.h | 5 +-- 3 files changed, 38 insertions(+), 23 deletions(-) diff --git a/src/observer/ob_service.cpp b/src/observer/ob_service.cpp index 47cb5f375..4b31fc455 100644 --- a/src/observer/ob_service.cpp +++ b/src/observer/ob_service.cpp @@ -1846,6 +1846,7 @@ int ObService::build_ddl_single_replica_request(const ObDDLBuildSingleReplicaReq || DDL_ADD_COLUMN_OFFLINE == arg.ddl_type_ || DDL_COLUMN_REDEFINITION == arg.ddl_type_) { MTL_SWITCH(arg.tenant_id_) { + int saved_ret = OB_SUCCESS; ObTenantDagScheduler *dag_scheduler = nullptr; ObComplementDataDag *dag = nullptr; if (OB_ISNULL(dag_scheduler = MTL(ObTenantDagScheduler *))) { @@ -1861,20 +1862,19 @@ int ObService::build_ddl_single_replica_request(const ObDDLBuildSingleReplicaReq } else if (OB_FAIL(dag->create_first_task())) { LOG_WARN("create first task failed", K(ret)); } else if (OB_FAIL(dag_scheduler->add_dag(dag))) { - if (OB_EAGAIN == ret) { - ret = OB_SUCCESS; - LOG_INFO("drop column dag already exists, no need to schedule once again"); - } else if (OB_SIZE_OVERFLOW == ret) { - ret = OB_EAGAIN; - } else { - LOG_WARN("fail to add dag to queue", K(ret)); - } + saved_ret = ret; + LOG_WARN("add dag failed", K(ret), K(arg)); } if (OB_FAIL(ret) && OB_NOT_NULL(dag)) { (void) dag->handle_init_failed_ret_code(ret); dag_scheduler->free_dag(*dag); dag = nullptr; } + if (OB_FAIL(ret)) { + // RS does not retry send RPC to tablet leader when the dag exists. + ret = OB_EAGAIN == saved_ret ? OB_SUCCESS : ret; + ret = OB_SIZE_OVERFLOW == saved_ret ? OB_EAGAIN : ret; + } } LOG_INFO("obs get rpc to build drop column dag", K(ret)); } else { diff --git a/src/storage/ddl/ob_complement_data_task.cpp b/src/storage/ddl/ob_complement_data_task.cpp index 50dba32f7..369e3db10 100644 --- a/src/storage/ddl/ob_complement_data_task.cpp +++ b/src/storage/ddl/ob_complement_data_task.cpp @@ -131,18 +131,20 @@ int ObComplementDataParam::deep_copy_table_schemas(const ObTableSchema *data_tab LOG_WARN("invalid arguments", K(ret), KP(data_table_schema), KP(hidden_table_schema)); } else { ObIAllocator &allocator = allocator_; - const int64_t alloc_size = 2 * sizeof(ObTableSchema); - char *buf = nullptr; - if (OB_ISNULL(buf = static_cast(allocator.alloc(alloc_size)))) { + const int64_t alloc_size = sizeof(ObTableSchema); + char *buf_for_data_schema = nullptr; + char *buf_for_hidden_schema = nullptr; + if (OB_ISNULL(buf_for_data_schema = static_cast(allocator.alloc(alloc_size)))) { ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("fail to allocate memory", K(ret)); + LOG_WARN("alloc memory failed", K(ret)); + } else if (OB_ISNULL(buf_for_hidden_schema = static_cast(allocator.alloc(alloc_size)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("alloc memory failed", K(ret)); } else { ObTableSchema *deep_copy_data_table_schema = nullptr; ObTableSchema *deep_copy_hidden_table_schema = nullptr; - deep_copy_data_table_schema = new (buf) ObTableSchema(&allocator); - buf += sizeof(ObTableSchema); - deep_copy_hidden_table_schema = new (buf) ObTableSchema(&allocator); - buf += sizeof(ObTableSchema); + deep_copy_data_table_schema = new (buf_for_data_schema) ObTableSchema(&allocator); + deep_copy_hidden_table_schema = new (buf_for_hidden_schema) ObTableSchema(&allocator); if (OB_FAIL(deep_copy_data_table_schema->assign(*data_table_schema))) { LOG_WARN("fail to assign data table schema", K(ret)); } else if (OB_FAIL(deep_copy_hidden_table_schema->assign(*hidden_table_schema))) { @@ -151,11 +153,23 @@ int ObComplementDataParam::deep_copy_table_schemas(const ObTableSchema *data_tab data_table_schema_ = deep_copy_data_table_schema; hidden_table_schema_ = deep_copy_hidden_table_schema; } - if (OB_FAIL(ret) && OB_NOT_NULL(buf)) { - deep_copy_data_table_schema->~ObTableSchema(); - deep_copy_hidden_table_schema->~ObTableSchema(); - allocator.free(buf); - buf = nullptr; + if (OB_FAIL(ret)) { + if (nullptr != deep_copy_data_table_schema) { + deep_copy_data_table_schema->~ObTableSchema(); + deep_copy_data_table_schema = nullptr; + } + if (nullptr != buf_for_data_schema) { + allocator_.free(buf_for_data_schema); + buf_for_data_schema = nullptr; + } + if (nullptr != deep_copy_hidden_table_schema) { + deep_copy_hidden_table_schema->~ObTableSchema(); + deep_copy_hidden_table_schema = nullptr; + } + if (nullptr != buf_for_hidden_schema) { + allocator_.free(buf_for_hidden_schema); + buf_for_hidden_schema = nullptr; + } } } } diff --git a/src/storage/ddl/ob_complement_data_task.h b/src/storage/ddl/ob_complement_data_task.h index 2cdc6057d..eb9756760 100644 --- a/src/storage/ddl/ob_complement_data_task.h +++ b/src/storage/ddl/ob_complement_data_task.h @@ -39,7 +39,7 @@ public: ObComplementDataParam(): is_inited_(false), tenant_id_(common::OB_INVALID_TENANT_ID), ls_id_(share::ObLSID::INVALID_LS_ID), source_tablet_id_(ObTabletID::INVALID_TABLET_ID), dest_tablet_id_(ObTabletID::INVALID_TABLET_ID), - data_table_schema_(nullptr), hidden_table_schema_(nullptr), allocator_("ComplementData"), + data_table_schema_(nullptr), hidden_table_schema_(nullptr), allocator_("CompleteDataPar"), row_store_type_(common::ENCODING_ROW_STORE), schema_version_(0), snapshot_version_(0), concurrent_cnt_(0), task_id_(0), execution_id_(0), compat_mode_(lib::Worker::CompatMode::INVALID) {} @@ -70,6 +70,7 @@ public: } data_table_schema_ = nullptr; hidden_table_schema_ = nullptr; + ranges_.reset(); allocator_.reset(); row_store_type_ = common::ENCODING_ROW_STORE; schema_version_ = 0; @@ -106,7 +107,7 @@ struct ObComplementDataContext final public: ObComplementDataContext(): is_inited_(false), is_major_sstable_exist_(false), complement_data_ret_(common::OB_SUCCESS), - allocator_("ComplementData"), lock_(), concurrent_cnt_(0), data_sstable_redo_writer_(), index_builder_(nullptr) + allocator_("CompleteDataCtx"), lock_(), concurrent_cnt_(0), data_sstable_redo_writer_(), index_builder_(nullptr) {} ~ObComplementDataContext() { destroy(); } int init(const ObComplementDataParam ¶m, const ObDataStoreDesc &desc);