fix table location memory leak
This commit is contained in:
@ -91,6 +91,7 @@ int ObPXServerAddrUtil::alloc_by_data_distribution_inner(ObExecContext& ctx, ObD
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
const ObPhyTableLocation* table_loc = NULL;
|
const ObPhyTableLocation* table_loc = NULL;
|
||||||
|
ObPhyTableLocationGuard full_table_loc;
|
||||||
uint64_t table_location_key = OB_INVALID_INDEX;
|
uint64_t table_location_key = OB_INVALID_INDEX;
|
||||||
uint64_t ref_table_id = OB_INVALID_ID;
|
uint64_t ref_table_id = OB_INVALID_ID;
|
||||||
if (scan_ops.count() > 0) {
|
if (scan_ops.count() > 0) {
|
||||||
@ -106,8 +107,10 @@ int ObPXServerAddrUtil::alloc_by_data_distribution_inner(ObExecContext& ctx, ObD
|
|||||||
if (dml_op && dml_op->is_table_location_uncertain()) {
|
if (dml_op && dml_op->is_table_location_uncertain()) {
|
||||||
bool is_weak = false;
|
bool is_weak = false;
|
||||||
if (OB_FAIL(ObTaskExecutorCtxUtil::get_full_table_phy_table_location(
|
if (OB_FAIL(ObTaskExecutorCtxUtil::get_full_table_phy_table_location(
|
||||||
ctx, table_location_key, ref_table_id, is_weak, table_loc))) {
|
ctx, table_location_key, ref_table_id, is_weak, full_table_loc))) {
|
||||||
LOG_WARN("fail to get phy table location", K(ret));
|
LOG_WARN("fail to get phy table location", K(ret));
|
||||||
|
} else {
|
||||||
|
table_loc = full_table_loc.get_loc();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (OB_FAIL(ObTaskExecutorCtxUtil::get_phy_table_location(ctx, table_location_key, ref_table_id, table_loc))) {
|
if (OB_FAIL(ObTaskExecutorCtxUtil::get_phy_table_location(ctx, table_location_key, ref_table_id, table_loc))) {
|
||||||
@ -504,6 +507,7 @@ int ObPXServerAddrUtil::set_dfo_accessed_location(
|
|||||||
// pass
|
// pass
|
||||||
} else {
|
} else {
|
||||||
const ObPhyTableLocation* table_loc = nullptr;
|
const ObPhyTableLocation* table_loc = nullptr;
|
||||||
|
ObPhyTableLocationGuard full_table_loc;
|
||||||
uint64_t table_location_key = common::OB_INVALID_ID;
|
uint64_t table_location_key = common::OB_INVALID_ID;
|
||||||
uint64_t ref_table_id = common::OB_INVALID_ID;
|
uint64_t ref_table_id = common::OB_INVALID_ID;
|
||||||
if (FALSE_IT(table_location_key = dml_op->get_table_id())) {
|
if (FALSE_IT(table_location_key = dml_op->get_table_id())) {
|
||||||
@ -512,8 +516,10 @@ int ObPXServerAddrUtil::set_dfo_accessed_location(
|
|||||||
if (dml_op->is_table_location_uncertain()) {
|
if (dml_op->is_table_location_uncertain()) {
|
||||||
bool is_weak = false;
|
bool is_weak = false;
|
||||||
if (OB_FAIL(ObTaskExecutorCtxUtil::get_full_table_phy_table_location(
|
if (OB_FAIL(ObTaskExecutorCtxUtil::get_full_table_phy_table_location(
|
||||||
ctx, table_location_key, ref_table_id, is_weak, table_loc))) {
|
ctx, table_location_key, ref_table_id, is_weak, full_table_loc))) {
|
||||||
LOG_WARN("fail to get phy table location", K(ret));
|
LOG_WARN("fail to get phy table location", K(ret));
|
||||||
|
} else {
|
||||||
|
table_loc = full_table_loc.get_loc();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (OB_FAIL(ObTaskExecutorCtxUtil::get_phy_table_location(ctx, table_location_key, ref_table_id, table_loc))) {
|
if (OB_FAIL(ObTaskExecutorCtxUtil::get_phy_table_location(ctx, table_location_key, ref_table_id, table_loc))) {
|
||||||
|
|||||||
@ -335,8 +335,9 @@ int ObTaskExecutorCtxUtil::get_part_runner_server(
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObTaskExecutorCtxUtil::get_full_table_phy_table_location(ObExecContext& ctx, uint64_t table_location_key,
|
// 每次调用都会 allocate 一个 table_location
|
||||||
uint64_t ref_table_id, bool is_weak, const ObPhyTableLocation*& table_location)
|
int ObTaskExecutorCtxUtil::get_full_table_phy_table_location(ObExecContext &ctx, uint64_t table_location_key,
|
||||||
|
uint64_t ref_table_id, bool is_weak, ObPhyTableLocationGuard &table_location)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObPhyTableLocationInfo phy_location_info;
|
ObPhyTableLocationInfo phy_location_info;
|
||||||
@ -348,9 +349,6 @@ int ObTaskExecutorCtxUtil::get_full_table_phy_table_location(ObExecContext& ctx,
|
|||||||
const ObTableSchema* table_schema = NULL;
|
const ObTableSchema* table_schema = NULL;
|
||||||
const uint64_t tenant_id = extract_tenant_id(ref_table_id);
|
const uint64_t tenant_id = extract_tenant_id(ref_table_id);
|
||||||
// ObPhysicalPlanCtx *plan_ctx = ctx.get_physical_plan_ctx();
|
// ObPhysicalPlanCtx *plan_ctx = ctx.get_physical_plan_ctx();
|
||||||
ObPhyTableLocation* loc = nullptr;
|
|
||||||
table_location = NULL;
|
|
||||||
|
|
||||||
if (OB_ISNULL(location_cache) || OB_ISNULL(schema_service)) {
|
if (OB_ISNULL(location_cache) || OB_ISNULL(schema_service)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("location cache or schema_service is null", KP(location_cache), KP(schema_service), K(ret));
|
LOG_WARN("location cache or schema_service is null", KP(location_cache), KP(schema_service), K(ret));
|
||||||
@ -380,18 +378,13 @@ int ObTaskExecutorCtxUtil::get_full_table_phy_table_location(ObExecContext& ctx,
|
|||||||
|
|
||||||
if (OB_FAIL(ret)) {
|
if (OB_FAIL(ret)) {
|
||||||
// bypass
|
// bypass
|
||||||
} else if (NULL == (loc = static_cast<ObPhyTableLocation*>(ctx.get_allocator().alloc(sizeof(ObPhyTableLocation))))) {
|
|
||||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
|
||||||
} else if (NULL == (loc = new (loc) ObPhyTableLocation())) {
|
|
||||||
ret = OB_ERR_UNEXPECTED;
|
|
||||||
LOG_WARN("fail new object", K(ret));
|
|
||||||
} else if (OB_FAIL(ObTableLocation::get_phy_table_location_info(
|
} else if (OB_FAIL(ObTableLocation::get_phy_table_location_info(
|
||||||
ctx, table_location_key, ref_table_id, is_weak, part_ids, *location_cache, phy_location_info))) {
|
ctx, table_location_key, ref_table_id, is_weak, part_ids, *location_cache, phy_location_info))) {
|
||||||
LOG_WARN("get phy table location info failed", K(ret));
|
LOG_WARN("get phy table location info failed", K(ret));
|
||||||
} else if (OB_FAIL(loc->add_partition_locations(phy_location_info))) {
|
} else if (OB_FAIL(table_location.new_location(ctx.get_allocator()))) {
|
||||||
|
LOG_WARN("fail alloc new location", K(ret));
|
||||||
|
} else if (OB_FAIL(table_location.get_loc()->add_partition_locations(phy_location_info))) {
|
||||||
LOG_WARN("add partition locations failed", K(ret), K(phy_location_info));
|
LOG_WARN("add partition locations failed", K(ret), K(phy_location_info));
|
||||||
} else {
|
|
||||||
table_location = loc;
|
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -310,8 +310,8 @@ public:
|
|||||||
ObTaskExecutorCtx& ctx, uint64_t table_location_key, uint64_t ref_table_id, ObPhyTableLocation*& table_location);
|
ObTaskExecutorCtx& ctx, uint64_t table_location_key, uint64_t ref_table_id, ObPhyTableLocation*& table_location);
|
||||||
static ObPhyTableLocation* get_phy_table_location_for_update(
|
static ObPhyTableLocation* get_phy_table_location_for_update(
|
||||||
ObTaskExecutorCtx& ctx, uint64_t table_location_key, uint64_t ref_table_id);
|
ObTaskExecutorCtx& ctx, uint64_t table_location_key, uint64_t ref_table_id);
|
||||||
static int get_full_table_phy_table_location(ObExecContext& ctx, uint64_t table_location_key, uint64_t ref_table_id,
|
static int get_full_table_phy_table_location(ObExecContext &ctx, uint64_t table_location_key, uint64_t ref_table_id,
|
||||||
bool is_weak, const ObPhyTableLocation*& table_location);
|
bool is_weak, ObPhyTableLocationGuard &table_location);
|
||||||
|
|
||||||
static int extract_server_participants(
|
static int extract_server_participants(
|
||||||
ObExecContext& ctx, const common::ObAddr& svr, common::ObPartitionIArray& participants);
|
ObExecContext& ctx, const common::ObAddr& svr, common::ObPartitionIArray& participants);
|
||||||
|
|||||||
@ -185,6 +185,41 @@ int ObPhyTableLocation::find_not_include_part_ids(const SrcArray& all_part_ids,
|
|||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class ObPhyTableLocationGuard {
|
||||||
|
public:
|
||||||
|
ObPhyTableLocationGuard() : loc_(nullptr){};
|
||||||
|
~ObPhyTableLocationGuard()
|
||||||
|
{
|
||||||
|
if (loc_) {
|
||||||
|
loc_->~ObPhyTableLocation();
|
||||||
|
loc_ = nullptr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
int new_location(common::ObIAllocator &allocator)
|
||||||
|
{
|
||||||
|
int ret = common::OB_SUCCESS;
|
||||||
|
void *buf = nullptr;
|
||||||
|
if (OB_NOT_NULL(loc_)) {
|
||||||
|
// init twice
|
||||||
|
ret = common::OB_ERR_UNEXPECTED;
|
||||||
|
} else if (nullptr == (buf = allocator.alloc(sizeof(ObPhyTableLocation)))) {
|
||||||
|
ret = common::OB_ALLOCATE_MEMORY_FAILED;
|
||||||
|
} else if (NULL == (loc_ = new (buf) ObPhyTableLocation())) {
|
||||||
|
ret = common::OB_ERR_UNEXPECTED;
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
// caller must ensure that the loc_ is not NULL before call get_loc()
|
||||||
|
ObPhyTableLocation *get_loc()
|
||||||
|
{
|
||||||
|
return loc_;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
ObPhyTableLocation *loc_;
|
||||||
|
};
|
||||||
|
|
||||||
} // namespace sql
|
} // namespace sql
|
||||||
} // namespace oceanbase
|
} // namespace oceanbase
|
||||||
#endif /* OCEANBASE_SQL_OB_PHY_TABLE_LOCATION_ */
|
#endif /* OCEANBASE_SQL_OB_PHY_TABLE_LOCATION_ */
|
||||||
|
|||||||
Reference in New Issue
Block a user