fix master adpat no char_len
This commit is contained in:
@ -39,6 +39,19 @@ static int check_write_length(ObLobAccessParam& param, int64_t expected_len)
|
||||
|
||||
const ObLobCommon ObLobManager::ZERO_LOB = ObLobCommon();
|
||||
|
||||
static bool lob_handle_has_char_len_field(ObLobAccessParam& param)
|
||||
{
|
||||
bool bret = false;
|
||||
if (param.lob_common_ != nullptr && !param.lob_common_->in_row_) {
|
||||
if (param.handle_size_ >= ObLobManager::LOB_OUTROW_FULL_SIZE) {
|
||||
bret = true;
|
||||
} else {
|
||||
LOG_INFO("old old data", K(param));
|
||||
}
|
||||
}
|
||||
return bret;
|
||||
}
|
||||
|
||||
int ObLobManager::mtl_new(ObLobManager *&m) {
|
||||
int ret = OB_SUCCESS;
|
||||
const uint64_t tenant_id = MTL_ID();
|
||||
@ -565,6 +578,43 @@ int ObLobManager::query_remote(ObLobAccessParam& param, common::ObAddr& dst_addr
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int read_all(
|
||||
ObLobAccessParam& param,
|
||||
ObLobMetaScanIter& meta_iter,
|
||||
ObString& output_data)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLobQueryResult result;
|
||||
meta_iter.set_not_calc_char_len(true);
|
||||
while (OB_SUCC(ret)) {
|
||||
ret = meta_iter.get_next_row(result.meta_result_);
|
||||
const char *lob_data = result.meta_result_.info_.lob_data_.ptr();
|
||||
uint32_t byte_len = result.meta_result_.info_.lob_data_.length();
|
||||
if (OB_FAIL(ret)) {
|
||||
if (ret == OB_ITER_END) {
|
||||
} else {
|
||||
LOG_WARN("failed to get next row.", K(ret));
|
||||
}
|
||||
} else if (ObTimeUtility::current_time() > param.timeout_) {
|
||||
ret = OB_TIMEOUT;
|
||||
int64_t cur_time = ObTimeUtility::current_time();
|
||||
LOG_WARN("query timeout", K(cur_time), K(param.timeout_), K(ret));
|
||||
} else if (param.scan_backward_ && output_data.write_front(lob_data, byte_len) != byte_len) {
|
||||
ret = OB_ERR_INTERVAL_INVALID;
|
||||
LOG_WARN("failed to write buffer to output_data.", K(ret),
|
||||
K(output_data.length()), K(output_data.remain()), K(byte_len));
|
||||
} else if (!param.scan_backward_ && output_data.write(lob_data, byte_len) != byte_len) {
|
||||
ret = OB_ERR_INTERVAL_INVALID;
|
||||
LOG_WARN("failed to write buffer to output_data.", K(ret),
|
||||
K(output_data.length()), K(output_data.remain()), K(byte_len));
|
||||
}
|
||||
}
|
||||
if (ret == OB_ITER_END) {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLobManager::query(
|
||||
ObLobAccessParam& param,
|
||||
ObString& output_data)
|
||||
@ -632,6 +682,10 @@ int ObLobManager::query(
|
||||
param.lob_data_ = reinterpret_cast<ObLobData*>(lob_common->buffer_);
|
||||
if (OB_FAIL(lob_ctx.lob_meta_mngr_->scan(param, meta_iter))) {
|
||||
LOG_WARN("do lob meta scan failed.", K(ret), K(param));
|
||||
} else if (param.is_full_read()) {
|
||||
if (OB_FAIL(read_all(param, meta_iter, output_data))) {
|
||||
LOG_WARN("read_all fail", K(ret), K(param));
|
||||
}
|
||||
} else {
|
||||
ObLobQueryResult result;
|
||||
while (OB_SUCC(ret)) {
|
||||
@ -1176,6 +1230,35 @@ int ObLobManager::check_need_out_row(
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (! param.lob_common_->in_row_ && need_out_row) {
|
||||
// outrow -> outrow : keep outrow
|
||||
bool has_char_len = lob_handle_has_char_len(param);
|
||||
if (has_char_len) { // skip
|
||||
} else if (! lob_handle_has_char_len_field(param)) { // skip
|
||||
} else if (param.op_type_ != ObLobDataOutRowCtx::OpType::SQL) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected case", K(ret), K(param), K(has_char_len));
|
||||
} else if (0 != param.offset_ || 0 != param.byte_size_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected case", K(ret), K(param), K(has_char_len));
|
||||
} else if (param.is_full_insert()) {
|
||||
// reset char_len to 0 from UINT64_MAX
|
||||
int64_t *char_len = ObLobManager::get_char_len_ptr(param);
|
||||
if (OB_ISNULL(char_len)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("char_len ptr is null", K(ret), K(param), K(has_char_len));
|
||||
} else if (*char_len != UINT64_MAX) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("char_len should be zero", K(ret), K(param), K(has_char_len), K(*char_len));
|
||||
} else {
|
||||
*char_len = 0;
|
||||
LOG_INFO("no_char_len to has_char_len", K(param));
|
||||
}
|
||||
} else {
|
||||
// partial update aloways store char_len beacaure is only support in oracle mode
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unsupport situation", K(ret), K(param), K(has_char_len));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -2192,8 +2275,18 @@ int ObLobManager::getlength_remote(ObLobAccessParam& param, common::ObAddr& dst_
|
||||
|
||||
bool ObLobManager::lob_handle_has_char_len(ObLobAccessParam& param)
|
||||
{
|
||||
return (param.lob_common_ != nullptr && !param.lob_common_->in_row_ &&
|
||||
param.handle_size_ >= LOB_OUTROW_FULL_SIZE);
|
||||
bool bret = false;
|
||||
if (param.lob_common_ != nullptr && !param.lob_common_->in_row_ && param.handle_size_ >= LOB_OUTROW_FULL_SIZE) {
|
||||
char *ptr = reinterpret_cast<char*>(param.lob_common_);
|
||||
uint64_t *len = reinterpret_cast<uint64_t*>(ptr + LOB_WITH_OUTROW_CTX_SIZE);
|
||||
if (*len != UINT64_MAX) {
|
||||
bret = true;
|
||||
} else {
|
||||
LOG_WARN_RET(OB_SUCCESS, "found no char_len, this only happen in inner QA upgrade test, can not happen in user situation", K(param));
|
||||
}
|
||||
}
|
||||
return bret;
|
||||
|
||||
}
|
||||
|
||||
int64_t* ObLobManager::get_char_len_ptr(ObLobAccessParam& param)
|
||||
@ -3424,6 +3517,41 @@ int ObLobManager::prepare_erase_buffer(ObLobAccessParam& param, ObString &tmp_bu
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLobManager::batch_delete(ObLobAccessParam& param, ObLobMetaScanIter &meta_iter)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLobQueryResult result;
|
||||
meta_iter.set_not_calc_char_len(true);
|
||||
while (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(meta_iter.get_next_row(result.meta_result_))) {
|
||||
if (ret == OB_ITER_END) {
|
||||
} else {
|
||||
LOG_WARN("failed to get next row.", K(ret));
|
||||
}
|
||||
} else if (ObTimeUtility::current_time() > param.timeout_) {
|
||||
ret = OB_TIMEOUT;
|
||||
int64_t cur_time = ObTimeUtility::current_time();
|
||||
LOG_WARN("query timeout", K(cur_time), K(param.timeout_), K(ret));
|
||||
} else if (OB_FAIL(lob_ctx_.lob_meta_mngr_->erase(param, result.meta_result_.info_))) {
|
||||
LOG_WARN("write lob meta row failed.", K(ret));
|
||||
} else if (OB_FAIL(update_out_ctx(param, nullptr, result.meta_result_.info_))) { // old row
|
||||
LOG_WARN("failed update checksum.", K(ret));
|
||||
} else {
|
||||
param.lob_data_->byte_size_ -= result.meta_result_.info_.byte_len_;
|
||||
if (lob_handle_has_char_len(param)) {
|
||||
int64_t *len = get_char_len_ptr(param);
|
||||
*len = *len - result.meta_result_.info_.char_len_;
|
||||
OB_ASSERT(*len >= 0);
|
||||
}
|
||||
param.byte_size_ = param.lob_data_->byte_size_;
|
||||
}
|
||||
}
|
||||
if (ret == OB_ITER_END) {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLobManager::erase_imple_inner(ObLobAccessParam& param)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -3435,6 +3563,10 @@ int ObLobManager::erase_imple_inner(ObLobAccessParam& param)
|
||||
LOG_WARN("init lob data out row ctx failed", K(ret));
|
||||
} else if (OB_FAIL(lob_ctx.lob_meta_mngr_->scan(param, meta_iter))) {
|
||||
LOG_WARN("do lob meta scan failed.", K(ret), K(param));
|
||||
} else if (param.is_full_delete()) {
|
||||
if (OB_FAIL(batch_delete(param, meta_iter))) {
|
||||
LOG_WARN("batch_delete fail", K(ret), K(param));
|
||||
}
|
||||
} else if(OB_FAIL(prepare_erase_buffer(param, tmp_buff))) {
|
||||
LOG_WARN("fail to prepare buffers", K(ret), K(param));
|
||||
} else {
|
||||
|
||||
@ -277,6 +277,8 @@ private:
|
||||
ObLobMetaInfo& meta_info,
|
||||
ObLobPieceInfo& piece_info);
|
||||
|
||||
int batch_delete(ObLobAccessParam& param, ObLobMetaScanIter &iter);
|
||||
|
||||
void transform_query_result_charset(const common::ObCollationType& coll_type,
|
||||
const char* data,
|
||||
uint32_t len,
|
||||
|
||||
@ -38,7 +38,7 @@ int ObLobMetaScanIter::open(ObLobAccessParam ¶m, ObILobApator* lob_adatper)
|
||||
}
|
||||
|
||||
ObLobMetaScanIter::ObLobMetaScanIter()
|
||||
: lob_adatper_(nullptr), meta_iter_(nullptr), param_(), scan_param_(), cur_pos_(0), cur_byte_pos_(0) {}
|
||||
: lob_adatper_(nullptr), meta_iter_(nullptr), param_(), scan_param_(), cur_pos_(0), cur_byte_pos_(0), not_calc_char_len_(false) {}
|
||||
|
||||
int ObLobMetaScanIter::get_next_row(ObLobMetaInfo &row)
|
||||
{
|
||||
@ -77,6 +77,19 @@ int ObLobMetaScanIter::get_next_row(ObLobMetaInfo &row)
|
||||
LOG_WARN("get meta info from row failed.", K(ret), KPC(datum_row));
|
||||
} else {
|
||||
cur_info_ = row;
|
||||
if (is_char && row.char_len_ == UINT32_MAX) {
|
||||
LOG_INFO("found no char_len, this only happen in inner QA upgrade test, can not happen in user situation", K(param_), KPC(this));
|
||||
if (row.byte_len_ != param_.byte_size_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected situation", K(ret), K(param_), KPC(this), K(row));
|
||||
} else if (not_calc_char_len()) {
|
||||
LOG_DEBUG("not_calc_char_len", K(param_), KPC(this), K(row));
|
||||
} else {
|
||||
// char len has not been calc, just calc char len here
|
||||
row.char_len_ = ObCharset::strlen_char(param_.coll_type_, row.lob_data_.ptr(), row.lob_data_.length());
|
||||
LOG_DEBUG("calc_char_len", K(param_), KPC(this), K(row));
|
||||
}
|
||||
}
|
||||
if (is_range_over(row)) {
|
||||
// if row cur_pos > offset + len, need break;
|
||||
ret = OB_ITER_END;
|
||||
|
||||
@ -81,7 +81,9 @@ public:
|
||||
bool is_range_begin(const ObLobMetaInfo& info);
|
||||
bool is_range_end(const ObLobMetaInfo& info);
|
||||
bool is_range_over(const ObLobMetaInfo& info);
|
||||
TO_STRING_KV(K_(cur_pos), K_(cur_byte_pos), K_(cur_info));
|
||||
void set_not_calc_char_len(bool not_calc_char_len) { not_calc_char_len_ = not_calc_char_len; }
|
||||
bool not_calc_char_len() const { return not_calc_char_len_; }
|
||||
TO_STRING_KV(K_(cur_pos), K_(cur_byte_pos), K_(cur_info), K_(not_calc_char_len));
|
||||
private:
|
||||
bool is_in_range(const ObLobMetaInfo& info);
|
||||
private:
|
||||
@ -92,6 +94,7 @@ private:
|
||||
uint64_t cur_pos_;
|
||||
uint64_t cur_byte_pos_;
|
||||
ObLobMetaInfo cur_info_;
|
||||
bool not_calc_char_len_;
|
||||
};
|
||||
|
||||
struct ObLobMetaWriteResult {
|
||||
|
||||
@ -64,6 +64,11 @@ struct ObLobAccessParam {
|
||||
}
|
||||
}
|
||||
public:
|
||||
|
||||
bool is_full_read() const { return op_type_ == ObLobDataOutRowCtx::OpType::SQL && 0 == offset_ && (len_ == byte_size_ || INT64_MAX == len_ || UINT64_MAX == len_); }
|
||||
bool is_full_delete() const { return op_type_ == ObLobDataOutRowCtx::OpType::SQL && 0 == offset_ && len_ >= byte_size_; }
|
||||
bool is_full_insert() const { return op_type_ == ObLobDataOutRowCtx::OpType::SQL && 0 == offset_ && 0 == byte_size_; }
|
||||
|
||||
int set_lob_locator(common::ObLobLocatorV2 *lob_locator);
|
||||
int64_t get_inrow_threshold();
|
||||
TO_STRING_KV(K_(tenant_id), K_(src_tenant_id), K_(ls_id), K_(tablet_id), KPC_(lob_locator), KPC_(lob_common),
|
||||
|
||||
Reference in New Issue
Block a user