fix complemend data dag hold mem unexpectedly.
This commit is contained in:
		| @ -1846,6 +1846,7 @@ int ObService::build_ddl_single_replica_request(const ObDDLBuildSingleReplicaReq | ||||
|         || DDL_ADD_COLUMN_OFFLINE == arg.ddl_type_ | ||||
|         || DDL_COLUMN_REDEFINITION == arg.ddl_type_) { | ||||
|       MTL_SWITCH(arg.tenant_id_) { | ||||
|         int saved_ret = OB_SUCCESS; | ||||
|         ObTenantDagScheduler *dag_scheduler = nullptr; | ||||
|         ObComplementDataDag *dag = nullptr; | ||||
|         if (OB_ISNULL(dag_scheduler = MTL(ObTenantDagScheduler *))) { | ||||
| @ -1861,20 +1862,19 @@ int ObService::build_ddl_single_replica_request(const ObDDLBuildSingleReplicaReq | ||||
|         } else if (OB_FAIL(dag->create_first_task())) { | ||||
|           LOG_WARN("create first task failed", K(ret)); | ||||
|         } else if (OB_FAIL(dag_scheduler->add_dag(dag))) { | ||||
|           if (OB_EAGAIN == ret) { | ||||
|             ret = OB_SUCCESS; | ||||
|             LOG_INFO("drop column dag already exists, no need to schedule once again"); | ||||
|           } else if (OB_SIZE_OVERFLOW == ret) { | ||||
|             ret = OB_EAGAIN; | ||||
|           } else { | ||||
|             LOG_WARN("fail to add dag to queue", K(ret)); | ||||
|           } | ||||
|           saved_ret = ret; | ||||
|           LOG_WARN("add dag failed", K(ret), K(arg)); | ||||
|         } | ||||
|         if (OB_FAIL(ret) && OB_NOT_NULL(dag)) { | ||||
|           (void) dag->handle_init_failed_ret_code(ret); | ||||
|           dag_scheduler->free_dag(*dag); | ||||
|           dag = nullptr; | ||||
|         } | ||||
|         if (OB_FAIL(ret)) { | ||||
|           // RS does not retry send RPC to tablet leader when the dag exists. | ||||
|           ret = OB_EAGAIN == saved_ret ? OB_SUCCESS : ret; | ||||
|           ret = OB_SIZE_OVERFLOW == saved_ret ? OB_EAGAIN : ret; | ||||
|         } | ||||
|       } | ||||
|       LOG_INFO("obs get rpc to build drop column dag", K(ret)); | ||||
|     } else { | ||||
|  | ||||
| @ -131,18 +131,20 @@ int ObComplementDataParam::deep_copy_table_schemas(const ObTableSchema *data_tab | ||||
|     LOG_WARN("invalid arguments", K(ret), KP(data_table_schema), KP(hidden_table_schema)); | ||||
|   } else { | ||||
|     ObIAllocator &allocator = allocator_; | ||||
|     const int64_t alloc_size = 2 * sizeof(ObTableSchema); | ||||
|     char *buf = nullptr; | ||||
|     if (OB_ISNULL(buf = static_cast<char *>(allocator.alloc(alloc_size)))) { | ||||
|     const int64_t alloc_size = sizeof(ObTableSchema); | ||||
|     char *buf_for_data_schema = nullptr; | ||||
|     char *buf_for_hidden_schema = nullptr; | ||||
|     if (OB_ISNULL(buf_for_data_schema = static_cast<char *>(allocator.alloc(alloc_size)))) { | ||||
|       ret = OB_ALLOCATE_MEMORY_FAILED; | ||||
|       LOG_WARN("fail to allocate memory", K(ret)); | ||||
|       LOG_WARN("alloc memory failed", K(ret)); | ||||
|     } else if (OB_ISNULL(buf_for_hidden_schema = static_cast<char *>(allocator.alloc(alloc_size)))) { | ||||
|       ret = OB_ALLOCATE_MEMORY_FAILED; | ||||
|       LOG_WARN("alloc memory failed", K(ret)); | ||||
|     } else { | ||||
|       ObTableSchema *deep_copy_data_table_schema = nullptr; | ||||
|       ObTableSchema *deep_copy_hidden_table_schema = nullptr; | ||||
|       deep_copy_data_table_schema = new (buf) ObTableSchema(&allocator); | ||||
|       buf += sizeof(ObTableSchema); | ||||
|       deep_copy_hidden_table_schema = new (buf) ObTableSchema(&allocator); | ||||
|       buf += sizeof(ObTableSchema); | ||||
|       deep_copy_data_table_schema = new (buf_for_data_schema) ObTableSchema(&allocator); | ||||
|       deep_copy_hidden_table_schema = new (buf_for_hidden_schema) ObTableSchema(&allocator); | ||||
|       if (OB_FAIL(deep_copy_data_table_schema->assign(*data_table_schema))) { | ||||
|         LOG_WARN("fail to assign data table schema", K(ret)); | ||||
|       } else if (OB_FAIL(deep_copy_hidden_table_schema->assign(*hidden_table_schema))) { | ||||
| @ -151,11 +153,23 @@ int ObComplementDataParam::deep_copy_table_schemas(const ObTableSchema *data_tab | ||||
|         data_table_schema_ = deep_copy_data_table_schema; | ||||
|         hidden_table_schema_ = deep_copy_hidden_table_schema; | ||||
|       } | ||||
|       if (OB_FAIL(ret) && OB_NOT_NULL(buf)) { | ||||
|         deep_copy_data_table_schema->~ObTableSchema(); | ||||
|         deep_copy_hidden_table_schema->~ObTableSchema(); | ||||
|         allocator.free(buf); | ||||
|         buf = nullptr; | ||||
|       if (OB_FAIL(ret)) { | ||||
|         if (nullptr != deep_copy_data_table_schema) { | ||||
|           deep_copy_data_table_schema->~ObTableSchema(); | ||||
|           deep_copy_data_table_schema = nullptr; | ||||
|         } | ||||
|         if (nullptr != buf_for_data_schema) { | ||||
|           allocator_.free(buf_for_data_schema); | ||||
|           buf_for_data_schema = nullptr; | ||||
|         } | ||||
|         if (nullptr != deep_copy_hidden_table_schema) { | ||||
|           deep_copy_hidden_table_schema->~ObTableSchema(); | ||||
|           deep_copy_hidden_table_schema = nullptr; | ||||
|         } | ||||
|         if (nullptr != buf_for_hidden_schema) { | ||||
|           allocator_.free(buf_for_hidden_schema); | ||||
|           buf_for_hidden_schema = nullptr; | ||||
|         } | ||||
|       } | ||||
|     } | ||||
|   } | ||||
|  | ||||
| @ -39,7 +39,7 @@ public: | ||||
|   ObComplementDataParam(): | ||||
|     is_inited_(false), tenant_id_(common::OB_INVALID_TENANT_ID), ls_id_(share::ObLSID::INVALID_LS_ID),  | ||||
|     source_tablet_id_(ObTabletID::INVALID_TABLET_ID), dest_tablet_id_(ObTabletID::INVALID_TABLET_ID),  | ||||
|     data_table_schema_(nullptr), hidden_table_schema_(nullptr), allocator_("ComplementData"),  | ||||
|     data_table_schema_(nullptr), hidden_table_schema_(nullptr), allocator_("CompleteDataPar"),  | ||||
|     row_store_type_(common::ENCODING_ROW_STORE), schema_version_(0), snapshot_version_(0), | ||||
|     concurrent_cnt_(0), task_id_(0), execution_id_(0), compat_mode_(lib::Worker::CompatMode::INVALID) | ||||
|   {} | ||||
| @ -70,6 +70,7 @@ public: | ||||
|     } | ||||
|     data_table_schema_ = nullptr; | ||||
|     hidden_table_schema_ = nullptr; | ||||
|     ranges_.reset(); | ||||
|     allocator_.reset(); | ||||
|     row_store_type_ = common::ENCODING_ROW_STORE; | ||||
|     schema_version_ = 0; | ||||
| @ -106,7 +107,7 @@ struct ObComplementDataContext final | ||||
| public: | ||||
|   ObComplementDataContext(): | ||||
|     is_inited_(false), is_major_sstable_exist_(false), complement_data_ret_(common::OB_SUCCESS), | ||||
|     allocator_("ComplementData"), lock_(), concurrent_cnt_(0), data_sstable_redo_writer_(), index_builder_(nullptr) | ||||
|     allocator_("CompleteDataCtx"), lock_(), concurrent_cnt_(0), data_sstable_redo_writer_(), index_builder_(nullptr) | ||||
|   {} | ||||
|   ~ObComplementDataContext() { destroy(); } | ||||
|   int init(const ObComplementDataParam ¶m, const ObDataStoreDesc &desc); | ||||
|  | ||||
		Reference in New Issue
	
	Block a user
	 obdev
					obdev