fix DAS dynamic partition pruning memleak
This commit is contained in:
@ -309,7 +309,7 @@ OB_INLINE int ObDASCtx::build_related_tablet_loc(ObDASTabletLoc &tablet_loc)
|
||||
ObTableID related_table_id = tablet_loc.loc_meta_->related_table_ids_.at(i);
|
||||
ObDASTableLoc *related_table_loc = nullptr;
|
||||
ObDASTabletLoc *related_tablet_loc = nullptr;
|
||||
DASRelatedTabletMap::Value rv;
|
||||
const DASRelatedTabletMap::Value *rv = nullptr;
|
||||
void *related_loc_buf = allocator_.alloc(sizeof(ObDASTabletLoc));
|
||||
if (OB_ISNULL(related_loc_buf)) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
@ -319,18 +319,32 @@ OB_INLINE int ObDASCtx::build_related_tablet_loc(ObDASTabletLoc &tablet_loc)
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get table loc by id failed", K(ret), KPC(tablet_loc.loc_meta_),
|
||||
K(related_table_id), K(table_locs_));
|
||||
} else if (OB_FAIL(related_tablet_map_.get_related_tablet_id(
|
||||
tablet_loc.tablet_id_, related_table_id, rv))) {
|
||||
LOG_WARN("get related tablet id failed", K(ret));
|
||||
} else {
|
||||
} else if (OB_ISNULL(rv = related_tablet_map_.get_related_tablet_id(tablet_loc.tablet_id_,
|
||||
related_table_id))) {
|
||||
//related local index tablet_id pruning only can be used in local plan or remote plan(all operator
|
||||
//use the same das context),
|
||||
//because the distributed plan will transfer tablet_id through exchange operator,
|
||||
//but the related tablet_id map can not be transfered by exchange operator,
|
||||
//unused related pruning in distributed plan's dml operator,
|
||||
//we will use get_all_tablet_and_object_id() to build the related tablet_id map when
|
||||
//dml operator's table loc was inited
|
||||
if (OB_FAIL(build_related_tablet_map(*tablet_loc.loc_meta_))) {
|
||||
LOG_WARN("build related tablet map failed", K(ret), KPC(tablet_loc.loc_meta_));
|
||||
} else if (OB_ISNULL(rv = related_tablet_map_.get_related_tablet_id(tablet_loc.tablet_id_,
|
||||
related_table_id))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get related tablet id failed", K(ret), K(tablet_loc.tablet_id_), K(related_table_id));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
related_tablet_loc = new(related_loc_buf) ObDASTabletLoc();
|
||||
related_tablet_loc->tablet_id_ = rv.tablet_id_;
|
||||
related_tablet_loc->tablet_id_ = rv->tablet_id_;
|
||||
related_tablet_loc->ls_id_ = tablet_loc.ls_id_;
|
||||
related_tablet_loc->server_ = tablet_loc.server_;
|
||||
related_tablet_loc->loc_meta_ = related_table_loc->loc_meta_;
|
||||
related_tablet_loc->next_ = tablet_loc.next_;
|
||||
related_tablet_loc->partition_id_ = rv.part_id_;
|
||||
related_tablet_loc->first_level_part_id_ = rv.first_level_part_id_;
|
||||
related_tablet_loc->partition_id_ = rv->part_id_;
|
||||
related_tablet_loc->first_level_part_id_ = rv->first_level_part_id_;
|
||||
tablet_loc.next_ = related_tablet_loc;
|
||||
if (OB_FAIL(related_table_loc->add_tablet_loc(related_tablet_loc))) {
|
||||
LOG_WARN("add related tablet location failed", K(ret));
|
||||
@ -555,6 +569,20 @@ void ObDASCtx::unmark_need_check_server()
|
||||
}
|
||||
}
|
||||
|
||||
int ObDASCtx::build_related_tablet_map(const ObDASTableLocMeta &loc_meta)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObDASTabletMapper tablet_mapper;
|
||||
ObArray<ObTabletID> tablet_ids;
|
||||
ObArray<ObObjectID> partition_ids;
|
||||
if (OB_FAIL(get_das_tablet_mapper(loc_meta.ref_table_id_, tablet_mapper, &loc_meta.related_table_ids_))) {
|
||||
LOG_WARN("get das tablet mapper failed", K(ret));
|
||||
} else if (OB_FAIL(tablet_mapper.get_all_tablet_and_object_id(tablet_ids, partition_ids))) {
|
||||
LOG_WARN("build related tablet_id map failed", K(ret), K(loc_meta));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
OB_DEF_SERIALIZE(ObDASCtx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -110,6 +110,7 @@ public:
|
||||
|
||||
int build_external_table_location(
|
||||
uint64_t table_loc_id, uint64_t ref_table_id, common::ObIArray<ObAddr> &locations);
|
||||
int build_related_tablet_map(const ObDASTableLocMeta &loc_meta);
|
||||
|
||||
TO_STRING_KV(K_(table_locs),
|
||||
K_(external_table_locs),
|
||||
|
@ -319,7 +319,7 @@ int TabletHashMap::get(const ObTabletID key, ObDASTabletLoc *&value)
|
||||
LOG_WARN("find node failed", KR(ret));
|
||||
} else if (NULL == dst_node) {
|
||||
ret = OB_HASH_NOT_EXIST;
|
||||
LOG_WARN("key dost not exist", KR(ret), K(key));
|
||||
LOG_DEBUG("key dose not exist", KR(ret), K(key));
|
||||
} else {
|
||||
value = dst_node->value_;
|
||||
}
|
||||
|
@ -31,11 +31,11 @@ using namespace transaction;
|
||||
namespace sql
|
||||
{
|
||||
OB_SERIALIZE_MEMBER(DASRelatedTabletMap::MapEntry,
|
||||
src_tablet_id_,
|
||||
related_table_id_,
|
||||
related_tablet_id_,
|
||||
related_part_id_,
|
||||
related_first_level_part_id_);
|
||||
key_.src_tablet_id_,
|
||||
key_.related_table_id_,
|
||||
val_.tablet_id_,
|
||||
val_.part_id_,
|
||||
val_.first_level_part_id_);
|
||||
|
||||
int VirtualSvrPair::init(ObIAllocator &allocator,
|
||||
ObTableID vt_id,
|
||||
@ -119,34 +119,93 @@ int DASRelatedTabletMap::add_related_tablet_id(ObTabletID src_tablet_id,
|
||||
ObObjectID related_part_id,
|
||||
ObObjectID related_first_level_part_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (nullptr == get_related_tablet_id(src_tablet_id, related_table_id)) {
|
||||
MapEntry map_entry;
|
||||
map_entry.src_tablet_id_ = src_tablet_id;
|
||||
map_entry.related_table_id_ = related_table_id;
|
||||
map_entry.related_tablet_id_ = related_tablet_id;
|
||||
map_entry.related_part_id_ = related_part_id;
|
||||
map_entry.related_first_level_part_id_ = related_first_level_part_id;
|
||||
return list_.push_back(map_entry);
|
||||
map_entry.key_.src_tablet_id_ = src_tablet_id;
|
||||
map_entry.key_.related_table_id_ = related_table_id;
|
||||
map_entry.val_.tablet_id_ = related_tablet_id;
|
||||
map_entry.val_.part_id_ = related_part_id;
|
||||
map_entry.val_.first_level_part_id_ = related_first_level_part_id;
|
||||
if (OB_FAIL(list_.push_back(map_entry))) {
|
||||
LOG_WARN("store the related tablet entry failed", K(ret), K(map_entry));
|
||||
} else if (list_.size() > FAST_LOOP_LIST_LEN) {
|
||||
//The length of the list is already long enough,
|
||||
//and searching through it using iteration will be slow.
|
||||
//Therefore, constructing a map can accelerate the search in this situation.
|
||||
if (OB_FAIL(insert_related_tablet_map())) {
|
||||
LOG_WARN("create related tablet map failed", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int DASRelatedTabletMap::get_related_tablet_id(ObTabletID src_tablet_id,
|
||||
ObTableID related_table_id,
|
||||
Value &val)
|
||||
const DASRelatedTabletMap::Value *DASRelatedTabletMap::get_related_tablet_id(ObTabletID src_tablet_id,
|
||||
ObTableID related_table_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const Value *val = nullptr;
|
||||
if (list_.size() > FAST_LOOP_LIST_LEN) {
|
||||
Key tmp_key;
|
||||
tmp_key.src_tablet_id_ = src_tablet_id;
|
||||
tmp_key.related_table_id_ = related_table_id;
|
||||
Value* const *val_ptr = map_.get(&tmp_key);
|
||||
val = (val_ptr != nullptr ? *val_ptr : nullptr);
|
||||
} else {
|
||||
MapEntry *final_entry = nullptr;
|
||||
FOREACH_X(node, list_, final_entry == nullptr) {
|
||||
MapEntry &entry = *node;
|
||||
if (entry.src_tablet_id_ == src_tablet_id && entry.related_table_id_ == related_table_id) {
|
||||
if (entry.key_.src_tablet_id_ == src_tablet_id &&
|
||||
entry.key_.related_table_id_ == related_table_id) {
|
||||
final_entry = &entry;
|
||||
}
|
||||
}
|
||||
if (OB_LIKELY(final_entry != nullptr)) {
|
||||
val.tablet_id_ = final_entry->related_tablet_id_;
|
||||
val.part_id_ = final_entry->related_part_id_;
|
||||
val.first_level_part_id_ = final_entry->related_first_level_part_id_;
|
||||
} else {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get related tablet id failed", K(ret), K(src_tablet_id), K(related_table_id), K(list_));
|
||||
val = &final_entry->val_;
|
||||
}
|
||||
}
|
||||
return val;
|
||||
}
|
||||
|
||||
int DASRelatedTabletMap::assign(const RelatedTabletList &list)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
clear();
|
||||
FOREACH_X(node, list, OB_SUCC(ret)) {
|
||||
const MapEntry &entry = *node;
|
||||
if (OB_FAIL(add_related_tablet_id(entry.key_.src_tablet_id_,
|
||||
entry.key_.related_table_id_,
|
||||
entry.val_.tablet_id_,
|
||||
entry.val_.part_id_,
|
||||
entry.val_.first_level_part_id_))) {
|
||||
LOG_WARN("add related tablet id failed", K(ret), K(entry));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int DASRelatedTabletMap::insert_related_tablet_map()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!map_.created()) {
|
||||
if (OB_FAIL(map_.create(1000, "DASRelTblKey", "DASRelTblVal", MTL_ID()))) {
|
||||
LOG_WARN("create related tablet map failed", K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (map_.empty()) {
|
||||
FOREACH_X(node, list_, OB_SUCC(ret)) {
|
||||
MapEntry &entry = *node;
|
||||
if (OB_FAIL(map_.set_refactored(&entry.key_, &entry.val_))) {
|
||||
LOG_WARN("insert entry to map failed", K(ret), K(entry));
|
||||
}
|
||||
}
|
||||
} else if (!list_.empty()) {
|
||||
MapEntry &final_entry = list_.get_last();
|
||||
if (OB_FAIL(map_.set_refactored(&final_entry.key_, &final_entry.val_))) {
|
||||
LOG_WARN("insert final entry to map failed", K(ret), K(final_entry));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -342,7 +401,7 @@ int ObDASTabletMapper::get_non_partition_tablet_id(ObIArray<ObTabletID> &tablet_
|
||||
} else {
|
||||
DASRelatedTabletMap *map = static_cast<DASRelatedTabletMap *>(related_info_.related_map_);
|
||||
if (OB_NOT_NULL(map) && OB_NOT_NULL(related_list_)
|
||||
&& OB_FAIL(map->get_list().assign(*related_list_))) {
|
||||
&& OB_FAIL(map->assign(*related_list_))) {
|
||||
LOG_WARN("failed to assign related map list", K(ret));
|
||||
}
|
||||
}
|
||||
|
@ -70,28 +70,49 @@ class DASRelatedTabletMap : public share::schema::IRelatedTabletMap
|
||||
{
|
||||
friend class ObDASCtx;
|
||||
public:
|
||||
struct MapEntry
|
||||
struct Key
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
public:
|
||||
int hash(uint64_t &res)
|
||||
{
|
||||
res += common::murmurhash(&src_tablet_id_, sizeof(src_tablet_id_), res);
|
||||
res += common::murmurhash(&related_table_id_, sizeof(related_table_id_), res);
|
||||
return common::OB_SUCCESS;
|
||||
}
|
||||
|
||||
bool operator ==(const Key &other) const
|
||||
{
|
||||
return (other.src_tablet_id_ == src_tablet_id_
|
||||
&& other.related_table_id_ == related_table_id_);
|
||||
}
|
||||
|
||||
TO_STRING_KV(K_(src_tablet_id),
|
||||
K_(related_table_id),
|
||||
K_(related_tablet_id),
|
||||
K_(related_part_id));
|
||||
K_(related_table_id));
|
||||
common::ObTabletID src_tablet_id_;
|
||||
common::ObTableID related_table_id_;
|
||||
common::ObTabletID related_tablet_id_;
|
||||
common::ObObjectID related_part_id_;
|
||||
// only valid if partition level of related table is level two
|
||||
common::ObObjectID related_first_level_part_id_;
|
||||
};
|
||||
struct Value {
|
||||
|
||||
struct Value
|
||||
{
|
||||
TO_STRING_KV(K_(tablet_id),
|
||||
K_(part_id),
|
||||
K_(first_level_part_id));
|
||||
common::ObTabletID tablet_id_;
|
||||
common::ObObjectID part_id_;
|
||||
// only valid if partition level of related table is level two
|
||||
common::ObObjectID first_level_part_id_;
|
||||
};
|
||||
|
||||
struct MapEntry
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
public:
|
||||
TO_STRING_KV(K_(key),
|
||||
K_(val));
|
||||
Key key_;
|
||||
Value val_;
|
||||
};
|
||||
typedef common::ObList<MapEntry, common::ObIAllocator> RelatedTabletList;
|
||||
typedef common::hash::ObHashMap<Key*, Value*, common::hash::NoPthreadDefendMode> RelatedTabletMap;
|
||||
public:
|
||||
DASRelatedTabletMap(common::ObIAllocator &allocator)
|
||||
: list_(allocator),
|
||||
@ -104,17 +125,25 @@ public:
|
||||
common::ObTabletID related_tablet_id,
|
||||
common::ObObjectID related_part_id,
|
||||
common::ObObjectID related_first_level_part_id) override;
|
||||
int get_related_tablet_id(common::ObTabletID src_tablet_id,
|
||||
common::ObTableID related_table_id,
|
||||
Value &val);
|
||||
void clear() { list_.clear(); }
|
||||
RelatedTabletList &get_list() { return list_; }
|
||||
const Value *get_related_tablet_id(common::ObTabletID src_tablet_id,
|
||||
common::ObTableID related_table_id);
|
||||
int assign(const RelatedTabletList &list);
|
||||
int insert_related_tablet_map();
|
||||
void clear()
|
||||
{
|
||||
list_.clear();
|
||||
map_.clear();
|
||||
}
|
||||
const RelatedTabletList &get_list() const { return list_; }
|
||||
TO_STRING_KV(K_(list));
|
||||
private:
|
||||
const int64_t FAST_LOOP_LIST_LEN = 100;
|
||||
//There are usually not many tablets for a query.
|
||||
//At this stage, use list to simulate map search,
|
||||
//and then optimize if there are performance problems later.
|
||||
RelatedTabletList list_;
|
||||
//
|
||||
RelatedTabletMap map_;
|
||||
common::ObIAllocator &allocator_;
|
||||
};
|
||||
|
||||
|
@ -1159,28 +1159,7 @@ int ObDMLService::init_das_dml_rtdef(ObDMLRtCtx &dml_rtctx,
|
||||
}
|
||||
}
|
||||
}
|
||||
//related local index tablet_id pruning only can be used in local plan or remote plan(all operator
|
||||
//use the same das context),
|
||||
//because the distributed plan will transfer tablet_id through exchange operator,
|
||||
//but the related tablet_id map can not be transfered by exchange operator,
|
||||
//unused related pruning in distributed plan's dml operator,
|
||||
//we will use get_all_tablet_and_object_id() to build the related tablet_id map when
|
||||
//dml operator's table loc was inited
|
||||
if (OB_SUCC(ret) && !das_ctdef.table_param_.get_data_table().is_index_table()) {
|
||||
const ObDASTableLocMeta *final_loc_meta = das_rtdef.table_loc_->loc_meta_;
|
||||
if (!final_loc_meta->related_table_ids_.empty() &&
|
||||
dml_rtctx.op_.get_spec().plan_->is_distributed_plan()) {
|
||||
ObDASTabletMapper tablet_mapper;
|
||||
ObArray<ObTabletID> tablet_ids;
|
||||
ObArray<ObObjectID> partition_ids;
|
||||
if (OB_FAIL(das_ctx.get_das_tablet_mapper(ref_table_id, tablet_mapper,
|
||||
&final_loc_meta->related_table_ids_))) {
|
||||
LOG_WARN("get das tablet mapper failed", K(ret));
|
||||
} else if (OB_FAIL(tablet_mapper.get_all_tablet_and_object_id(tablet_ids, partition_ids))) {
|
||||
LOG_WARN("build related tablet_id map failed", K(ret), KPC(final_loc_meta));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -594,11 +594,12 @@ int ObCandiTableLoc::replace_local_index_loc(DASRelatedTabletMap &map, ObTableID
|
||||
ref_table_id_ = ref_table_id;
|
||||
for (int64_t i = 0; i < candi_tablet_locs_.count(); ++i) {
|
||||
ObOptTabletLoc &tablet_loc = candi_tablet_locs_.at(i).get_partition_location();
|
||||
DASRelatedTabletMap::Value rv;
|
||||
if (OB_FAIL(map.get_related_tablet_id(tablet_loc.get_tablet_id(), ref_table_id, rv))) {
|
||||
LOG_WARN("related tablet info is invalid", K(ret));
|
||||
const DASRelatedTabletMap::Value *rv = nullptr;
|
||||
if (OB_ISNULL(rv = map.get_related_tablet_id(tablet_loc.get_tablet_id(), ref_table_id))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("related tablet info is invalid", K(ret), K(tablet_loc.get_tablet_id()), K(ref_table_id));
|
||||
} else {
|
||||
tablet_loc.set_tablet_info(rv.tablet_id_, rv.part_id_, rv.first_level_part_id_);
|
||||
tablet_loc.set_tablet_info(rv->tablet_id_, rv->part_id_, rv->first_level_part_id_);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
|
Reference in New Issue
Block a user