Use the corresponding column fill cg
Co-authored-by: Charles0429 <xiezhenjiang@gmail.com> Co-authored-by: Monk-Liu <1152761042@qq.com>
This commit is contained in:
@ -23,6 +23,7 @@
|
||||
#include "sql/engine/pdml/static/ob_px_sstable_insert_op.h"
|
||||
#include "storage/ddl/ob_direct_insert_sstable_ctx_new.h"
|
||||
#include "storage/lob/ob_lob_util.h"
|
||||
#include "storage/tablet/ob_tablet.h"
|
||||
#include "sql/engine/expr/ob_expr_lob_utils.h"
|
||||
#include "sql/das/ob_das_utils.h"
|
||||
#include "sql/engine/basic/chunk_store/ob_compact_store.h"
|
||||
@ -355,25 +356,18 @@ ObTabletDDLParam::~ObTabletDDLParam()
|
||||
|
||||
}
|
||||
|
||||
int ObChunkSliceStore::init(const int64_t rowkey_column_count, ObArenaAllocator &allocator,
|
||||
const ObIArray<ObColumnSchemaItem> &col_array,
|
||||
common::ObCompressorType compress_type)
|
||||
int ObChunkSliceStore::init(const int64_t rowkey_column_count, ObTabletHandle &tablet_handle,
|
||||
ObArenaAllocator &allocator, const ObIArray<ObColumnSchemaItem> &col_array, const int64_t dir_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t chunk_mem_limit = 2 * 1024L * 1024L; // 2M
|
||||
if (OB_UNLIKELY(is_inited_)) {
|
||||
ret = OB_INIT_TWICE;
|
||||
LOG_WARN("init twice", K(ret));
|
||||
} else if (OB_UNLIKELY(rowkey_column_count <= 0)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalida argument", K(ret), K(rowkey_column_count));
|
||||
} else if (OB_FAIL(datum_store_.init(chunk_mem_limit, col_array, MTL_ID(), ObCtxIds::DEFAULT_CTX_ID,
|
||||
"DL_SLICE_STORE", true/*enable_dump*/, 0, false/*disable truncate*/,
|
||||
compress_type == NONE_COMPRESSOR ? SORT_COMPACT_LEVEL : SORT_COMPRESSION_COMPACT_LEVEL,
|
||||
compress_type))) {
|
||||
LOG_WARN("failed to init chunk datum store", K(ret));
|
||||
} else if (OB_FAIL(datum_store_.alloc_dir_id())) {
|
||||
LOG_WARN("failed to alloc dir id", K(ret));
|
||||
} else if (OB_FAIL(prepare_datum_stores(MTL_ID(), tablet_handle, allocator, col_array, dir_id))) {
|
||||
LOG_WARN("fail to prepare datum stores");
|
||||
} else {
|
||||
arena_allocator_ = &allocator;
|
||||
rowkey_column_count_ = rowkey_column_count;
|
||||
@ -383,6 +377,112 @@ int ObChunkSliceStore::init(const int64_t rowkey_column_count, ObArenaAllocator
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObChunkSliceStore::reset()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_NOT_NULL(arena_allocator_)) {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < datum_stores_.count(); ++i) {
|
||||
sql::ObCompactStore *cur_store = datum_stores_.at(i);
|
||||
cur_store->~ObCompactStore();
|
||||
arena_allocator_->free(cur_store);
|
||||
cur_store = nullptr;
|
||||
}
|
||||
}
|
||||
datum_stores_.reset();
|
||||
cg_schemas_.reset();
|
||||
endkey_.reset();
|
||||
target_store_idx_ = -1;
|
||||
row_cnt_ = 0;
|
||||
arena_allocator_ = nullptr;
|
||||
is_inited_ = false;
|
||||
}
|
||||
|
||||
int64_t ObChunkSliceStore::calc_chunk_limit(const ObStorageColumnGroupSchema &cg_schema)
|
||||
{
|
||||
const int64_t basic_column_cnt = 10;
|
||||
const int64_t basic_chunk_memory_limit = 512L * 1024L; // 512KB
|
||||
return ((cg_schema.column_cnt_ / basic_column_cnt) + 1) * basic_chunk_memory_limit;
|
||||
}
|
||||
|
||||
int ObChunkSliceStore::prepare_datum_stores(const uint64_t tenant_id, ObTabletHandle &tablet_handle, ObIAllocator &allocator, const ObIArray<ObColumnSchemaItem> &col_array, const int64_t dir_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t chunk_mem_limit = 64 * 1024L; // 64K
|
||||
ObCompactStore *datum_store = nullptr;
|
||||
void *buf = nullptr;
|
||||
if (OB_UNLIKELY(tenant_id <= 0)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(tenant_id));
|
||||
} else {
|
||||
|
||||
ObStorageSchema *storage_schema = nullptr;
|
||||
if (OB_UNLIKELY(!tablet_handle.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid tablet handle", K(ret), K(tablet_handle));
|
||||
} else if (OB_FAIL(tablet_handle.get_obj()->load_storage_schema(allocator, storage_schema))) {
|
||||
LOG_WARN("load storage schema failed", K(ret), K(tablet_handle));
|
||||
} else {
|
||||
const ObIArray<ObStorageColumnGroupSchema> &cg_schemas = storage_schema->get_column_groups();
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < cg_schemas.count(); ++i) {
|
||||
const ObStorageColumnGroupSchema &cur_cg_schema = cg_schemas.at(i);
|
||||
ObCompressorType compressor_type = cur_cg_schema.compressor_type_;
|
||||
compressor_type = NONE_COMPRESSOR == compressor_type ? (CS_ENCODING_ROW_STORE == cur_cg_schema.row_store_type_ ? ZSTD_1_3_8_COMPRESSOR : NONE_COMPRESSOR) : compressor_type;
|
||||
if (cur_cg_schema.is_rowkey_column_group() || cur_cg_schema.is_all_column_group()) {
|
||||
target_store_idx_ = i;
|
||||
}
|
||||
if (OB_ISNULL(buf = allocator.alloc(sizeof(ObCompactStore)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("allocate memory failed", K(ret));
|
||||
} else {
|
||||
datum_store = new (buf) ObCompactStore();
|
||||
ObArray<ObColumnSchemaItem> cur_column_items;
|
||||
cur_column_items.set_attr(ObMemAttr(tenant_id, "tmp_cg_item"));
|
||||
for (int64_t j = 0; OB_SUCC(ret) && j < cur_cg_schema.column_cnt_; ++j) {
|
||||
int64_t column_idx = cur_cg_schema.column_idxs_ ? cur_cg_schema.column_idxs_[j] : j; // all_cg column_idxs_ = null
|
||||
if (column_idx >= col_array.count()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid column idex", K(ret), K(column_idx), K(col_array.count()), K(i), K(cur_cg_schema));
|
||||
} else if (OB_FAIL(cur_column_items.push_back(col_array.at(column_idx)))) {
|
||||
LOG_WARN("fail to push_back col_item", K(ret));
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(datum_store->init(chunk_mem_limit, cur_column_items, tenant_id, ObCtxIds::DEFAULT_CTX_ID,
|
||||
"DL_SLICE_STORE", true/*enable_dump*/, 0, false/*disable truncate*/,
|
||||
compressor_type == NONE_COMPRESSOR ? SORT_COMPACT_LEVEL : SORT_COMPRESSION_COMPACT_LEVEL,
|
||||
compressor_type))) {
|
||||
LOG_WARN("failed to init chunk datum store", K(ret));
|
||||
} else {
|
||||
datum_store->set_dir_id(dir_id);
|
||||
LOG_INFO("set dir id", K(dir_id));
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(datum_stores_.push_back(datum_store))) {
|
||||
LOG_WARN("fail to push back datum_store", K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
if (OB_NOT_NULL(datum_store)) {
|
||||
datum_store->~ObCompactStore();
|
||||
allocator.free(datum_store);
|
||||
datum_store = nullptr;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(cg_schemas_.assign(cg_schemas))) {
|
||||
LOG_WARN("fail to copy cg schemas", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
ObTabletObjLoadHelper::free(allocator, storage_schema);
|
||||
}
|
||||
LOG_INFO("init ObChunkSliceStore", K(*this));
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObChunkSliceStore::append_row(const blocksstable::ObDatumRow &datum_row)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -392,8 +492,17 @@ int ObChunkSliceStore::append_row(const blocksstable::ObDatumRow &datum_row)
|
||||
} else if (OB_UNLIKELY(!datum_row.is_valid() || datum_row.get_column_count() < rowkey_column_count_)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(datum_row), K(rowkey_column_count_));
|
||||
} else if (OB_FAIL(datum_store_.add_row(datum_row.storage_datums_, datum_row.get_column_count(), 0/*extra_size*/))) {
|
||||
LOG_WARN("chunk datum store add row failed", K(ret));
|
||||
} else {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < cg_schemas_.count(); ++i) {
|
||||
ObStorageColumnGroupSchema &cur_cg_schema = cg_schemas_.at(i);
|
||||
sql::ObCompactStore *cur_store = datum_stores_.at(i);
|
||||
if (OB_FAIL(cur_store->add_row(datum_row, cur_cg_schema, 0/*extra_size*/))) {
|
||||
LOG_WARN("chunk datum store add row failed", K(ret), K(i), K(datum_row.get_column_count()), K(cur_cg_schema), K(cg_schemas_));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
++row_cnt_;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -404,9 +513,10 @@ int ObChunkSliceStore::close()
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", K(ret));
|
||||
} else if (datum_store_.get_row_cnt() > 0) { // save endkey
|
||||
} else if (datum_stores_.count() > 0 && OB_NOT_NULL(datum_stores_.at(target_store_idx_)) && datum_stores_.at(target_store_idx_)->get_row_cnt() > 0) { // save endkey
|
||||
const ObChunkDatumStore::StoredRow *stored_row = nullptr;
|
||||
if (OB_FAIL(datum_store_.get_last_stored_row(stored_row))) {
|
||||
ObCompactStore *target_store = datum_stores_.at(target_store_idx_);
|
||||
if (OB_FAIL(target_store->get_last_stored_row(stored_row))) {
|
||||
LOG_WARN("fail to get last stored row", K(ret));
|
||||
} else if (OB_UNLIKELY(nullptr == stored_row || stored_row->cnt_ < rowkey_column_count_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -430,8 +540,10 @@ int ObChunkSliceStore::close()
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(datum_store_.finish_add_row(true/*need_dump*/))) {
|
||||
LOG_WARN("finish add row failed", K(ret));
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < datum_stores_.count(); ++i) {
|
||||
if (OB_FAIL(datum_stores_.at(i)->finish_add_row(true/*need_dump*/))) {
|
||||
LOG_WARN("finish add row failed", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
LOG_DEBUG("chunk slice store closed", K(ret), K(endkey_));
|
||||
@ -539,7 +651,8 @@ ObDirectLoadSliceWriter::~ObDirectLoadSliceWriter()
|
||||
int ObDirectLoadSliceWriter::prepare_slice_store_if_need(
|
||||
const int64_t schema_rowkey_column_num,
|
||||
const bool is_column_store,
|
||||
const ObCompressorType compress_type,
|
||||
const int64_t dir_id,
|
||||
ObTabletHandle &tablet_handle,
|
||||
const SCN &start_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -552,15 +665,13 @@ int ObDirectLoadSliceWriter::prepare_slice_store_if_need(
|
||||
if (is_column_store) {
|
||||
need_column_store_ = true;
|
||||
ObChunkSliceStore *chunk_slice_store = nullptr;
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
if (OB_UNLIKELY(!tablet_handle.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid tablet handle", K(ret), K(tablet_handle));
|
||||
} else if (OB_ISNULL(chunk_slice_store = OB_NEWx(ObChunkSliceStore, &allocator_))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("allocate memory for chunk slice store failed", K(ret));
|
||||
} else if (OB_FAIL(chunk_slice_store->init(schema_rowkey_column_num +
|
||||
ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt(), allocator_,
|
||||
tablet_direct_load_mgr_->get_column_info(),
|
||||
compress_type))) {
|
||||
} else if (OB_FAIL(chunk_slice_store->init(schema_rowkey_column_num + ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt(), tablet_handle, allocator_, tablet_direct_load_mgr_->get_column_info(), dir_id))) {
|
||||
LOG_WARN("init chunk slice store failed", K(ret));
|
||||
} else {
|
||||
slice_store_ = chunk_slice_store;
|
||||
@ -754,6 +865,7 @@ int ObDirectLoadSliceWriter::fill_lob_into_macro_block(
|
||||
info.trans_id_, info.seq_no_, timeout_ts, lob_inrow_threshold, info.src_tenant_id_, row_iter))) {
|
||||
LOG_WARN("fail to prepare iters", K(ret), KP(row_iter), K(datum));
|
||||
} else {
|
||||
ObTabletHandle unused_tablet_handle; //lob no need to get storageschema with handle
|
||||
while (OB_SUCC(ret)) {
|
||||
const blocksstable::ObDatumRow *cur_row = nullptr;
|
||||
if (OB_FAIL(THIS_WORKER.check_status())) {
|
||||
@ -774,7 +886,7 @@ int ObDirectLoadSliceWriter::fill_lob_into_macro_block(
|
||||
} else if (OB_FAIL(check_null(false/*is_index_table*/, ObLobMetaUtil::LOB_META_SCHEMA_ROWKEY_COL_CNT, *cur_row))) {
|
||||
LOG_WARN("fail to check null value in row", KR(ret), KPC(cur_row));
|
||||
} else if (OB_FAIL(prepare_slice_store_if_need(ObLobMetaUtil::LOB_META_SCHEMA_ROWKEY_COL_CNT,
|
||||
false/*is_column_store*/, NONE_COMPRESSOR/*do not use compressort*/, start_scn))) {
|
||||
false/*is_column_store*/, 1L/*unsued*/, unused_tablet_handle, start_scn))) {
|
||||
LOG_WARN("prepare macro block writer failed", K(ret));
|
||||
} else if (OB_FAIL(slice_store_->append_row(*cur_row))) {
|
||||
LOG_WARN("macro block writer append row failed", K(ret), KPC(cur_row));
|
||||
@ -803,10 +915,12 @@ int ObDirectLoadSliceWriter::fill_sstable_slice(
|
||||
const SCN &start_scn,
|
||||
const uint64_t table_id,
|
||||
const ObTabletID &tablet_id,
|
||||
ObTabletHandle &tablet_handle,
|
||||
ObIStoreRowIterator *row_iter,
|
||||
const ObTableSchemaItem &schema_item,
|
||||
const ObDirectLoadType &direct_load_type,
|
||||
const ObArray<ObColumnSchemaItem> &column_items,
|
||||
const int64_t dir_id,
|
||||
int64_t &affected_rows,
|
||||
ObInsertMonitor *insert_monitor)
|
||||
{
|
||||
@ -857,8 +971,7 @@ int ObDirectLoadSliceWriter::fill_sstable_slice(
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(check_null(schema_item.is_index_table_, schema_item.rowkey_column_num_, *cur_row))) {
|
||||
LOG_WARN("fail to check null value in row", KR(ret), KPC(cur_row));
|
||||
} else if (OB_FAIL(prepare_slice_store_if_need(schema_item.rowkey_column_num_, schema_item.is_column_store_,
|
||||
schema_item.compress_type_, start_scn))) {
|
||||
} else if (OB_FAIL(prepare_slice_store_if_need(schema_item.rowkey_column_num_, schema_item.is_column_store_, dir_id, tablet_handle, start_scn))) {
|
||||
LOG_WARN("prepare macro block writer failed", K(ret));
|
||||
} else if (OB_FAIL(slice_store_->append_row(*cur_row))) {
|
||||
if (is_full_direct_load_task && OB_ERR_PRIMARY_KEY_DUPLICATE == ret && schema_item.is_unique_index_) {
|
||||
@ -978,114 +1091,67 @@ int ObDirectLoadSliceWriter::fill_column_group(const ObStorageSchema *storage_sc
|
||||
LOG_WARN("fil cg task canceled", K(ret), K(is_canceled_));
|
||||
} else {
|
||||
const ObIArray<ObStorageColumnGroupSchema> &cg_schemas = storage_schema->get_column_groups();
|
||||
const int64_t MAX_CO_BATCH_SIZE = 10; // todo @qilu: add opt hint for batch_cnt
|
||||
ObArray<ObCOSliceWriter *> co_ddl_writers;
|
||||
co_ddl_writers.set_attr(ObMemAttr(MTL_ID(), "DL_co_writers"));
|
||||
ObTimeGuard tg("fill_column_group", 1000L * 1000L * 600L); // 10 mins
|
||||
FLOG_INFO("[DDL_FILL_CG] fill column group start",
|
||||
"tablet_id", tablet_direct_load_mgr_->get_tablet_id(),
|
||||
"row_count", chunk_slice_store->get_row_count(),
|
||||
"column_group_count", cg_schemas.count());
|
||||
|
||||
// 1. reserve writers
|
||||
const int64_t batch_count = MIN(MAX_CO_BATCH_SIZE, cg_schemas.count());
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < batch_count; ++i) {
|
||||
ObCOSliceWriter *tmp_writer = nullptr;
|
||||
if (OB_ISNULL(tmp_writer = OB_NEWx(ObCOSliceWriter, &allocator_))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("allocate memory for co writer failed", K(ret));
|
||||
} else if (OB_FAIL(co_ddl_writers.reserve(batch_count))) {
|
||||
LOG_WARN("fail to reserve writers array", K(ret), K(batch_count));
|
||||
} else if (OB_FAIL(co_ddl_writers.push_back(tmp_writer))) {
|
||||
LOG_WARN("push back co writer failed", K(ret));
|
||||
tmp_writer->~ObCOSliceWriter();
|
||||
allocator_.free(tmp_writer);
|
||||
}
|
||||
}
|
||||
int64_t cg_idx = 0;
|
||||
while (OB_SUCC(ret) && cg_idx < cg_schemas.count()) {
|
||||
tg.click("batch_fill");
|
||||
int64_t current_batch_count = batch_count;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < batch_count; ++i) {
|
||||
if (cg_idx >= cg_schemas.count()) {
|
||||
current_batch_count = i;
|
||||
break;
|
||||
ObCOSliceWriter *cur_writer = nullptr;
|
||||
if (OB_ISNULL(cur_writer = OB_NEWx(ObCOSliceWriter, &allocator_))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("allocate memory for co writer failed", K(ret));
|
||||
} else {
|
||||
// 2. rescan and write
|
||||
for (int64_t cg_idx = 0; OB_SUCC(ret) && cg_idx < cg_schemas.count(); ++cg_idx) {
|
||||
const ObStorageColumnGroupSchema &cg_schema = cg_schemas.at(cg_idx);
|
||||
cur_writer->reset();
|
||||
if (OB_FAIL(cur_writer->init(storage_schema, cg_idx, tablet_direct_load_mgr_, start_seq_, row_offset_, start_scn))) {
|
||||
LOG_WARN("init co ddl writer failed", K(ret), KPC(cur_writer), K(cg_idx), KPC(this));
|
||||
} else {
|
||||
const ObStorageColumnGroupSchema &cg_schema = cg_schemas.at(cg_idx);
|
||||
ObCOSliceWriter *cur_writer = co_ddl_writers.at(i);
|
||||
cur_writer->reset();
|
||||
if (OB_FAIL(cur_writer->init(storage_schema, cg_idx, tablet_direct_load_mgr_, start_seq_, row_offset_, start_scn))) {
|
||||
LOG_WARN("init co ddl writer failed", K(ret), K(i), K(cg_idx), KPC(this));
|
||||
} else {
|
||||
++cg_idx;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
// 2. rescan and write
|
||||
const ObChunkDatumStore::StoredRow *stored_row = nullptr;
|
||||
bool has_next = false;
|
||||
chunk_slice_store->datum_store_.rescan();
|
||||
int64_t begin_ts = ObTimeUtility::fast_current_time();
|
||||
while (OB_SUCC(ret) && OB_SUCC(chunk_slice_store->datum_store_.has_next(has_next)) && has_next) {
|
||||
int64_t row_count = 0;
|
||||
if (row_count > 0 && row_count % (10L * 10000L) == 0) { // print log per 10w records
|
||||
int64_t curr_ts = ObTimeUtility::fast_current_time();
|
||||
FLOG_INFO("[DDL_FILL_CG] rescan and fill", "tablet_id", tablet_direct_load_mgr_->get_tablet_id(),
|
||||
"start_cg_idx", cg_idx - current_batch_count,
|
||||
K(current_batch_count), K(row_count), "cost_time_us", curr_ts - begin_ts);
|
||||
begin_ts = curr_ts;
|
||||
}
|
||||
if (OB_FAIL(chunk_slice_store->datum_store_.get_next_row(stored_row))) {
|
||||
if (OB_ITER_END == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
break;
|
||||
sql::ObCompactStore * cur_datum_store = chunk_slice_store->datum_stores_.at(cg_idx);
|
||||
const ObChunkDatumStore::StoredRow *stored_row = nullptr;
|
||||
bool has_next = false;
|
||||
while (OB_SUCC(ret) && OB_SUCC(cur_datum_store->has_next(has_next)) && has_next) {
|
||||
if (OB_FAIL(cur_datum_store->get_next_row(stored_row))) {
|
||||
if (OB_ITER_END == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
break;
|
||||
} else {
|
||||
LOG_WARN("get next row failed", K(ret));
|
||||
}
|
||||
} else {
|
||||
LOG_WARN("get next row failed", K(ret));
|
||||
}
|
||||
} else {
|
||||
++row_count;
|
||||
if (OB_NOT_NULL(insert_monitor)) {
|
||||
insert_monitor->inserted_cg_row_cnt_ = insert_monitor->inserted_cg_row_cnt_ + current_batch_count;
|
||||
}
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < current_batch_count; ++i) {
|
||||
ObCOSliceWriter *cur_writer = co_ddl_writers.at(i);
|
||||
if (OB_NOT_NULL(insert_monitor)) {
|
||||
insert_monitor->inserted_cg_row_cnt_ += 1;
|
||||
}
|
||||
if (OB_FAIL(cur_writer->append_row(stored_row))) {
|
||||
LOG_WARN("append row failed", K(ret), KPC(stored_row), K(row_count));
|
||||
LOG_WARN("append row failed", K(ret), KPC(stored_row));
|
||||
} else {
|
||||
if (OB_NOT_NULL(insert_monitor)) {
|
||||
insert_monitor->inserted_cg_row_cnt_ += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
// 3. close writers
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < current_batch_count; ++i) {
|
||||
ObCOSliceWriter *cur_writer = co_ddl_writers.at(i);
|
||||
if (OB_FAIL(cur_writer->close())) {
|
||||
LOG_WARN("close co ddl writer failed", K(ret));
|
||||
if (OB_SUCC(ret)) {
|
||||
// 3. close writers
|
||||
if (OB_FAIL(cur_writer->close())) {
|
||||
LOG_WARN("close co ddl writer failed", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
FLOG_INFO("[DDL_FILL_CG] finish cg batch", "tablet_id", tablet_direct_load_mgr_->get_tablet_id(),
|
||||
"next_cg_idx", cg_idx, "total_cg_count", cg_schemas.count(), K(current_batch_count));
|
||||
}
|
||||
|
||||
tg.click("fill_end");
|
||||
// 4. free writers, ignore ret
|
||||
for (int64_t i = 0; i < co_ddl_writers.count(); ++i) {
|
||||
ObCOSliceWriter *cur_writer = co_ddl_writers.at(i);
|
||||
if (OB_NOT_NULL(cur_writer)) {
|
||||
cur_writer->~ObCOSliceWriter();
|
||||
allocator_.free(cur_writer);
|
||||
}
|
||||
if (OB_NOT_NULL(cur_writer)) {
|
||||
cur_writer->~ObCOSliceWriter();
|
||||
allocator_.free(cur_writer);
|
||||
}
|
||||
co_ddl_writers.reset();
|
||||
FLOG_INFO("[DDL_FILL_CG] fill column group finished",
|
||||
"tablet_id", tablet_direct_load_mgr_->get_tablet_id(),
|
||||
"row_count", chunk_slice_store->get_row_count(),
|
||||
"column_group_count", cg_schemas.count(),
|
||||
"time_cost_us", tg.get_diff());
|
||||
"column_group_count", cg_schemas.count());
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -1182,19 +1248,13 @@ int ObCOSliceWriter::project_cg_row(const ObStorageColumnGroupSchema &cg_schema,
|
||||
if (OB_UNLIKELY(!cg_schema.is_valid() || nullptr == stored_row)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(cg_schema), KP(stored_row));
|
||||
} else if (cg_schema.column_cnt_ > stored_row->cnt_ || cg_row.get_column_count() != cg_schema.column_cnt_) {
|
||||
} else if (cg_schema.column_cnt_ != stored_row->cnt_ || cg_row.get_column_count() != cg_schema.column_cnt_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("column count not match", K(ret), K(stored_row->cnt_), K(cg_row), K(cg_schema));
|
||||
} else {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < cg_schema.column_cnt_; ++i) {
|
||||
int64_t column_idx = cg_schema.column_idxs_ ? cg_schema.column_idxs_[i] : i;
|
||||
if (column_idx >= stored_row->cnt_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid column idex", K(ret));
|
||||
} else {
|
||||
const ObDatum &cur_datum = stored_row->cells()[column_idx];
|
||||
cg_row.storage_datums_[i].set_datum(cur_datum);
|
||||
}
|
||||
const ObDatum &cur_datum = stored_row->cells()[i];
|
||||
cg_row.storage_datums_[i].set_datum(cur_datum);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
|
||||
Reference in New Issue
Block a user