diff --git a/src/sql/engine/dml/ob_dml_service.cpp b/src/sql/engine/dml/ob_dml_service.cpp
index a7df00ed34..150bda8816 100644
--- a/src/sql/engine/dml/ob_dml_service.cpp
+++ b/src/sql/engine/dml/ob_dml_service.cpp
@@ -241,6 +241,61 @@ int ObDMLService::create_rowkey_check_hashset(int64_t estimate_row,
   return ret;
 }
 
+// Check whether the value of a lob column is changed by an update.
+// If both datums carry a lob header, the lob contents are compared through
+// ObLobManager::compare() with binary collation; otherwise the two datums are
+// compared with ObDatum::binary_equal().
+// @param [in] eval_ctx   eval context, its session provides query start time and query timeout
+// @param [in] old_expr   expr of the old column value
+// @param [in] old_datum  datum of the old column value
+// @param [in] new_expr   expr of the new column value
+// @param [in] new_datum  datum of the new column value
+// @param [out] result    0 if old and new value are equal, non-zero otherwise
+int ObDMLService::check_lob_column_changed(ObEvalCtx &eval_ctx,
+            const ObExpr& old_expr, ObDatum& old_datum,
+            const ObExpr& new_expr, ObDatum& new_datum,
+            int64_t& result) {
+  INIT_SUCC(ret);
+  ObLobManager *lob_mngr = MTL(ObLobManager*);
+  // get_my_session() returns a pointer, check it before it is dereferenced
+  auto *session = eval_ctx.exec_ctx_.get_my_session();
+  int64_t timeout = 0;
+  if (OB_ISNULL(lob_mngr)) {
+    ret = OB_ERR_UNEXPECTED;
+    LOG_WARN("get lob manager handle null.", K(ret));
+  } else if (OB_ISNULL(session)) {
+    ret = OB_ERR_UNEXPECTED;
+    LOG_WARN("get session null.", K(ret));
+  } else if (OB_FAIL(session->get_query_timeout(timeout))) {
+    LOG_WARN("failed to get session query timeout", K(ret));
+  } else {
+    // timeout passed to the lob manager is query start time plus query timeout
+    timeout += session->get_query_start_time();
+    ObString old_str = old_datum.get_string();
+    ObString new_str = new_datum.get_string();
+    bool old_set_has_lob_header = old_expr.obj_meta_.has_lob_header() && old_str.length() > 0;
+    bool new_set_has_lob_header = new_expr.obj_meta_.has_lob_header() && new_str.length() > 0;
+    ObLobLocatorV2 old_lob(old_str, old_set_has_lob_header);
+    ObLobLocatorV2 new_lob(new_str, new_set_has_lob_header);
+    ObLobCompareParams cmp_params;
+    // binary compare ignore charset
+    cmp_params.collation_left_ = CS_TYPE_BINARY;
+    cmp_params.collation_right_ = CS_TYPE_BINARY;
+    cmp_params.offset_left_ = 0;
+    cmp_params.offset_right_ = 0;
+    cmp_params.compare_len_ = UINT64_MAX;
+    cmp_params.timeout_ = timeout;
+    if(old_set_has_lob_header && new_set_has_lob_header) {
+      if(OB_FAIL(lob_mngr->compare(old_lob, new_lob, cmp_params, result))) {
+        LOG_WARN("fail to compare lob", K(ret), K(old_lob), K(new_lob));
+      }
+    } else {
+      result = ObDatum::binary_equal(old_datum, new_datum) ? 0 : 1;
+    }
+  }
+  return ret;
+}
+
 int ObDMLService::check_row_whether_changed(const ObUpdCtDef &upd_ctdef,
                                             ObUpdRtDef &upd_rtdef,
                                             ObEvalCtx &eval_ctx)
