[FEAT MERGE] JSON/XML phase2 and JSON Partial Update

Co-authored-by: Carrot-77 <1012982871@qq.com>
Co-authored-by: wu-xingying <729224612@qq.com>
This commit is contained in:
shadowao
2024-04-12 10:46:02 +00:00
committed by ob-robot
parent fbfcd0feaa
commit 4afa70a218
254 changed files with 55981 additions and 10150 deletions

View File

@ -89,7 +89,7 @@ int ObTextStringIter::init(uint32_t buffer_len,
ret = OB_ERR_UNEXPECTED;
COMMON_LOG(WARN,"Lob: invalid lob", K(ret));
} else if (FALSE_IT(is_outrow_ = !locator.has_inrow_data())) {
} else if (!is_outrow_) { // inrow lob always get full data, no need ctx_
} else if (!is_outrow_ && !locator.is_delta_temp_lob()) { // inrow lob always get full data, no need ctx_
} else if (OB_ISNULL(res_allocator)) {
ret = OB_INVALID_ARGUMENT;
COMMON_LOG(WARN, "Lob: iter with null allocator", K(ret));
@ -130,6 +130,9 @@ static int init_lob_access_param(storage::ObLobAccessParam &param,
} else if (!lob_iter_ctx->locator_.is_persist_lob()) {
ret = OB_NOT_IMPLEMENT;
COMMON_LOG(WARN, "Lob: outrow temp lob is not supported", K(ret), K(lob_iter_ctx->locator_));
} else if (lob_iter_ctx->locator_.is_delta_temp_lob()) {
ret = OB_INVALID_ARGUMENT;
COMMON_LOG(WARN, "Lob: is delta lob", K(ret), K(lob_iter_ctx->locator_));
} else if (OB_FAIL(lob_iter_ctx->locator_.get_disk_locator(disk_loc_str))) {
COMMON_LOG(WARN, "Lob: get disk locator failed.", K(ret));
} else if (FALSE_IT(disk_loc = reinterpret_cast<ObLobCommon *>(disk_loc_str.ptr()))){
@ -238,6 +241,81 @@ int ObTextStringIter::get_outrow_lob_full_data(ObIAllocator *allocator /*nullptr
return ret;
}
/**
 * Build the full content of a JSON delta temp lob.
 *
 * The delta locator carries a serialized ObLobPartialData right after the
 * ObLobDiffHeader. This function deserializes it, reads the base lob content
 * through the lob manager using the locator stored in the partial data, and
 * then copies every chunk flagged as modified or added over the base content.
 *
 * @param lob_locator  delta temp lob locator; must reference in-row data.
 * @param allocator    forwarded to init_lob_access_param().
 * @param data_str     [out] set to the patched content on success; the buffer
 *                     is allocated from ctx_->alloc_.
 * @return OB_SUCCESS on success; OB_ERR_UNEXPECTED for non-json types, out-row
 *         delta locators, missing context / lob manager, oversized data or
 *         chunk indexes that fall outside their buffers; OB_INVALID_ARGUMENT,
 *         OB_ALLOCATE_MEMORY_FAILED or the error of the failing callee otherwise.
 */
int ObTextStringIter::get_delta_lob_full_data(ObLobLocatorV2& lob_locator, ObIAllocator *allocator, ObString &data_str)
{
  int ret = OB_SUCCESS;
  ObLobCommon *lob_common = nullptr;
  ObLobDiffHeader *diff_header = nullptr;
  if (! ob_is_json(type_)) {
    ret = OB_ERR_UNEXPECTED;
    COMMON_LOG(WARN, "only json support", K(ret), K(type_));
  } else if (OB_FAIL(lob_locator.get_disk_locator(lob_common))) {
    COMMON_LOG(WARN, "get disk locator failed.", K(ret), K(lob_locator));
  } else if (! lob_common->in_row_) {
    ret = OB_ERR_UNEXPECTED;
    COMMON_LOG(WARN, "Unsupport out row delta tmp lob locator", K(ret), KPC(lob_common));
  } else if (OB_ISNULL(diff_header = reinterpret_cast<ObLobDiffHeader*>(lob_common->buffer_))) {
    ret = OB_ERR_UNEXPECTED;
    COMMON_LOG(WARN, "diff_header is null", K(ret), KPC(lob_common));
  } else {
    char *buf = diff_header->data_;
    int64_t data_len = diff_header->persist_loc_size_;
    int64_t pos = 0;
    ObLobPartialData partial_data;
    if (OB_FAIL(partial_data.init())) {
      COMMON_LOG(WARN, "map create fail", K(ret));
    } else if (OB_FAIL(partial_data.deserialize(buf, data_len, pos))) {
      COMMON_LOG(WARN, "deserialize partial data fail", K(ret), K(data_len), K(pos));
    } else {
      storage::ObLobManager* lob_mngr = MTL(storage::ObLobManager*);
      storage::ObLobAccessParam param;
      if (OB_ISNULL(ctx_) || OB_ISNULL(ctx_->alloc_)) {
        // The iterator context and its allocator are dereferenced below.
        ret = OB_ERR_UNEXPECTED;
        COMMON_LOG(WARN, "Lob: iter ctx or ctx allocator is null", K(ret), KP(ctx_));
      } else if (OB_ISNULL(lob_mngr)) {
        ret = OB_ERR_UNEXPECTED;
        COMMON_LOG(WARN, "Lob: get ObLobManager null", K(ret));
      } else if (FALSE_IT(ctx_->locator_ = partial_data.locator_)) {
        // From here on the context works on the locator recorded in the partial data.
      } else if (OB_FAIL(init_lob_access_param(param, ctx_, cs_type_, allocator))) {
        COMMON_LOG(WARN, "init_lob_access_param fail", K(ret));
      } else if (!param.ls_id_.is_valid() || !param.tablet_id_.is_valid()) {
        ret = OB_INVALID_ARGUMENT;
        COMMON_LOG(WARN, "Lob: invalid param.", K(ret), K(param));
      } else if ((param.len_ = param.byte_size_) <= 0) {
        ret = OB_ERR_UNEXPECTED;
        COMMON_LOG(WARN,"Lob: calc byte size is negative.", K(ret), K(param));
      } else if (param.byte_size_ > OB_MAX_LONGTEXT_LENGTH) {
        ret = OB_ERR_UNEXPECTED;
        COMMON_LOG(WARN,"Lob: unable to read full data over 512M lob.", K(ret), K(param));
      } else if (partial_data.data_length_ > OB_MAX_LONGTEXT_LENGTH) {
        ret = OB_ERR_UNEXPECTED;
        COMMON_LOG(WARN,"Lob: unable to read full data over 512M lob.", K(ret), K(param), K(partial_data));
      } else {
        // The output buffer is sized by the length recorded in the partial data.
        ctx_->total_byte_len_ = partial_data.data_length_;
        ctx_->buff_byte_len_ = static_cast<uint32_t>(partial_data.data_length_);
        ctx_->buff_ = static_cast<char *>(ctx_->alloc_->alloc(ctx_->buff_byte_len_));
        ObString output_data;
        if (OB_ISNULL(ctx_->buff_)) {
          ret = OB_ALLOCATE_MEMORY_FAILED;
          COMMON_LOG(WARN,"Lob: failed to alloc output buffer",
              K(ret), KP(ctx_->buff_), K(ctx_->buff_byte_len_));
        } else {
          output_data.assign_buffer(ctx_->buff_, ctx_->buff_byte_len_);
          if (OB_FAIL(lob_mngr->query(param, output_data))) {
            COMMON_LOG(WARN,"Lob: falied to query lob tablets.", K(ret), K(param));
          } else {
            output_data.set_length(static_cast<int32_t>(partial_data.data_length_));
            const int64_t out_len = output_data.length();
            const int64_t chunk_cnt = partial_data.data_.count();
            for (int64_t i = 0; OB_SUCC(ret) && i < partial_data.index_.count(); ++i) {
              const ObLobChunkIndex &idx = partial_data.index_[i];
              if (1 == idx.is_modified_ || 1 == idx.is_add_) {
                const int64_t data_idx = static_cast<int64_t>(idx.data_idx_);
                const int64_t dst_off = static_cast<int64_t>(idx.offset_);
                const int64_t src_off = static_cast<int64_t>(idx.pos_);
                const int64_t copy_len = static_cast<int64_t>(idx.byte_len_);
                // Index entries come from deserialized data: validate them
                // before using them as raw memory offsets.
                if (data_idx < 0 || data_idx >= chunk_cnt) {
                  ret = OB_ERR_UNEXPECTED;
                  COMMON_LOG(WARN, "Lob: chunk data index out of range",
                      K(ret), K(i), K(data_idx), K(chunk_cnt));
                } else {
                  const ObLobChunkData &chunk_data = partial_data.data_[data_idx];
                  const int64_t src_len = chunk_data.data_.length();
                  if (dst_off < 0 || src_off < 0 || copy_len < 0
                      || copy_len > out_len || dst_off > out_len - copy_len
                      || copy_len > src_len || src_off > src_len - copy_len) {
                    ret = OB_ERR_UNEXPECTED;
                    COMMON_LOG(WARN, "Lob: chunk range out of bound",
                        K(ret), K(i), K(dst_off), K(src_off), K(copy_len), K(out_len), K(src_len));
                  } else {
                    MEMCPY(output_data.ptr() + dst_off, chunk_data.data_.ptr() + src_off, copy_len);
                  }
                }
              }
            }
            if (OB_SUCC(ret)) {
              ctx_->content_byte_len_ = output_data.length();
              data_str = output_data;
            }
          }
        }
      }
    }
  }
  return ret;
}
int ObTextStringIter::get_outrow_prefix_data(uint32_t prefix_char_len)
{
int ret = OB_SUCCESS;
@ -304,6 +382,7 @@ int ObTextStringIter::get_current_block(ObString &str)
int ObTextStringIter::get_full_data(ObString &data_str)
{
int ret = OB_SUCCESS;
ObLobLocatorV2 loc(datum_str_, has_lob_header_);
if (!is_init_ || state_ != TEXTSTRING_ITER_INIT) {
ret = OB_INVALID_ARGUMENT;
COMMON_LOG(WARN, "Lob: iter state error", K(ret), K(is_init_), K(state_));
@ -312,8 +391,11 @@ int ObTextStringIter::get_full_data(ObString &data_str)
COMMON_LOG(DEBUG, "Lob: iter with null input", K(ret), K(*this));
} else if (!is_lob_ || !has_lob_header_) { // string types or 4.0 compatiable text
data_str.assign_ptr(datum_str_.ptr(), datum_str_.length());
} else if (loc.is_delta_temp_lob()) {
if (OB_FAIL(get_delta_lob_full_data(loc, tmp_alloc_, data_str))) {
COMMON_LOG(WARN, "get_delta_lob_full_data fail", K(ret), K(loc));
}
} else if (!is_outrow_) { // inrow lob
ObLobLocatorV2 loc(datum_str_, has_lob_header_);
if (OB_FAIL(loc.get_inrow_data(data_str))) {
COMMON_LOG(WARN, "Lob: get lob inrow data failed", K(ret));
}
@ -1344,5 +1426,112 @@ int ObTextStringResult::ob_convert_datum_temporay_lob(ObDatum &datum,
return ret;
}
/**
 * Total number of bytes a serialized delta lob occupies: the fixed headers,
 * followed by the partial (updated) lob data, followed by the lob diffs.
 */
int64_t ObDeltaLob::get_serialize_size() const
{
  const int64_t header_bytes = get_header_serialize_size();
  const int64_t partial_data_bytes = get_partial_data_serialize_size();
  const int64_t lob_diff_bytes = get_lob_diff_serialize_size();
  return header_bytes + partial_data_bytes + lob_diff_bytes;
}
/**
 * Size in bytes of the fixed header part of a serialized delta lob, i.e. the
 * ObMemLobCommon, ObLobCommon and ObLobDiffHeader structures laid out back to back.
 */
int64_t ObDeltaLob::get_header_serialize_size() const
{
  return static_cast<int64_t>(sizeof(ObMemLobCommon)
                              + sizeof(ObLobCommon)
                              + sizeof(ObLobDiffHeader));
}
int ObDeltaLob::serialize(char* buf, const int64_t buf_len, int64_t& pos) const
{
int ret = OB_SUCCESS;
ObLobDiffHeader *diff_header = nullptr;
if (OB_FAIL(serialize_header(buf, buf_len, pos, diff_header))) {
LOG_WARN("serialize_header fail", KR(ret), K(buf_len), K(pos), KP(buf));
} else if (OB_FAIL(serialize_partial_data(buf, buf_len, pos))) {
LOG_WARN("serialize_partial_data fail", KR(ret), K(buf_len), K(pos), KP(buf));
} else if (OB_FAIL(serialize_lob_diffs(buf, buf_len, diff_header))) {
LOG_WARN("serialize_lob_diffs fail", KR(ret), K(buf_len), K(pos), KP(buf));
}
return ret;
}
int ObDeltaLob::serialize_header(char* buf, const int64_t buf_len, int64_t& pos, ObLobDiffHeader *&diff_header) const
{
int ret = OB_SUCCESS;
int64_t size = get_header_serialize_size();
if (pos + size > buf_len) {
ret = OB_SIZE_OVERFLOW;
LOG_WARN("buffer not enough", KR(ret), K(pos), K(size), K(buf_len), KP(buf));
} else {
ObMemLobCommon *mem_common = new (buf + pos) ObMemLobCommon(ObMemLobType::TEMP_DELTA_LOB, false);
ObLobCommon *lob_common = new (mem_common->data_) ObLobCommon();
diff_header = new (lob_common->buffer_) ObLobDiffHeader();
diff_header->diff_cnt_ = get_lob_diff_cnt();
diff_header->persist_loc_size_ = static_cast<uint32_t>(get_partial_data_serialize_size());
pos += size;
}
return ret;
}
int ObDeltaLob::deserialize(const ObLobLocatorV2 &lob_locator)
{
int ret = OB_SUCCESS;
ObLobCommon *lob_common = nullptr;
ObLobDiffHeader *diff_header = nullptr;
if (! lob_locator.is_delta_temp_lob()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("input not delta tmp lob", KR(ret), K(lob_locator));
} else if (OB_FAIL(lob_locator.get_disk_locator(lob_common))) {
LOG_WARN("get disk locator failed.", K(ret), K(lob_locator));
} else if (OB_ISNULL(diff_header = reinterpret_cast<ObLobDiffHeader*>(lob_common->buffer_))){
ret = OB_ERR_UNEXPECTED;
} else if (OB_FAIL(deserialize_partial_data(diff_header))) {
LOG_WARN("deserialize_partial_data fail", KR(ret), K(lob_locator), KPC(diff_header));
} else if (OB_FAIL(deserialize_lob_diffs(lob_locator.ptr_, lob_locator.size_, diff_header))) {
LOG_WARN("deserialize_lob_diffs fail", KR(ret), K(lob_locator), KPC(diff_header));
}
return ret;
}
/**
 * Integer flavour of has_diff(): delegates to the bool overload and stores
 * the answer in res as 1 (has diffs) or 0 (no diffs). res is left untouched
 * when the bool overload fails.
 */
int ObDeltaLob::has_diff(const ObLobLocatorV2 &locator, int64_t &res)
{
  int ret = OB_SUCCESS;
  bool bres = false;
  if (OB_SUCCESS == (ret = has_diff(locator, bres))) {
    res = bres ? 1 : 0;
  } else {
    LOG_WARN("fail", KR(ret), K(locator));
  }
  return ret;
}
int ObDeltaLob::has_diff(const ObLobLocatorV2 &locator, bool &res)
{
int ret = OB_SUCCESS;
ObLobCommon *lob_common = nullptr;
if (! locator.is_delta_temp_lob()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("not delta lob", K(ret), K(locator));
} else if (OB_FAIL(locator.get_disk_locator(lob_common))) {
LOG_WARN("get disk locator failed.", KR(ret), K(locator));
} else if (! lob_common->in_row_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unsupport out row delta tmp lob locator", KR(ret), KPC(lob_common), K(locator));
} else {
ObLobDiffHeader *diff_header = reinterpret_cast<ObLobDiffHeader*>(lob_common->buffer_);
res = diff_header->diff_cnt_ > 0;
}
return ret;
}
}
}