[OBKV] fix core and memleak in group commit
This commit is contained in:
@ -67,6 +67,7 @@ public:
|
||||
timeout_ts_ = 0;
|
||||
timeout_ = 0;
|
||||
}
|
||||
virtual void reset_result() = 0;
|
||||
public:
|
||||
ObTableGroupType type_;
|
||||
rpc::ObRequest *req_; // rpc request
|
||||
|
||||
@ -160,6 +160,16 @@ int ObTableGroup::add_op(ObITableOp *op)
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObTableGroup::reset_result()
|
||||
{
|
||||
for (int64_t i = 0; i < ops_.count(); i++) {
|
||||
ObITableOp *op = ops_.at(i);
|
||||
if (OB_NOT_NULL(op)) {
|
||||
op->reset_result();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int ObTableFailedGroups::init() {
|
||||
int ret = OB_SUCCESS;
|
||||
if (is_inited_) {
|
||||
@ -185,11 +195,17 @@ int ObTableFailedGroups::init() {
|
||||
int ObTableFailedGroups::add(ObTableGroup *group)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLockGuard<ObSpinLock> guard(lock_);
|
||||
if (OB_FAIL(failed_ops_.push_back(group))) {
|
||||
LOG_WARN("fail to push back group", K(ret));
|
||||
if (OB_ISNULL(group)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("fail group is NULL", K(ret));
|
||||
} else {
|
||||
group_info_.gmt_modified_ = ObClockGenerator::getClock();
|
||||
group->reset_result();
|
||||
ObLockGuard<ObSpinLock> guard(lock_);
|
||||
if (OB_FAIL(failed_ops_.push_back(group))) {
|
||||
LOG_WARN("fail to push back group", K(ret));
|
||||
} else {
|
||||
group_info_.gmt_modified_ = ObClockGenerator::getClock();
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -92,6 +92,7 @@ public:
|
||||
int init(const ObTableGroupCtx &ctx, ObIArray<ObITableOp *> &ops);
|
||||
int add_op(ObITableOp *op);
|
||||
OB_INLINE ObIArray<ObITableOp *>& get_ops() { return ops_; }
|
||||
void reset_result();
|
||||
public:
|
||||
bool is_inited_;
|
||||
ObTableGroupMeta group_meta_;
|
||||
|
||||
@ -220,6 +220,11 @@ public:
|
||||
{
|
||||
reset();
|
||||
}
|
||||
virtual void reset_result()
|
||||
{
|
||||
result_.reset();
|
||||
result_entity_.reset();
|
||||
}
|
||||
public:
|
||||
ObTableOperationResult result_;
|
||||
ObTableOperation op_; // single operation
|
||||
|
||||
@ -161,6 +161,7 @@ int ObHbaseOpProcessor::init_result(const int64_t total_tablet_count, ObTableMul
|
||||
for (int64_t j = 0; OB_SUCC(ret) && j < tablet_count; j++, batch_result_idx++) { // loop tablet
|
||||
ObTableTabletOpResult &tablet_result = op->result_.at(j);
|
||||
ObTableBatchOperationResult &batch_result = result.get_results().at(batch_result_idx);
|
||||
batch_result.set_attr(ObMemAttr(MTL_ID(), "ObHbaseGrpRes"));
|
||||
int64_t single_op_count = tablet_result.count();
|
||||
for (int64_t k = 0; OB_SUCC(ret) && k < single_op_count; k++) { // loop single op
|
||||
if (OB_FAIL(batch_result.push_back(tablet_result.at(k)))) {
|
||||
@ -177,53 +178,48 @@ int ObHbaseOpProcessor::init_result(const int64_t total_tablet_count, ObTableMul
|
||||
int ObHbaseOpProcessor::process()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTableMultiBatchRequest *req = nullptr;
|
||||
ObTableMultiBatchResult *result = nullptr;
|
||||
const int64_t op_count = ops_->count();
|
||||
const int64_t total_tablet_count = get_tablet_count();
|
||||
|
||||
if (!IS_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("processor not init", K(ret));
|
||||
} else if (OB_ISNULL(req = OB_NEWx(ObTableMultiBatchRequest, (&allocator_)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to alloc multi batch request", K(ret));
|
||||
} else if (OB_ISNULL(result = OB_NEWx(ObTableMultiBatchResult, (&allocator_), allocator_))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to alloc multi batch result", K(ret));
|
||||
} else if (OB_FAIL(req->get_ops().prepare_allocate(total_tablet_count))) {
|
||||
LOG_WARN("fail to reserve request ops", K(ret), K(total_tablet_count));
|
||||
} else {
|
||||
int64_t batch_req_idx = 0;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < op_count; i++) { // loop op
|
||||
ObHbaseOp *op = static_cast<ObHbaseOp*>(ops_->at(i));
|
||||
int64_t tablet_count = op->ls_req_.ls_op_.count();
|
||||
for (int64_t j = 0; OB_SUCC(ret) && j < tablet_count; j++, batch_req_idx++) { // loop tablet
|
||||
const ObTableTabletOp &tablet_op = op->ls_req_.ls_op_.at(j);
|
||||
ObTableBatchOperation &batch_op = req->get_ops().at(batch_req_idx);
|
||||
batch_op.set_entity_factory(&default_entity_factory_);
|
||||
if (OB_FAIL(fill_batch_op(tablet_op, batch_op))) {
|
||||
LOG_WARN("fail to fill batch op", K(ret), KPC(op));
|
||||
} else if (OB_FAIL(req->get_tablet_ids().push_back(tablet_op.get_tablet_id()))) {
|
||||
LOG_WARN("fail to add tablet id", K(ret));
|
||||
const int64_t op_count = ops_->count();
|
||||
const int64_t total_tablet_count = get_tablet_count();
|
||||
SMART_VARS_2((ObTableMultiBatchRequest, req), (ObTableMultiBatchResult, result, allocator_)) {
|
||||
if (OB_FAIL(req.get_ops().prepare_allocate(total_tablet_count))) {
|
||||
LOG_WARN("fail to reserve request ops", K(ret), K(total_tablet_count));
|
||||
} else {
|
||||
int64_t batch_req_idx = 0;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < op_count; i++) { // loop op
|
||||
ObHbaseOp *op = static_cast<ObHbaseOp*>(ops_->at(i));
|
||||
int64_t tablet_count = op->ls_req_.ls_op_.count();
|
||||
for (int64_t j = 0; OB_SUCC(ret) && j < tablet_count; j++, batch_req_idx++) { // loop tablet
|
||||
const ObTableTabletOp &tablet_op = op->ls_req_.ls_op_.at(j);
|
||||
ObTableBatchOperation &batch_op = req.get_ops().at(batch_req_idx);
|
||||
batch_op.set_entity_factory(&default_entity_factory_);
|
||||
if (OB_FAIL(fill_batch_op(tablet_op, batch_op))) {
|
||||
LOG_WARN("fail to fill batch op", K(ret), KPC(op));
|
||||
} else if (OB_FAIL(req.get_tablet_ids().push_back(tablet_op.get_tablet_id()))) {
|
||||
LOG_WARN("fail to add tablet id", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(init_result(total_tablet_count, result))) {
|
||||
LOG_WARN("fail to init result", K(ret));
|
||||
} else if (OB_FAIL(ObTableGroupExecuteService::start_trans(*multi_batch_ctx_))) {
|
||||
LOG_WARN("fail to start trans", K(ret));
|
||||
} else if (OB_FAIL(ObTableMultiBatchService::execute(*multi_batch_ctx_, req, result))) {
|
||||
LOG_WARN("fail to execute multi batch operation", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(init_result(total_tablet_count, *result))) {
|
||||
LOG_WARN("fail to init result", K(ret));
|
||||
} else if (OB_FAIL(ObTableGroupExecuteService::start_trans(*multi_batch_ctx_))) {
|
||||
LOG_WARN("fail to start trans", K(ret));
|
||||
} else if (OB_FAIL(ObTableMultiBatchService::execute(*multi_batch_ctx_, *req, *result))) {
|
||||
LOG_WARN("fail to execute multi batch operation", K(ret));
|
||||
}
|
||||
|
||||
// end trans
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
bool is_rollback = ret != OB_SUCCESS;
|
||||
if (OB_TMP_FAIL(ObTableGroupExecuteService::end_trans(*multi_batch_ctx_, group_ctx_->create_cb_functor_, is_rollback))) {
|
||||
LOG_WARN("fail to end trans", K(ret), K(tmp_ret));
|
||||
// end trans
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
bool is_rollback = ret != OB_SUCCESS;
|
||||
if (OB_TMP_FAIL(ObTableGroupExecuteService::end_trans(*multi_batch_ctx_, group_ctx_->create_cb_functor_, is_rollback))) {
|
||||
LOG_WARN("fail to end trans", K(ret), K(tmp_ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -88,6 +88,7 @@ public:
|
||||
reset();
|
||||
}
|
||||
int init();
|
||||
virtual void reset_result() { /* do nothing, result_enity of hbase put is empty */ }
|
||||
public:
|
||||
bool is_inited_;
|
||||
ObTableLSOpRequest ls_req_;
|
||||
|
||||
@ -682,6 +682,11 @@ int ObTableCtx::adjust_properties()
|
||||
ObTableEntity *entity = static_cast<ObTableEntity*>(const_cast<ObITableEntity *>(entity_));
|
||||
const ObIArray<ObString> &prop_names = entity->get_properties_names();
|
||||
const ObIArray<ObObj> &prop_objs = entity->get_properties_values();
|
||||
if (prop_names.count() != prop_objs.count()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("property name count is not equal to property obj count", K(ret),
|
||||
K(prop_names.count()), K(prop_objs.count()));
|
||||
}
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < prop_names.count(); i++) {
|
||||
const ObString &col_name = prop_names.at(i);
|
||||
ObObj &prop_obj = const_cast<ObObj &>(prop_objs.at(i));
|
||||
|
||||
@ -55,7 +55,7 @@ public:
|
||||
public:
|
||||
DynamicSysVars()
|
||||
: binlog_row_image_(-1),
|
||||
kv_group_commit_batch_size_(1),
|
||||
kv_group_commit_batch_size_(10),
|
||||
group_rw_mode_(0),
|
||||
query_record_size_limit_(-1),
|
||||
enable_query_response_time_stats_(true)
|
||||
|
||||
@ -183,7 +183,10 @@ public:
|
||||
OB_INLINE RedisCommand* cmd() { return redis_cmd_; }
|
||||
OB_INLINE const RedisCommand* cmd() const { return redis_cmd_; }
|
||||
int get_key(ObString &key) const;
|
||||
|
||||
virtual void reset_result()
|
||||
{
|
||||
result_.reset();
|
||||
}
|
||||
VIRTUAL_TO_STRING_KV(K_(result),
|
||||
K_(db),
|
||||
K_(response),
|
||||
|
||||
@ -278,7 +278,11 @@ int ObITableEntity::add_retrieve_property(const ObString &prop_name)
|
||||
|
||||
////////////////////////////////////////////////////////////////
|
||||
ObTableEntity::ObTableEntity()
|
||||
{}
|
||||
{
|
||||
rowkey_.set_attr(ObMemAttr(MTL_ID(), "TblEntRk"));
|
||||
properties_names_.set_attr(ObMemAttr(MTL_ID(), "TblEntPropN"));
|
||||
properties_values_.set_attr(ObMemAttr(MTL_ID(), "TblEntPropV"));
|
||||
}
|
||||
|
||||
ObTableEntity::~ObTableEntity()
|
||||
{}
|
||||
|
||||
Reference in New Issue
Block a user