issue/47576627 add lob compare function for row update change detect
This commit is contained in:
@ -241,6 +241,46 @@ int ObDMLService::create_rowkey_check_hashset(int64_t estimate_row,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDMLService::check_lob_column_changed(ObEvalCtx &eval_ctx,
|
||||
const ObExpr& old_expr, ObDatum& old_datum,
|
||||
const ObExpr& new_expr, ObDatum& new_datum,
|
||||
int64_t& result) {
|
||||
INIT_SUCC(ret);
|
||||
ObLobManager *lob_mngr = MTL(ObLobManager*);
|
||||
int64_t timeout = 0;
|
||||
int64_t query_st = eval_ctx.exec_ctx_.get_my_session()->get_query_start_time();
|
||||
if (OB_ISNULL(lob_mngr)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get lob manager handle null.", K(ret));
|
||||
} else if (OB_FAIL(eval_ctx.exec_ctx_.get_my_session()->get_query_timeout(timeout))) {
|
||||
LOG_WARN("failed to get session query timeout", K(ret));
|
||||
} else {
|
||||
timeout += query_st;
|
||||
ObString old_str = old_datum.get_string();
|
||||
ObString new_str = new_datum.get_string();
|
||||
bool old_set_has_lob_header = old_expr.obj_meta_.has_lob_header() && old_str.length() > 0;
|
||||
bool new_set_has_lob_header = new_expr.obj_meta_.has_lob_header() && new_str.length() > 0;
|
||||
ObLobLocatorV2 old_lob(old_str, old_set_has_lob_header);
|
||||
ObLobLocatorV2 new_lob(new_str, new_set_has_lob_header);
|
||||
ObLobCompareParams cmp_params;
|
||||
// binary compare ignore charset
|
||||
cmp_params.collation_left_ = CS_TYPE_BINARY;
|
||||
cmp_params.collation_right_ = CS_TYPE_BINARY;
|
||||
cmp_params.offset_left_ = 0;
|
||||
cmp_params.offset_right_ = 0;
|
||||
cmp_params.compare_len_ = UINT64_MAX;
|
||||
cmp_params.timeout_ = timeout;
|
||||
if(old_set_has_lob_header && new_set_has_lob_header) {
|
||||
if(OB_FAIL(lob_mngr->compare(old_lob, new_lob, cmp_params, result))) {
|
||||
LOG_WARN("fail to compare lob", K(ret), K(old_lob), K(new_lob));
|
||||
}
|
||||
} else {
|
||||
result = ObDatum::binary_equal(old_datum, new_datum) ? 0 : 1;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDMLService::check_row_whether_changed(const ObUpdCtDef &upd_ctdef,
|
||||
ObUpdRtDef &upd_rtdef,
|
||||
ObEvalCtx &eval_ctx)
|
||||
@ -276,7 +316,18 @@ int ObDMLService::check_row_whether_changed(const ObUpdCtDef &upd_ctdef,
|
||||
|| OB_FAIL(new_row.at(idx)->eval(eval_ctx, new_datum))) {
|
||||
LOG_WARN("evaluate value failed", K(ret));
|
||||
} else {
|
||||
upd_rtdef.is_row_changed_ = !ObDatum::binary_equal(*old_datum, *new_datum);
|
||||
if(is_lob_storage(old_row.at(idx)->datum_meta_.type_)
|
||||
&& is_lob_storage(new_row.at(idx)->datum_meta_.type_))
|
||||
{
|
||||
int64_t cmp_res = 0;
|
||||
if(OB_FAIL(check_lob_column_changed(eval_ctx, *old_row.at(idx), *old_datum, *new_row.at(idx), *new_datum, cmp_res))) {
|
||||
LOG_WARN("compare lob datum failed", K(ret));
|
||||
} else {
|
||||
upd_rtdef.is_row_changed_ = (cmp_res != 0);
|
||||
}
|
||||
} else {
|
||||
upd_rtdef.is_row_changed_ = !ObDatum::binary_equal(*old_datum, *new_datum);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@ -297,7 +348,18 @@ int ObDMLService::check_row_whether_changed(const ObUpdCtDef &upd_ctdef,
|
||||
|| OB_FAIL(new_row.at(idx)->eval(eval_ctx, new_datum))) {
|
||||
LOG_WARN("evaluate value failed", K(ret));
|
||||
} else {
|
||||
upd_rtdef.is_row_changed_ = !ObDatum::binary_equal(*old_datum, *new_datum);
|
||||
if(is_lob_storage(old_row.at(idx)->datum_meta_.type_)
|
||||
&& is_lob_storage(new_row.at(idx)->datum_meta_.type_))
|
||||
{
|
||||
int64_t cmp_res = 0;
|
||||
if(OB_FAIL(check_lob_column_changed(eval_ctx, *old_row.at(idx), *old_datum, *new_row.at(idx), *new_datum, cmp_res))) {
|
||||
LOG_WARN("compare lob datum failed", K(ret));
|
||||
} else {
|
||||
upd_rtdef.is_row_changed_ = (cmp_res != 0);
|
||||
}
|
||||
} else {
|
||||
upd_rtdef.is_row_changed_ = !ObDatum::binary_equal(*old_datum, *new_datum);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -50,6 +50,11 @@ public:
|
||||
static int create_rowkey_check_hashset(int64_t estimate_row,
|
||||
ObExecContext *root_ctx,
|
||||
SeRowkeyDistCtx *&rowkey_dist_ctx);
|
||||
|
||||
static int check_lob_column_changed(ObEvalCtx &eval_ctx,
|
||||
const ObExpr& old_expr, ObDatum& old_datum,
|
||||
const ObExpr& new_expr, ObDatum& new_datum,
|
||||
int64_t& result);
|
||||
static int check_row_whether_changed(const ObUpdCtDef &upd_ctdef, ObUpdRtDef &upd_rtdef, ObEvalCtx &eval_ctx);
|
||||
static int filter_row_for_check_cst(const ExprFixedArray &cst_exprs,
|
||||
ObEvalCtx &eval_ctx,
|
||||
|
@ -18,6 +18,7 @@
|
||||
#include "lib/objectpool/ob_server_object_pool.h"
|
||||
#include "observer/ob_server.h"
|
||||
#include "storage/tx_storage/ob_ls_service.h"
|
||||
#include "sql/engine/expr/ob_expr_util.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -689,6 +690,166 @@ int ObLobManager::query(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLobManager::compare(ObLobLocatorV2& lob_left,
|
||||
ObLobLocatorV2& lob_right,
|
||||
ObLobCompareParams& cmp_params,
|
||||
int64_t& result) {
|
||||
INIT_SUCC(ret);
|
||||
ObArenaAllocator tmp_allocator(ObModIds::OB_LOB_READER, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID());
|
||||
ObLobManager *lob_mngr = MTL(ObLobManager*);
|
||||
if (OB_ISNULL(lob_mngr)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get lob manager handle null.", K(ret));
|
||||
} else if(!lob_left.has_lob_header() || !lob_right.has_lob_header()) {
|
||||
ret = OB_ERR_ARG_INVALID;
|
||||
LOG_WARN("invalid lob. should have lob locator", K(ret));
|
||||
} else {
|
||||
// get lob access param
|
||||
ObLobAccessParam param_left;
|
||||
ObLobAccessParam param_right;
|
||||
|
||||
if (OB_FAIL(build_lob_param(param_left, tmp_allocator, cmp_params.collation_left_,
|
||||
cmp_params.offset_left_, cmp_params.compare_len_, cmp_params.timeout_, lob_left))) {
|
||||
LOG_WARN("fail to build read param left", K(ret), K(lob_left), K(cmp_params));
|
||||
} else if(OB_FAIL(build_lob_param(param_right, tmp_allocator, cmp_params.collation_right_,
|
||||
cmp_params.offset_right_, cmp_params.compare_len_, cmp_params.timeout_, lob_right))) {
|
||||
LOG_WARN("fail to build read param new", K(ret), K(lob_right));
|
||||
} else if(OB_FAIL(compare(param_left, param_right, result))) {
|
||||
LOG_WARN("fail to compare lob", K(ret), K(lob_left), K(lob_right), K(cmp_params));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLobManager::compare(ObLobAccessParam& param_left,
|
||||
ObLobAccessParam& param_right,
|
||||
int64_t& result) {
|
||||
INIT_SUCC(ret);
|
||||
common::ObCollationType collation_left = param_left.coll_type_;
|
||||
common::ObCollationType collation_right = param_right.coll_type_;
|
||||
common::ObCollationType cmp_collation = collation_left;
|
||||
ObIAllocator* tmp_allocator = param_left.allocator_;
|
||||
ObLobQueryIter *iter_left = nullptr;
|
||||
ObLobQueryIter *iter_right = nullptr;
|
||||
if(OB_ISNULL(tmp_allocator)) {
|
||||
ret = OB_ERR_ARG_INVALID;
|
||||
LOG_WARN("invalid alloctor param", K(ret), K(param_left));
|
||||
} else if((collation_left == CS_TYPE_BINARY && collation_right != CS_TYPE_BINARY)
|
||||
|| (collation_left != CS_TYPE_BINARY && collation_right == CS_TYPE_BINARY)) {
|
||||
ret = OB_ERR_ARG_INVALID;
|
||||
LOG_WARN("invalid collation param", K(ret), K(param_left), K(param_right));
|
||||
} else if (OB_FAIL(query(param_left, iter_left))) {
|
||||
LOG_WARN("query param left by iter failed.", K(ret), K(param_left));
|
||||
} else if (OB_FAIL(query(param_right, iter_right))) {
|
||||
LOG_WARN("query param right by iter failed.", K(ret), K(param_right));
|
||||
} else {
|
||||
uint64_t read_buff_size = ObLobManager::LOB_READ_BUFFER_LEN;
|
||||
char *read_buff = nullptr;
|
||||
char *charset_convert_buff_ptr = nullptr;
|
||||
uint64_t charset_convert_buff_size = read_buff_size * ObCharset::CharConvertFactorNum;
|
||||
|
||||
if (OB_ISNULL((read_buff = static_cast<char*>(tmp_allocator->alloc(read_buff_size * 2))))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("alloc read buffer failed.", K(ret), K(read_buff_size));
|
||||
} else if (OB_ISNULL((charset_convert_buff_ptr = static_cast<char*>(tmp_allocator->alloc(charset_convert_buff_size))))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("alloc charset convert buffer failed.", K(ret), K(charset_convert_buff_size));
|
||||
} else {
|
||||
ObDataBuffer charset_convert_buff(charset_convert_buff_ptr, charset_convert_buff_size);
|
||||
ObString read_buffer_left;
|
||||
ObString read_buffer_right;
|
||||
read_buffer_left.assign_buffer(read_buff, read_buff_size);
|
||||
read_buffer_right.assign_buffer(read_buff + read_buff_size, read_buff_size);
|
||||
|
||||
// compare right after charset convert
|
||||
ObString convert_buffer_right;
|
||||
convert_buffer_right.assign_ptr(nullptr, 0);
|
||||
|
||||
while (OB_SUCC(ret) && result == 0) {
|
||||
if (read_buffer_left.length() == 0) {
|
||||
// reset buffer and read next block
|
||||
read_buffer_left.assign_buffer(read_buff, read_buff_size);
|
||||
if (OB_FAIL(iter_left->get_next_row(read_buffer_left))) {
|
||||
if (ret != OB_ITER_END) {
|
||||
LOG_WARN("failed to get next buffer for left lob.", K(ret));
|
||||
} else {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && convert_buffer_right.length() == 0) {
|
||||
read_buffer_right.assign_buffer(read_buff + read_buff_size, read_buff_size);
|
||||
charset_convert_buff.set_data(charset_convert_buff_ptr, charset_convert_buff_size);
|
||||
convert_buffer_right.assign_ptr(nullptr, 0);
|
||||
|
||||
if (OB_FAIL(iter_right->get_next_row(read_buffer_right))) {
|
||||
if (ret != OB_ITER_END) {
|
||||
LOG_WARN("failed to get next buffer for right lob", K(ret));
|
||||
} else {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
} else {
|
||||
// convert right lob to left charset if necessary
|
||||
if(OB_FAIL(ObExprUtil::convert_string_collation(
|
||||
read_buffer_right, collation_right,
|
||||
convert_buffer_right, cmp_collation,
|
||||
charset_convert_buff))) {
|
||||
LOG_WARN("fail to convert string collation", K(ret),
|
||||
K(read_buffer_right), K(collation_right),
|
||||
K(convert_buffer_right), K(cmp_collation));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (read_buffer_left.length() == 0 && convert_buffer_right.length() == 0) {
|
||||
result = 0;
|
||||
ret = OB_ITER_END;
|
||||
} else if (read_buffer_left.length() == 0 && convert_buffer_right.length() > 0) {
|
||||
result = -1;
|
||||
} else if (read_buffer_left.length() > 0 && convert_buffer_right.length() == 0) {
|
||||
result = 1;
|
||||
} else {
|
||||
uint64_t cmp_len = read_buffer_left.length() > convert_buffer_right.length() ?
|
||||
convert_buffer_right.length() : read_buffer_left.length();
|
||||
ObString substr_lob_left;
|
||||
ObString substr_lob_right;
|
||||
substr_lob_left.assign_ptr(read_buffer_left.ptr(), cmp_len);
|
||||
substr_lob_right.assign_ptr(convert_buffer_right.ptr(), cmp_len);
|
||||
result = common::ObCharset::strcmp(cmp_collation, substr_lob_left, substr_lob_right);
|
||||
if (result > 0) {
|
||||
result = 1;
|
||||
} else if (result < 0) {
|
||||
result = -1;
|
||||
}
|
||||
|
||||
read_buffer_left.assign_ptr(read_buffer_left.ptr() + cmp_len, read_buffer_left.length() - cmp_len);
|
||||
convert_buffer_right.assign_ptr(convert_buffer_right.ptr() + cmp_len, convert_buffer_right.length() - cmp_len);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (ret == OB_ITER_END) {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
}
|
||||
if (OB_NOT_NULL(read_buff)) {
|
||||
tmp_allocator->free(read_buff);
|
||||
}
|
||||
if (OB_NOT_NULL(charset_convert_buff_ptr)) {
|
||||
tmp_allocator->free(charset_convert_buff_ptr);
|
||||
}
|
||||
}
|
||||
if (OB_NOT_NULL(iter_left)) {
|
||||
iter_left->reset();
|
||||
common::sop_return(ObLobQueryIter, iter_left);
|
||||
}
|
||||
if (OB_NOT_NULL(iter_right)) {
|
||||
iter_right->reset();
|
||||
common::sop_return(ObLobQueryIter, iter_right);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLobManager::write_one_piece(ObLobAccessParam& param,
|
||||
common::ObTabletID& piece_tablet_id,
|
||||
ObLobCtx& lob_ctx,
|
||||
|
@ -38,6 +38,35 @@ struct ObLobQueryResult {
|
||||
TO_STRING_KV(K_(meta_result), K_(piece_info));
|
||||
};
|
||||
|
||||
struct ObLobCompareParams {
|
||||
|
||||
ObLobCompareParams()
|
||||
: collation_left_(CS_TYPE_INVALID),
|
||||
collation_right_(CS_TYPE_INVALID),
|
||||
offset_left_(0),
|
||||
offset_right_(0),
|
||||
compare_len_(0),
|
||||
timeout_(0)
|
||||
{
|
||||
}
|
||||
|
||||
TO_STRING_KV(K(collation_left_),
|
||||
KP(collation_right_),
|
||||
K(offset_left_),
|
||||
K(offset_right_),
|
||||
K(compare_len_),
|
||||
K(timeout_));
|
||||
|
||||
ObCollationType collation_left_;
|
||||
ObCollationType collation_right_;
|
||||
uint64_t offset_left_;
|
||||
uint64_t offset_right_;
|
||||
|
||||
// compare length
|
||||
uint64_t compare_len_;
|
||||
int64_t timeout_;
|
||||
};
|
||||
|
||||
class ObLobQueryRemoteReader
|
||||
{
|
||||
public:
|
||||
@ -166,6 +195,23 @@ public:
|
||||
int write(ObLobAccessParam& param,
|
||||
ObLobLocatorV2& lob,
|
||||
uint64_t offset);
|
||||
|
||||
// compare lob byte wise, collation type is binary
|
||||
// @param [in] lob_left lob param of left operand for comparison
|
||||
// @param [in] collation_left collation type of left operand for comparison
|
||||
// @param [in] offset_left start position of left lob for comparison
|
||||
// @param [in] lob_right lob param of right operand for comparison
|
||||
// @param [in] collation_right collation type of right operand for comparison
|
||||
// @param [in] offset_right start position of right lob for comparison
|
||||
// @param [in] amount_len comparison length
|
||||
// @param [in] timeout lob read timeout
|
||||
// @param [out] result: 0 if the data exactly matches over the range specified by the offset and amount parameters.
|
||||
// -1 if the first is less than the second, and 1 if it is greater.
|
||||
int compare(ObLobLocatorV2& lob_left,
|
||||
ObLobLocatorV2& lob_right,
|
||||
ObLobCompareParams& cmp_params,
|
||||
int64_t& result);
|
||||
|
||||
// int insert(const common::ObTabletID &tablet_id, ObObj *obj, uint64_t offset, char *data, uint64_t len);
|
||||
// int erase(const common::ObTabletID &tablet_id, ObObj *obj, uint64_t offset, uint64_t len);
|
||||
int get_real_data(ObLobAccessParam& param,
|
||||
@ -248,6 +294,11 @@ private:
|
||||
bool lob_handle_has_char_len(ObLobAccessParam& param);
|
||||
int64_t* get_char_len_ptr(ObLobAccessParam& param);
|
||||
int fill_lob_locator_extern(ObLobAccessParam& param);
|
||||
|
||||
int compare(ObLobAccessParam& param_left,
|
||||
ObLobAccessParam& param_right,
|
||||
int64_t& result);
|
||||
|
||||
private:
|
||||
static const int64_t DEFAULT_LOB_META_BUCKET_CNT = 1543;
|
||||
static const int64_t LOB_IN_ROW_MAX_LENGTH = 4096; // 4K
|
||||
|
Reference in New Issue
Block a user