@@ -276,7 +331,18 @@ int ObDMLService::check_row_whether_changed(const ObUpdCtDef &upd_ctdef,
             || OB_FAIL(new_row.at(idx)->eval(eval_ctx, new_datum))) {
           LOG_WARN("evaluate value failed", K(ret));
         } else {
-          upd_rtdef.is_row_changed_ = !ObDatum::binary_equal(*old_datum, *new_datum);
+          if(is_lob_storage(old_row.at(idx)->datum_meta_.type_)
+              && is_lob_storage(new_row.at(idx)->datum_meta_.type_))
+          {
+            int64_t cmp_res = 0;
+            if(OB_FAIL(check_lob_column_changed(eval_ctx, *old_row.at(idx), *old_datum, *new_row.at(idx), *new_datum, cmp_res))) {
+              LOG_WARN("compare lob datum failed", K(ret));
+            } else {
+              upd_rtdef.is_row_changed_ = (cmp_res != 0);
+            }
+          } else {
+            upd_rtdef.is_row_changed_ = !ObDatum::binary_equal(*old_datum, *new_datum);
+          }
         }
       }
     } else {
@@ -297,7 +363,18 @@ int ObDMLService::check_row_whether_changed(const ObUpdCtDef &upd_ctdef,
             || OB_FAIL(new_row.at(idx)->eval(eval_ctx, new_datum))) {
           LOG_WARN("evaluate value failed", K(ret));
         } else {
-          upd_rtdef.is_row_changed_ = !ObDatum::binary_equal(*old_datum, *new_datum);
+          if(is_lob_storage(old_row.at(idx)->datum_meta_.type_)
+              && is_lob_storage(new_row.at(idx)->datum_meta_.type_))
+          {
+            int64_t cmp_res = 0;
+            if(OB_FAIL(check_lob_column_changed(eval_ctx, *old_row.at(idx), *old_datum, *new_row.at(idx), *new_datum, cmp_res))) {
+              LOG_WARN("compare lob datum failed", K(ret));
+            } else {
+              upd_rtdef.is_row_changed_ = (cmp_res != 0);
+            }
+          } else {
+            upd_rtdef.is_row_changed_ = !ObDatum::binary_equal(*old_datum, *new_datum);
+          }
         }
       }
     }
diff --git a/src/sql/engine/dml/ob_dml_service.h b/src/sql/engine/dml/ob_dml_service.h
index 24c1a140e0..6c6385c50f 100644
--- a/src/sql/engine/dml/ob_dml_service.h
+++ b/src/sql/engine/dml/ob_dml_service.h
@@ -50,6 +50,11 @@ public:
   static int create_rowkey_check_hashset(int64_t estimate_row,
                                          ObExecContext *root_ctx,
                                          SeRowkeyDistCtx *&rowkey_dist_ctx);
