From a72fbb47cbf98df2fb3a718a8d19e717eea1d665 Mon Sep 17 00:00:00 2001 From: GroundWu <1175416256@qq.com> Date: Wed, 12 Feb 2025 08:47:13 +0000 Subject: [PATCH] [OBKV] fix core and memleak in group commit --- src/observer/table/group/ob_i_table_struct.h | 1 + .../table/group/ob_table_group_common.cpp | 24 +++++- .../table/group/ob_table_group_common.h | 1 + .../table/group/ob_table_group_execute.h | 5 ++ .../table/hbase/ob_hbase_group_processor.cpp | 76 +++++++++---------- .../table/hbase/ob_hbase_group_struct.h | 1 + src/observer/table/ob_table_context.cpp | 5 ++ src/observer/table/ob_table_session_pool.h | 2 +- src/observer/table/redis/cmd/ob_redis_cmd.h | 5 +- src/share/table/ob_table.cpp | 6 +- 10 files changed, 79 insertions(+), 47 deletions(-) diff --git a/src/observer/table/group/ob_i_table_struct.h b/src/observer/table/group/ob_i_table_struct.h index 3b4b647aa9..736fa68f5a 100644 --- a/src/observer/table/group/ob_i_table_struct.h +++ b/src/observer/table/group/ob_i_table_struct.h @@ -67,6 +67,7 @@ public: timeout_ts_ = 0; timeout_ = 0; } + virtual void reset_result() = 0; public: ObTableGroupType type_; rpc::ObRequest *req_; // rpc request diff --git a/src/observer/table/group/ob_table_group_common.cpp b/src/observer/table/group/ob_table_group_common.cpp index 75b69cbc80..4b937eb2be 100644 --- a/src/observer/table/group/ob_table_group_common.cpp +++ b/src/observer/table/group/ob_table_group_common.cpp @@ -160,6 +160,16 @@ int ObTableGroup::add_op(ObITableOp *op) return ret; } +void ObTableGroup::reset_result() +{ + for (int64_t i = 0; i < ops_.count(); i++) { + ObITableOp *op = ops_.at(i); + if (OB_NOT_NULL(op)) { + op->reset_result(); + } + } +} + int ObTableFailedGroups::init() { int ret = OB_SUCCESS; if (is_inited_) { @@ -185,11 +195,17 @@ int ObTableFailedGroups::init() { int ObTableFailedGroups::add(ObTableGroup *group) { int ret = OB_SUCCESS; - ObLockGuard guard(lock_); - if (OB_FAIL(failed_ops_.push_back(group))) { - LOG_WARN("fail to push back group", K(ret)); + if (OB_ISNULL(group)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail group is NULL", K(ret)); } else { - group_info_.gmt_modified_ = ObClockGenerator::getClock(); + group->reset_result(); + ObLockGuard guard(lock_); + if (OB_FAIL(failed_ops_.push_back(group))) { + LOG_WARN("fail to push back group", K(ret)); + } else { + group_info_.gmt_modified_ = ObClockGenerator::getClock(); + } } return ret; } diff --git a/src/observer/table/group/ob_table_group_common.h b/src/observer/table/group/ob_table_group_common.h index 13b35ef619..d93a35f326 100644 --- a/src/observer/table/group/ob_table_group_common.h +++ b/src/observer/table/group/ob_table_group_common.h @@ -92,6 +92,7 @@ public: int init(const ObTableGroupCtx &ctx, ObIArray &ops); int add_op(ObITableOp *op); OB_INLINE ObIArray& get_ops() { return ops_; } + void reset_result(); public: bool is_inited_; ObTableGroupMeta group_meta_; diff --git a/src/observer/table/group/ob_table_group_execute.h b/src/observer/table/group/ob_table_group_execute.h index b54ba2a7ee..2bce303f35 100644 --- a/src/observer/table/group/ob_table_group_execute.h +++ b/src/observer/table/group/ob_table_group_execute.h @@ -220,6 +220,11 @@ public: { reset(); } + virtual void reset_result() + { + result_.reset(); + result_entity_.reset(); + } public: ObTableOperationResult result_; ObTableOperation op_; // single operation diff --git a/src/observer/table/hbase/ob_hbase_group_processor.cpp b/src/observer/table/hbase/ob_hbase_group_processor.cpp index b0d73135b1..df2031e0e0 100644 --- a/src/observer/table/hbase/ob_hbase_group_processor.cpp +++ b/src/observer/table/hbase/ob_hbase_group_processor.cpp @@ -161,6 +161,7 @@ int ObHbaseOpProcessor::init_result(const int64_t total_tablet_count, ObTableMul for (int64_t j = 0; OB_SUCC(ret) && j < tablet_count; j++, batch_result_idx++) { // loop tablet ObTableTabletOpResult &tablet_result = op->result_.at(j); ObTableBatchOperationResult &batch_result = result.get_results().at(batch_result_idx); + batch_result.set_attr(ObMemAttr(MTL_ID(), "ObHbaseGrpRes")); int64_t single_op_count = tablet_result.count(); for (int64_t k = 0; OB_SUCC(ret) && k < single_op_count; k++) { // loop single op if (OB_FAIL(batch_result.push_back(tablet_result.at(k)))) { @@ -177,53 +178,48 @@ int ObHbaseOpProcessor::init_result(const int64_t total_tablet_count, ObTableMul int ObHbaseOpProcessor::process() { int ret = OB_SUCCESS; - ObTableMultiBatchRequest *req = nullptr; - ObTableMultiBatchResult *result = nullptr; - const int64_t op_count = ops_->count(); - const int64_t total_tablet_count = get_tablet_count(); - if (!IS_INIT) { ret = OB_NOT_INIT; LOG_WARN("processor not init", K(ret)); - } else if (OB_ISNULL(req = OB_NEWx(ObTableMultiBatchRequest, (&allocator_)))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("fail to alloc multi batch request", K(ret)); - } else if (OB_ISNULL(result = OB_NEWx(ObTableMultiBatchResult, (&allocator_), allocator_))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("fail to alloc multi batch result", K(ret)); - } else if (OB_FAIL(req->get_ops().prepare_allocate(total_tablet_count))) { - LOG_WARN("fail to reserve request ops", K(ret), K(total_tablet_count)); } else { - int64_t batch_req_idx = 0; - for (int64_t i = 0; OB_SUCC(ret) && i < op_count; i++) { // loop op - ObHbaseOp *op = static_cast(ops_->at(i)); - int64_t tablet_count = op->ls_req_.ls_op_.count(); - for (int64_t j = 0; OB_SUCC(ret) && j < tablet_count; j++, batch_req_idx++) { // loop tablet - const ObTableTabletOp &tablet_op = op->ls_req_.ls_op_.at(j); - ObTableBatchOperation &batch_op = req->get_ops().at(batch_req_idx); - batch_op.set_entity_factory(&default_entity_factory_); - if (OB_FAIL(fill_batch_op(tablet_op, batch_op))) { - LOG_WARN("fail to fill batch op", K(ret), KPC(op)); - } else if (OB_FAIL(req->get_tablet_ids().push_back(tablet_op.get_tablet_id()))) { - LOG_WARN("fail to add tablet id", K(ret)); + const int64_t op_count = ops_->count(); + const int64_t total_tablet_count = get_tablet_count(); + SMART_VARS_2((ObTableMultiBatchRequest, req), (ObTableMultiBatchResult, result, allocator_)) { + if (OB_FAIL(req.get_ops().prepare_allocate(total_tablet_count))) { + LOG_WARN("fail to reserve request ops", K(ret), K(total_tablet_count)); + } else { + int64_t batch_req_idx = 0; + for (int64_t i = 0; OB_SUCC(ret) && i < op_count; i++) { // loop op + ObHbaseOp *op = static_cast(ops_->at(i)); + int64_t tablet_count = op->ls_req_.ls_op_.count(); + for (int64_t j = 0; OB_SUCC(ret) && j < tablet_count; j++, batch_req_idx++) { // loop tablet + const ObTableTabletOp &tablet_op = op->ls_req_.ls_op_.at(j); + ObTableBatchOperation &batch_op = req.get_ops().at(batch_req_idx); + batch_op.set_entity_factory(&default_entity_factory_); + if (OB_FAIL(fill_batch_op(tablet_op, batch_op))) { + LOG_WARN("fail to fill batch op", K(ret), KPC(op)); + } else if (OB_FAIL(req.get_tablet_ids().push_back(tablet_op.get_tablet_id()))) { + LOG_WARN("fail to add tablet id", K(ret)); + } + } + } + + if (OB_FAIL(ret)) { + } else if (OB_FAIL(init_result(total_tablet_count, result))) { + LOG_WARN("fail to init result", K(ret)); + } else if (OB_FAIL(ObTableGroupExecuteService::start_trans(*multi_batch_ctx_))) { + LOG_WARN("fail to start trans", K(ret)); + } else if (OB_FAIL(ObTableMultiBatchService::execute(*multi_batch_ctx_, req, result))) { + LOG_WARN("fail to execute multi batch operation", K(ret)); } } - } - if (OB_FAIL(ret)) { - } else if (OB_FAIL(init_result(total_tablet_count, *result))) { - LOG_WARN("fail to init result", K(ret)); - } else if (OB_FAIL(ObTableGroupExecuteService::start_trans(*multi_batch_ctx_))) { - LOG_WARN("fail to start trans", K(ret)); - } else if (OB_FAIL(ObTableMultiBatchService::execute(*multi_batch_ctx_, *req, *result))) { - LOG_WARN("fail to execute multi batch operation", K(ret)); - } - - // end trans - int tmp_ret = OB_SUCCESS; - bool is_rollback = ret != OB_SUCCESS; - if (OB_TMP_FAIL(ObTableGroupExecuteService::end_trans(*multi_batch_ctx_, group_ctx_->create_cb_functor_, is_rollback))) { - LOG_WARN("fail to end trans", K(ret), K(tmp_ret)); + // end trans + int tmp_ret = OB_SUCCESS; + bool is_rollback = ret != OB_SUCCESS; + if (OB_TMP_FAIL(ObTableGroupExecuteService::end_trans(*multi_batch_ctx_, group_ctx_->create_cb_functor_, is_rollback))) { + LOG_WARN("fail to end trans", K(ret), K(tmp_ret)); + } } } diff --git a/src/observer/table/hbase/ob_hbase_group_struct.h b/src/observer/table/hbase/ob_hbase_group_struct.h index 088ba81948..0e9e1fbaeb 100644 --- a/src/observer/table/hbase/ob_hbase_group_struct.h +++ b/src/observer/table/hbase/ob_hbase_group_struct.h @@ -88,6 +88,7 @@ public: reset(); } int init(); + virtual void reset_result() { /* do nothing, result_enity of hbase put is empty */ } public: bool is_inited_; ObTableLSOpRequest ls_req_; diff --git a/src/observer/table/ob_table_context.cpp b/src/observer/table/ob_table_context.cpp index f6c4e8a1a5..b0e78533de 100644 --- a/src/observer/table/ob_table_context.cpp +++ b/src/observer/table/ob_table_context.cpp @@ -682,6 +682,11 @@ int ObTableCtx::adjust_properties() ObTableEntity *entity = static_cast(const_cast(entity_)); const ObIArray &prop_names = entity->get_properties_names(); const ObIArray &prop_objs = entity->get_properties_values(); + if (prop_names.count() != prop_objs.count()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("property name count is not equal to property obj count", K(ret), + K(prop_names.count()), K(prop_objs.count())); + } for (int64_t i = 0; OB_SUCC(ret) && i < prop_names.count(); i++) { const ObString &col_name = prop_names.at(i); ObObj &prop_obj = const_cast(prop_objs.at(i)); diff --git a/src/observer/table/ob_table_session_pool.h b/src/observer/table/ob_table_session_pool.h index 149a07755e..9c101a04c9 100644 --- a/src/observer/table/ob_table_session_pool.h +++ b/src/observer/table/ob_table_session_pool.h @@ -55,7 +55,7 @@ public: public: DynamicSysVars() : binlog_row_image_(-1), - kv_group_commit_batch_size_(1), + kv_group_commit_batch_size_(10), group_rw_mode_(0), query_record_size_limit_(-1), enable_query_response_time_stats_(true) diff --git a/src/observer/table/redis/cmd/ob_redis_cmd.h b/src/observer/table/redis/cmd/ob_redis_cmd.h index b9e9a66a45..b6b630340a 100644 --- a/src/observer/table/redis/cmd/ob_redis_cmd.h +++ b/src/observer/table/redis/cmd/ob_redis_cmd.h @@ -183,7 +183,10 @@ public: OB_INLINE RedisCommand* cmd() { return redis_cmd_; } OB_INLINE const RedisCommand* cmd() const { return redis_cmd_; } int get_key(ObString &key) const; - + virtual void reset_result() + { + result_.reset(); + } VIRTUAL_TO_STRING_KV(K_(result), K_(db), K_(response), diff --git a/src/share/table/ob_table.cpp b/src/share/table/ob_table.cpp index cd294afc71..2b406de3e8 100644 --- a/src/share/table/ob_table.cpp +++ b/src/share/table/ob_table.cpp @@ -278,7 +278,11 @@ int ObITableEntity::add_retrieve_property(const ObString &prop_name) //////////////////////////////////////////////////////////////// ObTableEntity::ObTableEntity() -{} +{ + rowkey_.set_attr(ObMemAttr(MTL_ID(), "TblEntRk")); + properties_names_.set_attr(ObMemAttr(MTL_ID(), "TblEntPropN")); + properties_values_.set_attr(ObMemAttr(MTL_ID(), "TblEntPropV")); +} ObTableEntity::~ObTableEntity() {}