Use the corresponding column fill cg

Co-authored-by: Charles0429 <xiezhenjiang@gmail.com>
Co-authored-by: Monk-Liu <1152761042@qq.com>
This commit is contained in:
renju96
2024-02-09 22:21:21 +00:00
committed by ob-robot
parent d969ad5db0
commit 19d5e20cf1
15 changed files with 355 additions and 261 deletions

View File

@ -23,6 +23,7 @@
#include "sql/engine/pdml/static/ob_px_sstable_insert_op.h"
#include "storage/ddl/ob_direct_insert_sstable_ctx_new.h"
#include "storage/lob/ob_lob_util.h"
#include "storage/tablet/ob_tablet.h"
#include "sql/engine/expr/ob_expr_lob_utils.h"
#include "sql/das/ob_das_utils.h"
#include "sql/engine/basic/chunk_store/ob_compact_store.h"
@ -355,25 +356,18 @@ ObTabletDDLParam::~ObTabletDDLParam()
}
int ObChunkSliceStore::init(const int64_t rowkey_column_count, ObArenaAllocator &allocator,
const ObIArray<ObColumnSchemaItem> &col_array,
common::ObCompressorType compress_type)
int ObChunkSliceStore::init(const int64_t rowkey_column_count, ObTabletHandle &tablet_handle,
ObArenaAllocator &allocator, const ObIArray<ObColumnSchemaItem> &col_array, const int64_t dir_id)
{
int ret = OB_SUCCESS;
const int64_t chunk_mem_limit = 2 * 1024L * 1024L; // 2M
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
LOG_WARN("init twice", K(ret));
} else if (OB_UNLIKELY(rowkey_column_count <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalida argument", K(ret), K(rowkey_column_count));
} else if (OB_FAIL(datum_store_.init(chunk_mem_limit, col_array, MTL_ID(), ObCtxIds::DEFAULT_CTX_ID,
"DL_SLICE_STORE", true/*enable_dump*/, 0, false/*disable truncate*/,
compress_type == NONE_COMPRESSOR ? SORT_COMPACT_LEVEL : SORT_COMPRESSION_COMPACT_LEVEL,
compress_type))) {
LOG_WARN("failed to init chunk datum store", K(ret));
} else if (OB_FAIL(datum_store_.alloc_dir_id())) {
LOG_WARN("failed to alloc dir id", K(ret));
} else if (OB_FAIL(prepare_datum_stores(MTL_ID(), tablet_handle, allocator, col_array, dir_id))) {
LOG_WARN("fail to prepare datum stores");
} else {
arena_allocator_ = &allocator;
rowkey_column_count_ = rowkey_column_count;
@ -383,6 +377,112 @@ int ObChunkSliceStore::init(const int64_t rowkey_column_count, ObArenaAllocator
return ret;
}
void ObChunkSliceStore::reset()
{
int ret = OB_SUCCESS;
if (OB_NOT_NULL(arena_allocator_)) {
for (int64_t i = 0; OB_SUCC(ret) && i < datum_stores_.count(); ++i) {
sql::ObCompactStore *cur_store = datum_stores_.at(i);
cur_store->~ObCompactStore();
arena_allocator_->free(cur_store);
cur_store = nullptr;
}
}
datum_stores_.reset();
cg_schemas_.reset();
endkey_.reset();
target_store_idx_ = -1;
row_cnt_ = 0;
arena_allocator_ = nullptr;
is_inited_ = false;
}
int64_t ObChunkSliceStore::calc_chunk_limit(const ObStorageColumnGroupSchema &cg_schema)
{
const int64_t basic_column_cnt = 10;
const int64_t basic_chunk_memory_limit = 512L * 1024L; // 512KB
return ((cg_schema.column_cnt_ / basic_column_cnt) + 1) * basic_chunk_memory_limit;
}
int ObChunkSliceStore::prepare_datum_stores(const uint64_t tenant_id, ObTabletHandle &tablet_handle, ObIAllocator &allocator, const ObIArray<ObColumnSchemaItem> &col_array, const int64_t dir_id)
{
int ret = OB_SUCCESS;
const int64_t chunk_mem_limit = 64 * 1024L; // 64K
ObCompactStore *datum_store = nullptr;
void *buf = nullptr;
if (OB_UNLIKELY(tenant_id <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(tenant_id));
} else {
ObStorageSchema *storage_schema = nullptr;
if (OB_UNLIKELY(!tablet_handle.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid tablet handle", K(ret), K(tablet_handle));
} else if (OB_FAIL(tablet_handle.get_obj()->load_storage_schema(allocator, storage_schema))) {
LOG_WARN("load storage schema failed", K(ret), K(tablet_handle));
} else {
const ObIArray<ObStorageColumnGroupSchema> &cg_schemas = storage_schema->get_column_groups();
for (int64_t i = 0; OB_SUCC(ret) && i < cg_schemas.count(); ++i) {
const ObStorageColumnGroupSchema &cur_cg_schema = cg_schemas.at(i);
ObCompressorType compressor_type = cur_cg_schema.compressor_type_;
compressor_type = NONE_COMPRESSOR == compressor_type ? (CS_ENCODING_ROW_STORE == cur_cg_schema.row_store_type_ ? ZSTD_1_3_8_COMPRESSOR : NONE_COMPRESSOR) : compressor_type;
if (cur_cg_schema.is_rowkey_column_group() || cur_cg_schema.is_all_column_group()) {
target_store_idx_ = i;
}
if (OB_ISNULL(buf = allocator.alloc(sizeof(ObCompactStore)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory failed", K(ret));
} else {
datum_store = new (buf) ObCompactStore();
ObArray<ObColumnSchemaItem> cur_column_items;
cur_column_items.set_attr(ObMemAttr(tenant_id, "tmp_cg_item"));
for (int64_t j = 0; OB_SUCC(ret) && j < cur_cg_schema.column_cnt_; ++j) {
int64_t column_idx = cur_cg_schema.column_idxs_ ? cur_cg_schema.column_idxs_[j] : j; // all_cg column_idxs_ = null
if (column_idx >= col_array.count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid column idex", K(ret), K(column_idx), K(col_array.count()), K(i), K(cur_cg_schema));
} else if (OB_FAIL(cur_column_items.push_back(col_array.at(column_idx)))) {
LOG_WARN("fail to push_back col_item", K(ret));
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(datum_store->init(chunk_mem_limit, cur_column_items, tenant_id, ObCtxIds::DEFAULT_CTX_ID,
"DL_SLICE_STORE", true/*enable_dump*/, 0, false/*disable truncate*/,
compressor_type == NONE_COMPRESSOR ? SORT_COMPACT_LEVEL : SORT_COMPRESSION_COMPACT_LEVEL,
compressor_type))) {
LOG_WARN("failed to init chunk datum store", K(ret));
} else {
datum_store->set_dir_id(dir_id);
LOG_INFO("set dir id", K(dir_id));
}
if (OB_SUCC(ret)) {
if (OB_FAIL(datum_stores_.push_back(datum_store))) {
LOG_WARN("fail to push back datum_store", K(ret));
}
}
if (OB_FAIL(ret)) {
if (OB_NOT_NULL(datum_store)) {
datum_store->~ObCompactStore();
allocator.free(datum_store);
datum_store = nullptr;
}
}
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(cg_schemas_.assign(cg_schemas))) {
LOG_WARN("fail to copy cg schemas", K(ret));
}
}
}
ObTabletObjLoadHelper::free(allocator, storage_schema);
}
LOG_INFO("init ObChunkSliceStore", K(*this));
return ret;
}
int ObChunkSliceStore::append_row(const blocksstable::ObDatumRow &datum_row)
{
int ret = OB_SUCCESS;
@ -392,8 +492,17 @@ int ObChunkSliceStore::append_row(const blocksstable::ObDatumRow &datum_row)
} else if (OB_UNLIKELY(!datum_row.is_valid() || datum_row.get_column_count() < rowkey_column_count_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(datum_row), K(rowkey_column_count_));
} else if (OB_FAIL(datum_store_.add_row(datum_row.storage_datums_, datum_row.get_column_count(), 0/*extra_size*/))) {
LOG_WARN("chunk datum store add row failed", K(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < cg_schemas_.count(); ++i) {
ObStorageColumnGroupSchema &cur_cg_schema = cg_schemas_.at(i);
sql::ObCompactStore *cur_store = datum_stores_.at(i);
if (OB_FAIL(cur_store->add_row(datum_row, cur_cg_schema, 0/*extra_size*/))) {
LOG_WARN("chunk datum store add row failed", K(ret), K(i), K(datum_row.get_column_count()), K(cur_cg_schema), K(cg_schemas_));
}
}
if (OB_SUCC(ret)) {
++row_cnt_;
}
}
return ret;
}
@ -404,9 +513,10 @@ int ObChunkSliceStore::close()
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else if (datum_store_.get_row_cnt() > 0) { // save endkey
} else if (datum_stores_.count() > 0 && OB_NOT_NULL(datum_stores_.at(target_store_idx_)) && datum_stores_.at(target_store_idx_)->get_row_cnt() > 0) { // save endkey
const ObChunkDatumStore::StoredRow *stored_row = nullptr;
if (OB_FAIL(datum_store_.get_last_stored_row(stored_row))) {
ObCompactStore *target_store = datum_stores_.at(target_store_idx_);
if (OB_FAIL(target_store->get_last_stored_row(stored_row))) {
LOG_WARN("fail to get last stored row", K(ret));
} else if (OB_UNLIKELY(nullptr == stored_row || stored_row->cnt_ < rowkey_column_count_)) {
ret = OB_ERR_UNEXPECTED;
@ -430,8 +540,10 @@ int ObChunkSliceStore::close()
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(datum_store_.finish_add_row(true/*need_dump*/))) {
LOG_WARN("finish add row failed", K(ret));
for (int64_t i = 0; OB_SUCC(ret) && i < datum_stores_.count(); ++i) {
if (OB_FAIL(datum_stores_.at(i)->finish_add_row(true/*need_dump*/))) {
LOG_WARN("finish add row failed", K(ret));
}
}
}
LOG_DEBUG("chunk slice store closed", K(ret), K(endkey_));
@ -539,7 +651,8 @@ ObDirectLoadSliceWriter::~ObDirectLoadSliceWriter()
int ObDirectLoadSliceWriter::prepare_slice_store_if_need(
const int64_t schema_rowkey_column_num,
const bool is_column_store,
const ObCompressorType compress_type,
const int64_t dir_id,
ObTabletHandle &tablet_handle,
const SCN &start_scn)
{
int ret = OB_SUCCESS;
@ -552,15 +665,13 @@ int ObDirectLoadSliceWriter::prepare_slice_store_if_need(
if (is_column_store) {
need_column_store_ = true;
ObChunkSliceStore *chunk_slice_store = nullptr;
if (OB_FAIL(ret)) {
if (OB_UNLIKELY(!tablet_handle.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid tablet handle", K(ret), K(tablet_handle));
} else if (OB_ISNULL(chunk_slice_store = OB_NEWx(ObChunkSliceStore, &allocator_))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory for chunk slice store failed", K(ret));
} else if (OB_FAIL(chunk_slice_store->init(schema_rowkey_column_num +
ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt(), allocator_,
tablet_direct_load_mgr_->get_column_info(),
compress_type))) {
} else if (OB_FAIL(chunk_slice_store->init(schema_rowkey_column_num + ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt(), tablet_handle, allocator_, tablet_direct_load_mgr_->get_column_info(), dir_id))) {
LOG_WARN("init chunk slice store failed", K(ret));
} else {
slice_store_ = chunk_slice_store;
@ -754,6 +865,7 @@ int ObDirectLoadSliceWriter::fill_lob_into_macro_block(
info.trans_id_, info.seq_no_, timeout_ts, lob_inrow_threshold, info.src_tenant_id_, row_iter))) {
LOG_WARN("fail to prepare iters", K(ret), KP(row_iter), K(datum));
} else {
ObTabletHandle unused_tablet_handle; //lob no need to get storageschema with handle
while (OB_SUCC(ret)) {
const blocksstable::ObDatumRow *cur_row = nullptr;
if (OB_FAIL(THIS_WORKER.check_status())) {
@ -774,7 +886,7 @@ int ObDirectLoadSliceWriter::fill_lob_into_macro_block(
} else if (OB_FAIL(check_null(false/*is_index_table*/, ObLobMetaUtil::LOB_META_SCHEMA_ROWKEY_COL_CNT, *cur_row))) {
LOG_WARN("fail to check null value in row", KR(ret), KPC(cur_row));
} else if (OB_FAIL(prepare_slice_store_if_need(ObLobMetaUtil::LOB_META_SCHEMA_ROWKEY_COL_CNT,
false/*is_column_store*/, NONE_COMPRESSOR/*do not use compressort*/, start_scn))) {
false/*is_column_store*/, 1L/*unsued*/, unused_tablet_handle, start_scn))) {
LOG_WARN("prepare macro block writer failed", K(ret));
} else if (OB_FAIL(slice_store_->append_row(*cur_row))) {
LOG_WARN("macro block writer append row failed", K(ret), KPC(cur_row));
@ -803,10 +915,12 @@ int ObDirectLoadSliceWriter::fill_sstable_slice(
const SCN &start_scn,
const uint64_t table_id,
const ObTabletID &tablet_id,
ObTabletHandle &tablet_handle,
ObIStoreRowIterator *row_iter,
const ObTableSchemaItem &schema_item,
const ObDirectLoadType &direct_load_type,
const ObArray<ObColumnSchemaItem> &column_items,
const int64_t dir_id,
int64_t &affected_rows,
ObInsertMonitor *insert_monitor)
{
@ -857,8 +971,7 @@ int ObDirectLoadSliceWriter::fill_sstable_slice(
if (OB_FAIL(ret)) {
} else if (OB_FAIL(check_null(schema_item.is_index_table_, schema_item.rowkey_column_num_, *cur_row))) {
LOG_WARN("fail to check null value in row", KR(ret), KPC(cur_row));
} else if (OB_FAIL(prepare_slice_store_if_need(schema_item.rowkey_column_num_, schema_item.is_column_store_,
schema_item.compress_type_, start_scn))) {
} else if (OB_FAIL(prepare_slice_store_if_need(schema_item.rowkey_column_num_, schema_item.is_column_store_, dir_id, tablet_handle, start_scn))) {
LOG_WARN("prepare macro block writer failed", K(ret));
} else if (OB_FAIL(slice_store_->append_row(*cur_row))) {
if (is_full_direct_load_task && OB_ERR_PRIMARY_KEY_DUPLICATE == ret && schema_item.is_unique_index_) {
@ -978,114 +1091,67 @@ int ObDirectLoadSliceWriter::fill_column_group(const ObStorageSchema *storage_sc
LOG_WARN("fil cg task canceled", K(ret), K(is_canceled_));
} else {
const ObIArray<ObStorageColumnGroupSchema> &cg_schemas = storage_schema->get_column_groups();
const int64_t MAX_CO_BATCH_SIZE = 10; // todo @qilu: add opt hint for batch_cnt
ObArray<ObCOSliceWriter *> co_ddl_writers;
co_ddl_writers.set_attr(ObMemAttr(MTL_ID(), "DL_co_writers"));
ObTimeGuard tg("fill_column_group", 1000L * 1000L * 600L); // 10 mins
FLOG_INFO("[DDL_FILL_CG] fill column group start",
"tablet_id", tablet_direct_load_mgr_->get_tablet_id(),
"row_count", chunk_slice_store->get_row_count(),
"column_group_count", cg_schemas.count());
// 1. reserve writers
const int64_t batch_count = MIN(MAX_CO_BATCH_SIZE, cg_schemas.count());
for (int64_t i = 0; OB_SUCC(ret) && i < batch_count; ++i) {
ObCOSliceWriter *tmp_writer = nullptr;
if (OB_ISNULL(tmp_writer = OB_NEWx(ObCOSliceWriter, &allocator_))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory for co writer failed", K(ret));
} else if (OB_FAIL(co_ddl_writers.reserve(batch_count))) {
LOG_WARN("fail to reserve writers array", K(ret), K(batch_count));
} else if (OB_FAIL(co_ddl_writers.push_back(tmp_writer))) {
LOG_WARN("push back co writer failed", K(ret));
tmp_writer->~ObCOSliceWriter();
allocator_.free(tmp_writer);
}
}
int64_t cg_idx = 0;
while (OB_SUCC(ret) && cg_idx < cg_schemas.count()) {
tg.click("batch_fill");
int64_t current_batch_count = batch_count;
for (int64_t i = 0; OB_SUCC(ret) && i < batch_count; ++i) {
if (cg_idx >= cg_schemas.count()) {
current_batch_count = i;
break;
ObCOSliceWriter *cur_writer = nullptr;
if (OB_ISNULL(cur_writer = OB_NEWx(ObCOSliceWriter, &allocator_))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory for co writer failed", K(ret));
} else {
// 2. rescan and write
for (int64_t cg_idx = 0; OB_SUCC(ret) && cg_idx < cg_schemas.count(); ++cg_idx) {
const ObStorageColumnGroupSchema &cg_schema = cg_schemas.at(cg_idx);
cur_writer->reset();
if (OB_FAIL(cur_writer->init(storage_schema, cg_idx, tablet_direct_load_mgr_, start_seq_, row_offset_, start_scn))) {
LOG_WARN("init co ddl writer failed", K(ret), KPC(cur_writer), K(cg_idx), KPC(this));
} else {
const ObStorageColumnGroupSchema &cg_schema = cg_schemas.at(cg_idx);
ObCOSliceWriter *cur_writer = co_ddl_writers.at(i);
cur_writer->reset();
if (OB_FAIL(cur_writer->init(storage_schema, cg_idx, tablet_direct_load_mgr_, start_seq_, row_offset_, start_scn))) {
LOG_WARN("init co ddl writer failed", K(ret), K(i), K(cg_idx), KPC(this));
} else {
++cg_idx;
}
}
}
if (OB_SUCC(ret)) {
// 2. rescan and write
const ObChunkDatumStore::StoredRow *stored_row = nullptr;
bool has_next = false;
chunk_slice_store->datum_store_.rescan();
int64_t begin_ts = ObTimeUtility::fast_current_time();
while (OB_SUCC(ret) && OB_SUCC(chunk_slice_store->datum_store_.has_next(has_next)) && has_next) {
int64_t row_count = 0;
if (row_count > 0 && row_count % (10L * 10000L) == 0) { // print log per 10w records
int64_t curr_ts = ObTimeUtility::fast_current_time();
FLOG_INFO("[DDL_FILL_CG] rescan and fill", "tablet_id", tablet_direct_load_mgr_->get_tablet_id(),
"start_cg_idx", cg_idx - current_batch_count,
K(current_batch_count), K(row_count), "cost_time_us", curr_ts - begin_ts);
begin_ts = curr_ts;
}
if (OB_FAIL(chunk_slice_store->datum_store_.get_next_row(stored_row))) {
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
break;
sql::ObCompactStore * cur_datum_store = chunk_slice_store->datum_stores_.at(cg_idx);
const ObChunkDatumStore::StoredRow *stored_row = nullptr;
bool has_next = false;
while (OB_SUCC(ret) && OB_SUCC(cur_datum_store->has_next(has_next)) && has_next) {
if (OB_FAIL(cur_datum_store->get_next_row(stored_row))) {
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
break;
} else {
LOG_WARN("get next row failed", K(ret));
}
} else {
LOG_WARN("get next row failed", K(ret));
}
} else {
++row_count;
if (OB_NOT_NULL(insert_monitor)) {
insert_monitor->inserted_cg_row_cnt_ = insert_monitor->inserted_cg_row_cnt_ + current_batch_count;
}
for (int64_t i = 0; OB_SUCC(ret) && i < current_batch_count; ++i) {
ObCOSliceWriter *cur_writer = co_ddl_writers.at(i);
if (OB_NOT_NULL(insert_monitor)) {
insert_monitor->inserted_cg_row_cnt_ += 1;
}
if (OB_FAIL(cur_writer->append_row(stored_row))) {
LOG_WARN("append row failed", K(ret), KPC(stored_row), K(row_count));
LOG_WARN("append row failed", K(ret), KPC(stored_row));
} else {
if (OB_NOT_NULL(insert_monitor)) {
insert_monitor->inserted_cg_row_cnt_ += 1;
}
}
}
}
}
}
if (OB_SUCC(ret)) {
// 3. close writers
for (int64_t i = 0; OB_SUCC(ret) && i < current_batch_count; ++i) {
ObCOSliceWriter *cur_writer = co_ddl_writers.at(i);
if (OB_FAIL(cur_writer->close())) {
LOG_WARN("close co ddl writer failed", K(ret));
if (OB_SUCC(ret)) {
// 3. close writers
if (OB_FAIL(cur_writer->close())) {
LOG_WARN("close co ddl writer failed", K(ret));
}
}
}
}
FLOG_INFO("[DDL_FILL_CG] finish cg batch", "tablet_id", tablet_direct_load_mgr_->get_tablet_id(),
"next_cg_idx", cg_idx, "total_cg_count", cg_schemas.count(), K(current_batch_count));
}
tg.click("fill_end");
// 4. free writers, ignore ret
for (int64_t i = 0; i < co_ddl_writers.count(); ++i) {
ObCOSliceWriter *cur_writer = co_ddl_writers.at(i);
if (OB_NOT_NULL(cur_writer)) {
cur_writer->~ObCOSliceWriter();
allocator_.free(cur_writer);
}
if (OB_NOT_NULL(cur_writer)) {
cur_writer->~ObCOSliceWriter();
allocator_.free(cur_writer);
}
co_ddl_writers.reset();
FLOG_INFO("[DDL_FILL_CG] fill column group finished",
"tablet_id", tablet_direct_load_mgr_->get_tablet_id(),
"row_count", chunk_slice_store->get_row_count(),
"column_group_count", cg_schemas.count(),
"time_cost_us", tg.get_diff());
"column_group_count", cg_schemas.count());
}
return ret;
}
@ -1182,19 +1248,13 @@ int ObCOSliceWriter::project_cg_row(const ObStorageColumnGroupSchema &cg_schema,
if (OB_UNLIKELY(!cg_schema.is_valid() || nullptr == stored_row)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(cg_schema), KP(stored_row));
} else if (cg_schema.column_cnt_ > stored_row->cnt_ || cg_row.get_column_count() != cg_schema.column_cnt_) {
} else if (cg_schema.column_cnt_ != stored_row->cnt_ || cg_row.get_column_count() != cg_schema.column_cnt_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("column count not match", K(ret), K(stored_row->cnt_), K(cg_row), K(cg_schema));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < cg_schema.column_cnt_; ++i) {
int64_t column_idx = cg_schema.column_idxs_ ? cg_schema.column_idxs_[i] : i;
if (column_idx >= stored_row->cnt_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid column idex", K(ret));
} else {
const ObDatum &cur_datum = stored_row->cells()[column_idx];
cg_row.storage_datums_[i].set_datum(cur_datum);
}
const ObDatum &cur_datum = stored_row->cells()[i];
cg_row.storage_datums_[i].set_datum(cur_datum);
}
}
return ret;