+
+  static int check_lob_column_changed(ObEvalCtx &eval_ctx,
+              const ObExpr& old_expr, ObDatum& old_datum,
+              const ObExpr& new_expr, ObDatum& new_datum,
+              int64_t& result);
   static int check_row_whether_changed(const ObUpdCtDef &upd_ctdef, ObUpdRtDef &upd_rtdef, ObEvalCtx &eval_ctx);
   static int filter_row_for_check_cst(const ExprFixedArray &cst_exprs,
                                       ObEvalCtx &eval_ctx,
diff --git a/src/storage/lob/ob_lob_manager.cpp b/src/storage/lob/ob_lob_manager.cpp
index 596be527b2..d9648c604e 100644
--- a/src/storage/lob/ob_lob_manager.cpp
+++ b/src/storage/lob/ob_lob_manager.cpp
@@ -18,6 +18,7 @@
 #include "lib/objectpool/ob_server_object_pool.h"
 #include "observer/ob_server.h"
 #include "storage/tx_storage/ob_ls_service.h"
+#include "sql/engine/expr/ob_expr_util.h"
 
 namespace oceanbase
 {
@@ -689,6 +690,175 @@ int ObLobManager::query(
   return ret;
 }
 
+// Build access params for both lob locators from cmp_params and compare the two lobs.
+// Both locators must have a lob header, otherwise OB_ERR_ARG_INVALID is returned.
+int ObLobManager::compare(ObLobLocatorV2& lob_left,
+                          ObLobLocatorV2& lob_right,
+                          ObLobCompareParams& cmp_params,
+                          int64_t& result) {
+  INIT_SUCC(ret);
+  ObArenaAllocator tmp_allocator(ObModIds::OB_LOB_READER, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID());
+  ObLobManager *lob_mngr = MTL(ObLobManager*);
+  if (OB_ISNULL(lob_mngr)) {
+    ret = OB_ERR_UNEXPECTED;
+    LOG_WARN("get lob manager handle null.", K(ret));
+  } else if(!lob_left.has_lob_header() || !lob_right.has_lob_header()) {
+    ret = OB_ERR_ARG_INVALID;
+    LOG_WARN("invalid lob. should have lob locator", K(ret));
+  } else {
+    // get lob access param
+    ObLobAccessParam param_left;
+    ObLobAccessParam param_right;
+
+    if (OB_FAIL(build_lob_param(param_left, tmp_allocator, cmp_params.collation_left_,
+                cmp_params.offset_left_, cmp_params.compare_len_, cmp_params.timeout_, lob_left))) {
+      LOG_WARN("fail to build read param left", K(ret), K(lob_left), K(cmp_params));
+    } else if(OB_FAIL(build_lob_param(param_right, tmp_allocator, cmp_params.collation_right_,
+                cmp_params.offset_right_, cmp_params.compare_len_, cmp_params.timeout_, lob_right))) {
+      LOG_WARN("fail to build read param right", K(ret), K(lob_right));
+    } else if(OB_FAIL(compare(param_left, param_right, result))) {
+      LOG_WARN("fail to compare lob", K(ret), K(lob_left), K(lob_right), K(cmp_params));
+    }
+  }
+  return ret;
+}
+
+// Compare two lobs block by block through query iterators.
+// Each block read from the right lob is converted to the collation of the left lob before it is compared.
+// @param [in] param_left   access param of the left lob, its allocator provides the temporary buffers
+// @param [in] param_right  access param of the right lob
+// @param [out] result      0 if equal, -1 if left is less than right, 1 if left is greater
+int ObLobManager::compare(ObLobAccessParam& param_left,
+                          ObLobAccessParam& param_right,
+                          int64_t& result) {
+  INIT_SUCC(ret);
+  // the compare loop below only runs while result == 0, do not rely on the caller to initialize it
+  result = 0;
+  common::ObCollationType collation_left = param_left.coll_type_;
+  common::ObCollationType collation_right = param_right.coll_type_;
+  common::ObCollationType cmp_collation = collation_left;
+  ObIAllocator* tmp_allocator = param_left.allocator_;
+  ObLobQueryIter *iter_left = nullptr;
+  ObLobQueryIter *iter_right = nullptr;
+  if(OB_ISNULL(tmp_allocator)) {
+    ret = OB_ERR_ARG_INVALID;
+    LOG_WARN("invalid alloctor param", K(ret), K(param_left));
+  } else if((collation_left == CS_TYPE_BINARY && collation_right != CS_TYPE_BINARY)
+            || (collation_left != CS_TYPE_BINARY && collation_right == CS_TYPE_BINARY)) {
+    ret = OB_ERR_ARG_INVALID;
+    LOG_WARN("invalid collation param", K(ret), K(param_left), K(param_right));
+  } else if (OB_FAIL(query(param_left, iter_left))) {
+    LOG_WARN("query param left by iter failed.", K(ret), K(param_left));
+  } else if (OB_FAIL(query(param_right, iter_right))) {
+    LOG_WARN("query param right by iter failed.", K(ret), K(param_right));
+  } else {
+    uint64_t read_buff_size = ObLobManager::LOB_READ_BUFFER_LEN;
+    char *read_buff = nullptr;
+    char *charset_convert_buff_ptr = nullptr;
+    uint64_t charset_convert_buff_size = read_buff_size * ObCharset::CharConvertFactorNum;
+
+    if (OB_ISNULL((read_buff = static_cast<char *>(tmp_allocator->alloc(read_buff_size * 2))))) {
+      ret = OB_ALLOCATE_MEMORY_FAILED;
+      LOG_WARN("alloc read buffer failed.", K(ret), K(read_buff_size));
+    } else if (OB_ISNULL((charset_convert_buff_ptr = static_cast<char *>(tmp_allocator->alloc(charset_convert_buff_size))))) {
+      ret = OB_ALLOCATE_MEMORY_FAILED;
+      LOG_WARN("alloc charset convert buffer failed.", K(ret), K(charset_convert_buff_size));
+    } else {
+      ObDataBuffer charset_convert_buff(charset_convert_buff_ptr, charset_convert_buff_size);
+      ObString read_buffer_left;
+      ObString read_buffer_right;
+      read_buffer_left.assign_buffer(read_buff, read_buff_size);
+      read_buffer_right.assign_buffer(read_buff + read_buff_size, read_buff_size);
+
+      // compare right after charset convert
+      ObString convert_buffer_right;
+      convert_buffer_right.assign_ptr(nullptr, 0);
+
+      while (OB_SUCC(ret) && result == 0) {
+        if (read_buffer_left.length() == 0) {
+          // reset buffer and read next block
+          read_buffer_left.assign_buffer(read_buff, read_buff_size);
+          if (OB_FAIL(iter_left->get_next_row(read_buffer_left))) {
+            if (ret != OB_ITER_END) {
+              LOG_WARN("failed to get next buffer for left lob.", K(ret));
+            } else {
+              ret = OB_SUCCESS;
+            }
+          }
+        }
+
+        if (OB_SUCC(ret) && convert_buffer_right.length() == 0) {
+          read_buffer_right.assign_buffer(read_buff + read_buff_size, read_buff_size);
+          charset_convert_buff.set_data(charset_convert_buff_ptr, charset_convert_buff_size);
+          convert_buffer_right.assign_ptr(nullptr, 0);
+
+          if (OB_FAIL(iter_right->get_next_row(read_buffer_right))) {
+            if (ret != OB_ITER_END) {
+              LOG_WARN("failed to get next buffer for right lob", K(ret));
+            } else {
+              ret = OB_SUCCESS;
+            }
+          } else {
+            // convert right lob to left charset if necessary
+            if(OB_FAIL(ObExprUtil::convert_string_collation(
+                                  read_buffer_right, collation_right,
+                                  convert_buffer_right, cmp_collation,
+                                  charset_convert_buff))) {
+                LOG_WARN("fail to convert string collation", K(ret),
+                          K(read_buffer_right), K(collation_right),
+                          K(convert_buffer_right), K(cmp_collation));
+            }
+          }
+        }
+        if (OB_SUCC(ret)) {
+          if (read_buffer_left.length() == 0 && convert_buffer_right.length() == 0) {
+            result = 0;
+            ret = OB_ITER_END;
+          } else if (read_buffer_left.length() == 0 && convert_buffer_right.length() > 0) {
+            result = -1;
+          } else if (read_buffer_left.length() > 0 && convert_buffer_right.length() == 0) {
+            result = 1;
+          } else {
+            uint64_t cmp_len = read_buffer_left.length() > convert_buffer_right.length() ?
+                                    convert_buffer_right.length() : read_buffer_left.length();
+            ObString substr_lob_left;
+            ObString substr_lob_right;
+            substr_lob_left.assign_ptr(read_buffer_left.ptr(), cmp_len);
+            substr_lob_right.assign_ptr(convert_buffer_right.ptr(), cmp_len);
+            result = common::ObCharset::strcmp(cmp_collation, substr_lob_left, substr_lob_right);
+            if (result > 0) {
+              result = 1;
+            } else if (result < 0) {
+              result = -1;
+            }
+
+            read_buffer_left.assign_ptr(read_buffer_left.ptr() + cmp_len, read_buffer_left.length() - cmp_len);
+            convert_buffer_right.assign_ptr(convert_buffer_right.ptr() + cmp_len, convert_buffer_right.length() - cmp_len);
+          }
+        }
+      }
+      if (ret == OB_ITER_END) {
+        ret = OB_SUCCESS;
+      }
+    }
+    if (OB_NOT_NULL(read_buff)) {
+      tmp_allocator->free(read_buff);
+    }
+    if (OB_NOT_NULL(charset_convert_buff_ptr)) {
+      tmp_allocator->free(charset_convert_buff_ptr);
+    }
+  }
+  if (OB_NOT_NULL(iter_left)) {
+    iter_left->reset();
+    common::sop_return(ObLobQueryIter, iter_left);
+  }
+  if (OB_NOT_NULL(iter_right)) {
+    iter_right->reset();
+    common::sop_return(ObLobQueryIter, iter_right);
+  }
+  return ret;
+}
+
 int ObLobManager::write_one_piece(ObLobAccessParam& param,
                                   common::ObTabletID& piece_tablet_id,
                                   ObLobCtx& lob_ctx,
diff --git a/src/storage/lob/ob_lob_manager.h b/src/storage/lob/ob_lob_manager.h
index e047746fbc..2e6fbd1b7a 100644
--- a/src/storage/lob/ob_lob_manager.h
+++ b/src/storage/lob/ob_lob_manager.h
@@ -38,6 +38,36 @@ struct ObLobQueryResult {
   TO_STRING_KV(K_(meta_result), K_(piece_info));
 };
 
+// params of ObLobManager::compare
+struct ObLobCompareParams {
+
+  ObLobCompareParams()
+    : collation_left_(CS_TYPE_INVALID),
+    collation_right_(CS_TYPE_INVALID),
+    offset_left_(0),
+    offset_right_(0),
+    compare_len_(0),
+    timeout_(0)
+  {
+  }
+
+  TO_STRING_KV(K(collation_left_),
+               K(collation_right_),
+               K(offset_left_),
+               K(offset_right_),
+               K(compare_len_),
+               K(timeout_));
+
+  ObCollationType collation_left_;  // collation type of left operand
+  ObCollationType collation_right_; // collation type of right operand
+  uint64_t offset_left_;            // start position of left lob
+  uint64_t offset_right_;           // start position of right lob
+
+  // compare length
+  uint64_t compare_len_;
+  int64_t timeout_;                 // lob read timeout
+};
+
 class ObLobQueryRemoteReader
 {
 public:
@@ -166,6 +196,22 @@ public:
   int write(ObLobAccessParam& param,
             ObLobLocatorV2& lob,
             uint64_t offset);
+
+  // compare two lobs, both lob locators must have lob header
+  // @param [in] lob_left lob locator of left operand for comparison
+  // @param [in] lob_right lob locator of right operand for comparison
+  // @param [in] cmp_params comparison params:
+  //             collation_left_/collation_right_ collation type of left/right operand for comparison
+  //             offset_left_/offset_right_ start position of left/right lob for comparison
+  //             compare_len_ comparison length
+  //             timeout_ lob read timeout
+  // @param [out] result: 0 if the data exactly matches over the range specified by the offset and length parameters.
+  //                      -1 if the first is less than the second, and 1 if it is greater.
+  int compare(ObLobLocatorV2& lob_left,
+              ObLobLocatorV2& lob_right,
+              ObLobCompareParams& cmp_params,
+              int64_t& result);
+
   // int insert(const common::ObTabletID &tablet_id, ObObj *obj, uint64_t offset, char *data, uint64_t len);
   // int erase(const common::ObTabletID &tablet_id, ObObj *obj, uint64_t offset, uint64_t len);
   int get_real_data(ObLobAccessParam& param,
@@ -248,6 +294,12 @@ private:
   bool lob_handle_has_char_len(ObLobAccessParam& param);
   int64_t* get_char_len_ptr(ObLobAccessParam& param);
   int fill_lob_locator_extern(ObLobAccessParam& param);
+
+  // compare two lobs described by access params, result is set to -1, 0 or 1
+  int compare(ObLobAccessParam& param_left,
+              ObLobAccessParam& param_right,
+              int64_t& result);
+
 private:
   static const int64_t DEFAULT_LOB_META_BUCKET_CNT = 1543;
   static const int64_t LOB_IN_ROW_MAX_LENGTH = 4096; // 4K