diff --git a/src/sql/code_generator/ob_dml_cg_service.cpp b/src/sql/code_generator/ob_dml_cg_service.cpp index 8bbe865066..06e7fd4a80 100644 --- a/src/sql/code_generator/ob_dml_cg_service.cpp +++ b/src/sql/code_generator/ob_dml_cg_service.cpp @@ -853,6 +853,8 @@ int ObDmlCgService::generate_constraint_infos(ObLogInsert &op, "name", log_constraint_infos->at(i).constraint_name_); } else if (OB_FAIL(rowkey_cst_ctdef->rowkey_expr_.init(constraint_columns.count()))) { LOG_WARN("init rowkey failed", K(ret), K(constraint_columns.count())); + } else if (OB_FAIL(rowkey_cst_ctdef->rowkey_accuracys_.init(constraint_columns.count()))) { + LOG_WARN("init rowkey accuracy failed", K(ret)); } for (int64_t j = 0; OB_SUCC(ret) && j < constraint_columns.count(); ++j) { @@ -886,6 +888,8 @@ int ObDmlCgService::generate_constraint_infos(ObLogInsert &op, if (OB_SUCC(ret)) { if (OB_FAIL(rowkey_cst_ctdef->rowkey_expr_.push_back(expr))) { LOG_WARN("fail to push_back expr", K(ret)); + } else if (OB_FAIL(rowkey_cst_ctdef->rowkey_accuracys_.push_back(col_expr->get_accuracy()))) { + LOG_WARN("fail to store rowkey accuracy", K(ret)); } } } // end constraint_columns diff --git a/src/sql/das/ob_data_access_service.cpp b/src/sql/das/ob_data_access_service.cpp index 7d70cddd71..3c5a2683d5 100644 --- a/src/sql/das/ob_data_access_service.cpp +++ b/src/sql/das/ob_data_access_service.cpp @@ -608,7 +608,9 @@ int ObDataAccessService::process_task_resp(ObDASRef &das_ref, const ObDASTaskRes } // task_resp's error code indicate the last valid op result. if (OB_FAIL(task_resp.get_err_code())) { - LOG_WARN("error occurring in remote das task", K(ret), K(task_resp)); + LOG_WARN("error occurring in remote das task, " + "please use the current TRACE_ID to grep the original error message on the remote_addr.", + K(ret), "remote_addr", task_resp.get_runner_svr()); OB_ASSERT(op_results.count() <= task_ops.count()); } else { // decode last op result diff --git a/src/sql/engine/dml/ob_conflict_checker.cpp b/src/sql/engine/dml/ob_conflict_checker.cpp index e3d15202d3..a59917a1d4 100644 --- a/src/sql/engine/dml/ob_conflict_checker.cpp +++ b/src/sql/engine/dml/ob_conflict_checker.cpp @@ -17,6 +17,7 @@ #include "sql/engine/dml/ob_table_modify_op.h" #include "sql/engine/expr/ob_expr_calc_partition_id.h" #include "sql/das/ob_data_access_service.h" +#include "sql/das/ob_das_utils.h" #include "sql/engine/ob_exec_context.h" namespace oceanbase @@ -31,7 +32,8 @@ namespace sql OB_SERIALIZE_MEMBER(ObRowkeyCstCtdef, constraint_name_, rowkey_expr_, - calc_exprs_); + calc_exprs_, + rowkey_accuracys_); OB_DEF_SERIALIZE(ObConflictCheckerCtdef) { @@ -171,7 +173,8 @@ ObConflictChecker::ObConflictChecker(common::ObIAllocator &allocator, allocator_(allocator), das_ref_(eval_ctx, eval_ctx.exec_ctx_), local_tablet_loc_(nullptr), - table_loc_(nullptr) + table_loc_(nullptr), + tmp_mem_ctx_() { } @@ -213,14 +216,36 @@ int ObConflictChecker::init_conflict_checker(const ObExprFrameInfo *expr_frame_i return ret; } +int ObConflictChecker::get_tmp_string_buffer(ObIAllocator *&allocator) +{ + int ret = OB_SUCCESS; + allocator = nullptr; + if (OB_ISNULL(tmp_mem_ctx_)) { + lib::ContextParam param; + param.set_mem_attr(MTL_ID(), "ConflictRowkey", ObCtxIds::DEFAULT_CTX_ID) + .set_properties(lib::USE_TL_PAGE_OPTIONAL); + if (OB_FAIL(CURRENT_CONTEXT->CREATE_CONTEXT(tmp_mem_ctx_, param))) { + LOG_WARN("create conflict rowkey checker context entity failed", K(ret)); + } + } else { + tmp_mem_ctx_->reset_remain_one_page(); + } + if (OB_SUCC(ret)) { + allocator = &tmp_mem_ctx_->get_allocator(); + } + return ret; +} + int ObConflictChecker::build_rowkey(ObRowkey *&rowkey, ObRowkeyCstCtdef *rowkey_info) { int ret = OB_SUCCESS; ObObj *objs = NULL; int64_t rowkey_cnt = rowkey_info->rowkey_expr_.count(); + ObIAllocator &alloc = das_ref_.get_das_alloc(); + ObIAllocator *tmp_string_buffer = nullptr; - if (NULL == (rowkey = static_cast(das_ref_.get_das_alloc().alloc(sizeof(ObRowkey))))) { + if (NULL == (rowkey = static_cast(alloc.alloc(sizeof(ObRowkey))))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to alloc memory", K(ret)); } else if (NULL == @@ -229,6 +254,8 @@ int ObConflictChecker::build_rowkey(ObRowkey *&rowkey, LOG_WARN("fail to alloc memory", K(ret)); } else if (OB_FAIL(ObSQLUtils::clear_evaluated_flag(rowkey_info->calc_exprs_, eval_ctx_))) { LOG_WARN("fail to clear rowkey flag", K(ret), K(rowkey_info->calc_exprs_)); + } else if (OB_FAIL(get_tmp_string_buffer(tmp_string_buffer))) { + LOG_WARN("get tmp string buffer failed", K(ret)); } for (int i = 0; OB_SUCC(ret) && i < rowkey_cnt; ++i) { @@ -236,6 +263,14 @@ int ObConflictChecker::build_rowkey(ObRowkey *&rowkey, ObObj tmp_obj; const ObObjMeta &col_obj_meta = rowkey_info->rowkey_expr_.at(i)->obj_meta_; ObExpr *expr = rowkey_info->rowkey_expr_.at(i); + const ObAccuracy *col_accuracy = nullptr; + + if (rowkey_info->rowkey_accuracys_.count() == rowkey_cnt) { + //To maintain compatibility with older versions, + //reshape_storage_value is only performed when rowkey_accuracys is not empty. + col_accuracy = &rowkey_info->rowkey_accuracys_.at(i); + } + if (OB_ISNULL(expr)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("expr is null", K(ret), K(i)); @@ -246,7 +281,13 @@ int ObConflictChecker::build_rowkey(ObRowkey *&rowkey, // 读后续行的时候会覆盖前边的数据,所以这里一定要做深拷贝 else if (OB_FAIL(datum->to_obj(tmp_obj, col_obj_meta))) { LOG_WARN("datum to obj fail", K(ret), K(i), KPC(expr), KPC(datum)); - } else if (OB_FAIL(ob_write_obj(das_ref_.get_das_alloc(), tmp_obj, objs[i]))) { + } else if (col_accuracy != nullptr && + OB_FAIL(ObDASUtils::reshape_storage_value(col_obj_meta, + *col_accuracy, + *tmp_string_buffer, + tmp_obj))) { + LOG_WARN("reshape storage value failed", K(ret)); + } else if (OB_FAIL(ob_write_obj(alloc, tmp_obj, objs[i]))) { LOG_WARN("deep copy rowkey value failed", K(ret), K(tmp_obj)); } } @@ -272,16 +313,27 @@ int ObConflictChecker::build_tmp_rowkey(ObRowkey *rowkey, ObRowkeyCstCtdef *rowk if (OB_SUCC(ret)) { ObObj *obj_ptr = rowkey->get_obj_ptr(); + ObIAllocator *tmp_string_buffer = nullptr; + if (OB_ISNULL(obj_ptr)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("obj_ptr is null", K(ret)); } else if (OB_FAIL(ObSQLUtils::clear_evaluated_flag(rowkey_info->calc_exprs_, eval_ctx_))) { LOG_WARN("clear eval flag failed", K(ret), K(rowkey_info->calc_exprs_)); + } else if (OB_FAIL(get_tmp_string_buffer(tmp_string_buffer))) { + LOG_WARN("get tmp string buffer failed", K(ret)); } for (int i = 0; OB_SUCC(ret) && i < rowkey_info->rowkey_expr_.count(); ++i) { ObDatum *datum = NULL; const ObObjMeta &col_obj_meta = rowkey_info->rowkey_expr_.at(i)->obj_meta_; ObExpr *expr = rowkey_info->rowkey_expr_.at(i); + const ObAccuracy *col_accuracy = nullptr; + + if (rowkey_info->rowkey_accuracys_.count() == rowkey_info->rowkey_expr_.count()) { + //To maintain compatibility with older versions, + //reshape_storage_value is only performed when rowkey_accuracys is not empty. + col_accuracy = &rowkey_info->rowkey_accuracys_.at(i); + } if (OB_ISNULL(expr)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("expr is null", K(ret), K(i)); @@ -289,6 +341,12 @@ int ObConflictChecker::build_tmp_rowkey(ObRowkey *rowkey, ObRowkeyCstCtdef *rowk LOG_WARN("expr eval fail", K(ret), K(i), KPC(expr)); } else if (OB_FAIL(datum->to_obj(obj_ptr[i], col_obj_meta))) { LOG_WARN("datum to obj fail", K(ret), K(i), KPC(expr), KPC(datum)); + } else if (col_accuracy != nullptr && + OB_FAIL(ObDASUtils::reshape_storage_value(col_obj_meta, + *col_accuracy, + *tmp_string_buffer, + obj_ptr[i]))) { + LOG_WARN("reshape storage value failed", K(ret)); } else { LOG_DEBUG("succ to build tmp rowkey obj", K(i), K(obj_ptr[i])); } @@ -599,6 +657,9 @@ int ObConflictChecker::reuse() LOG_WARN("fail to reuse conflict_map", K(ret), K(i)); } } + if (tmp_mem_ctx_ != nullptr) { + tmp_mem_ctx_->reset_remain_one_page(); + } return ret; } @@ -613,6 +674,10 @@ int ObConflictChecker::destroy() } das_ref_.reset(); das_scan_rtdef_.~ObDASScanRtDef(); + if (tmp_mem_ctx_ != nullptr) { + DESTROY_CONTEXT(tmp_mem_ctx_); + tmp_mem_ctx_ = nullptr; + } return ret; } diff --git a/src/sql/engine/dml/ob_conflict_checker.h b/src/sql/engine/dml/ob_conflict_checker.h index 4e99afbc6f..8ad854954c 100644 --- a/src/sql/engine/dml/ob_conflict_checker.h +++ b/src/sql/engine/dml/ob_conflict_checker.h @@ -31,14 +31,19 @@ public: ObRowkeyCstCtdef(common::ObIAllocator &alloc) : constraint_name_(), rowkey_expr_(alloc), - calc_exprs_(alloc) + calc_exprs_(alloc), + rowkey_accuracys_(alloc) { } virtual ~ObRowkeyCstCtdef() = default; - TO_STRING_KV(K_(constraint_name), K_(rowkey_expr), K_(calc_exprs)); + TO_STRING_KV(K_(constraint_name), + K_(rowkey_expr), + K_(rowkey_accuracys), + K_(calc_exprs)); ObString constraint_name_; // 冲突时打印表名用 ExprFixedArray rowkey_expr_; // 索引表的主键 ExprFixedArray calc_exprs_; // 计算逐渐信息依赖的表达式 + AccuracyFixedArray rowkey_accuracys_; }; enum ObNewRowSource @@ -214,6 +219,7 @@ private: int init_das_scan_rtdef(); + int get_tmp_string_buffer(common::ObIAllocator *&allocator); public: common::ObArrayWrap conflict_map_array_; ObEvalCtx &eval_ctx_; // 用于表达式的计算 @@ -227,6 +233,7 @@ public: ObDASRef das_ref_; ObDASTabletLoc *local_tablet_loc_; ObDASTableLoc *table_loc_; + lib::MemoryContext tmp_mem_ctx_; }; } // namespace sql } // namespace oceanbase diff --git a/src/sql/executor/ob_direct_receive_op.cpp b/src/sql/executor/ob_direct_receive_op.cpp index a6d349bcf9..09b4a11b31 100644 --- a/src/sql/executor/ob_direct_receive_op.cpp +++ b/src/sql/executor/ob_direct_receive_op.cpp @@ -200,9 +200,9 @@ int ObDirectReceiveOp::setup_next_scanner() // 这里要做到err_msg不为空的时候才给用户返回err_msg,否则就给用户返回ret默认的错误信息, // 因此直接写FORWARD_USER_ERROR(ret, err_msg)就可以了。 FORWARD_USER_ERROR(ret, err_msg); - LOG_WARN("while fetching first scanner, the remote rcode is not OB_SUCCESS", - K(ret), K(err_msg), - "dst_addr", to_cstring(resp_handler->get_dst_addr())); + LOG_WARN("error occurring in the remote sql execution, " + "please use the current TRACE_ID to grep the original error message on the remote_addr.", + K(ret), "remote_addr", resp_handler->get_dst_addr()); } else { scanner_ = scanner; first_request_received_ = true;