patch 4.1 bug fix and remove some secret url
This commit is contained in:
@ -303,6 +303,54 @@ int ObDirectLoadTabletMergeCtx::collect_sql_statistics(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDirectLoadTabletMergeCtx::init_sstable_array(
|
||||
const ObIArray<ObIDirectLoadPartitionTable *> &table_array)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < table_array.count(); ++i) {
|
||||
ObDirectLoadSSTable *sstable = nullptr;
|
||||
if (OB_ISNULL(sstable = dynamic_cast<ObDirectLoadSSTable *>(table_array.at(i)))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected table", KR(ret), K(i), K(table_array));
|
||||
} else if (OB_FAIL(sstable_array_.push_back(sstable))) {
|
||||
LOG_WARN("fail to push back sstable", KR(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDirectLoadTabletMergeCtx::init_multiple_sstable_array(
|
||||
const ObIArray<ObIDirectLoadPartitionTable *> &table_array)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < table_array.count(); ++i) {
|
||||
ObDirectLoadMultipleSSTable *sstable = nullptr;
|
||||
if (OB_ISNULL(sstable = dynamic_cast<ObDirectLoadMultipleSSTable *>(table_array.at(i)))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected table", KR(ret), K(i), K(table_array));
|
||||
} else if (OB_FAIL(multiple_sstable_array_.push_back(sstable))) {
|
||||
LOG_WARN("fail to push back multiple sstable", KR(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDirectLoadTabletMergeCtx::init_multiple_heap_table_array(
|
||||
const ObIArray<ObIDirectLoadPartitionTable *> &table_array)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < table_array.count(); ++i) {
|
||||
ObDirectLoadMultipleHeapTable *heap_table = nullptr;
|
||||
if (OB_ISNULL(heap_table = dynamic_cast<ObDirectLoadMultipleHeapTable *>(table_array.at(i)))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected table", KR(ret), K(i), K(table_array));
|
||||
} else if (OB_FAIL(multiple_heap_table_array_.push_back(heap_table))) {
|
||||
LOG_WARN("fail to push back multiple heap table", KR(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDirectLoadTabletMergeCtx::build_merge_task(
|
||||
const ObIArray<ObIDirectLoadPartitionTable *> &table_array,
|
||||
const ObIArray<ObColDesc> &col_descs,
|
||||
@ -393,16 +441,10 @@ int ObDirectLoadTabletMergeCtx::build_pk_table_merge_task(
|
||||
int64_t max_parallel_degree)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
// split range
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < table_array.count(); ++i) {
|
||||
ObDirectLoadSSTable *sstable = nullptr;
|
||||
if (OB_ISNULL(sstable = dynamic_cast<ObDirectLoadSSTable *>(table_array.at(i)))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected table", KR(ret), K(i), K(table_array));
|
||||
} else if (OB_FAIL(sstable_array_.push_back(sstable))) {
|
||||
LOG_WARN("fail to push back sstable", KR(ret));
|
||||
}
|
||||
if (OB_FAIL(init_sstable_array(table_array))) {
|
||||
LOG_WARN("fail to init sstable array", KR(ret));
|
||||
}
|
||||
// split range
|
||||
if (OB_SUCC(ret)) {
|
||||
ObDirectLoadMergeRangeSplitter range_splitter;
|
||||
if (OB_FAIL(
|
||||
@ -444,16 +486,10 @@ int ObDirectLoadTabletMergeCtx::build_pk_table_multiple_merge_task(
|
||||
int64_t max_parallel_degree)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
// split range
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < table_array.count(); ++i) {
|
||||
ObDirectLoadMultipleSSTable *sstable = nullptr;
|
||||
if (OB_ISNULL(sstable = dynamic_cast<ObDirectLoadMultipleSSTable *>(table_array.at(i)))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected table", KR(ret), K(i), K(table_array));
|
||||
} else if (OB_FAIL(multiple_sstable_array_.push_back(sstable))) {
|
||||
LOG_WARN("fail to push back sstable", KR(ret));
|
||||
}
|
||||
if (OB_FAIL(init_multiple_sstable_array(table_array))) {
|
||||
LOG_WARN("fail to init multiple sstable array", KR(ret));
|
||||
}
|
||||
// split range
|
||||
if (OB_SUCC(ret)) {
|
||||
ObDirectLoadMultipleMergeTabletRangeSplitter range_splitter;
|
||||
if (OB_FAIL(range_splitter.init(tablet_id_, &origin_table_, multiple_sstable_array_,
|
||||
@ -617,6 +653,9 @@ int ObDirectLoadTabletMergeCtx::build_heap_table_multiple_merge_task(
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t parallel_idx = 0;
|
||||
if (OB_FAIL(init_multiple_heap_table_array(table_array))) {
|
||||
LOG_WARN("fail to init multiple heap table array", KR(ret));
|
||||
}
|
||||
// for existing data, construct task by split range
|
||||
if (OB_SUCC(ret)) {
|
||||
ObDirectLoadMergeRangeSplitter range_splitter;
|
||||
@ -651,16 +690,12 @@ int ObDirectLoadTabletMergeCtx::build_heap_table_multiple_merge_task(
|
||||
}
|
||||
}
|
||||
// for imported data, construct task by multiple heap table
|
||||
for (int64_t i = 0; OB_SUCC(ret) && !param_.is_fast_heap_table_ && i < table_array.count(); ++i) {
|
||||
ObDirectLoadMultipleHeapTable *heap_table = nullptr;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && !param_.is_fast_heap_table_ && i < multiple_heap_table_array_.count(); ++i) {
|
||||
ObDirectLoadMultipleHeapTable *heap_table = multiple_heap_table_array_.at(i);
|
||||
ObDirectLoadPartitionHeapTableMultipleMergeTask *merge_task = nullptr;
|
||||
int64_t row_count = 0;
|
||||
ObTabletCacheInterval pk_interval;
|
||||
if (OB_ISNULL(heap_table = dynamic_cast<ObDirectLoadMultipleHeapTable *>(table_array.at(i)))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected table", KR(ret), K(i), K(table_array));
|
||||
} else if (OB_FAIL(heap_table->get_tablet_row_count(tablet_id_, param_.table_data_desc_,
|
||||
row_count))) {
|
||||
if (OB_FAIL(heap_table->get_tablet_row_count(tablet_id_, param_.table_data_desc_, row_count))) {
|
||||
LOG_WARN("fail to get tablet row count", KR(ret), K(tablet_id_));
|
||||
} else if (0 == row_count) {
|
||||
// ignore
|
||||
@ -686,6 +721,50 @@ int ObDirectLoadTabletMergeCtx::build_heap_table_multiple_merge_task(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDirectLoadTabletMergeCtx::build_aggregate_merge_task_for_multiple_heap_table(
|
||||
const ObIArray<ObIDirectLoadPartitionTable *> &table_array)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t total_row_count = 0;
|
||||
ObTabletCacheInterval pk_interval;
|
||||
ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask *merge_task = nullptr;
|
||||
if (OB_FAIL(init_multiple_heap_table_array(table_array))) {
|
||||
LOG_WARN("fail to init multiple heap table array", KR(ret));
|
||||
}
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < multiple_heap_table_array_.count(); ++i) {
|
||||
ObDirectLoadMultipleHeapTable *heap_table = multiple_heap_table_array_.at(i);
|
||||
int64_t row_count = 0;
|
||||
if (OB_FAIL(heap_table->get_tablet_row_count(tablet_id_, param_.table_data_desc_, row_count))) {
|
||||
LOG_WARN("fail to get tablet row count", KR(ret), K(tablet_id_));
|
||||
} else {
|
||||
total_row_count += row_count;
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (total_row_count > 0 && OB_FAIL(get_autoincrement_value(total_row_count, pk_interval))) {
|
||||
LOG_WARN("fail to get autoincrement value", KR(ret), K(total_row_count));
|
||||
} else if (OB_ISNULL(merge_task =
|
||||
OB_NEWx(ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask,
|
||||
(&allocator_)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to new ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask", KR(ret));
|
||||
} else if (OB_FAIL(merge_task->init(param_, this, &origin_table_, multiple_heap_table_array_,
|
||||
pk_interval))) {
|
||||
LOG_WARN("fail to init merge task", KR(ret));
|
||||
} else if (OB_FAIL(task_array_.push_back(merge_task))) {
|
||||
LOG_WARN("fail to push back merge task", KR(ret));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
if (nullptr != merge_task) {
|
||||
merge_task->~ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask();
|
||||
allocator_.free(merge_task);
|
||||
merge_task = nullptr;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDirectLoadTabletMergeCtx::get_autoincrement_value(uint64_t count,
|
||||
ObTabletCacheInterval &interval)
|
||||
{
|
||||
|
||||
@ -34,6 +34,7 @@ class ObDirectLoadTabletMergeCtx;
|
||||
class ObIDirectLoadPartitionTable;
|
||||
class ObDirectLoadSSTable;
|
||||
class ObDirectLoadMultipleSSTable;
|
||||
class ObDirectLoadMultipleHeapTable;
|
||||
class ObDirectLoadMultipleMergeRangeSplitter;
|
||||
|
||||
struct ObDirectLoadMergeParam
|
||||
@ -99,6 +100,8 @@ public:
|
||||
const common::ObIArray<ObDirectLoadMultipleSSTable *> &multiple_sstable_array,
|
||||
ObDirectLoadMultipleMergeRangeSplitter &range_splitter,
|
||||
int64_t max_parallel_degree);
|
||||
int build_aggregate_merge_task_for_multiple_heap_table(
|
||||
const common::ObIArray<ObIDirectLoadPartitionTable *> &table_array);
|
||||
int inc_finish_count(bool &is_ready);
|
||||
int collect_sql_statistics(
|
||||
const common::ObIArray<ObDirectLoadFastHeapTable *> &fast_heap_table_array, table::ObTableLoadSqlStatistics &sql_statistics);
|
||||
@ -111,6 +114,11 @@ public:
|
||||
}
|
||||
TO_STRING_KV(K_(param), K_(target_partition_id), K_(tablet_id), K_(target_tablet_id));
|
||||
private:
|
||||
int init_sstable_array(const common::ObIArray<ObIDirectLoadPartitionTable *> &table_array);
|
||||
int init_multiple_sstable_array(
|
||||
const common::ObIArray<ObIDirectLoadPartitionTable *> &table_array);
|
||||
int init_multiple_heap_table_array(
|
||||
const common::ObIArray<ObIDirectLoadPartitionTable *> &table_array);
|
||||
int build_empty_data_merge_task(const common::ObIArray<share::schema::ObColDesc> &col_descs,
|
||||
int64_t max_parallel_degree);
|
||||
int build_pk_table_merge_task(const common::ObIArray<ObIDirectLoadPartitionTable *> &table_array,
|
||||
@ -138,6 +146,7 @@ private:
|
||||
ObDirectLoadOriginTable origin_table_;
|
||||
common::ObSEArray<ObDirectLoadSSTable *, 64> sstable_array_;
|
||||
common::ObSEArray<ObDirectLoadMultipleSSTable *, 64> multiple_sstable_array_;
|
||||
common::ObSEArray<ObDirectLoadMultipleHeapTable *, 64> multiple_heap_table_array_;
|
||||
common::ObSEArray<blocksstable::ObDatumRange, 64> range_array_;
|
||||
common::ObSEArray<ObDirectLoadPartitionMergeTask *, 64> task_array_;
|
||||
int64_t task_finish_count_ CACHE_ALIGNED;
|
||||
|
||||
@ -5,13 +5,14 @@
|
||||
#define USING_LOG_PREFIX STORAGE
|
||||
|
||||
#include "storage/direct_load/ob_direct_load_partition_merge_task.h"
|
||||
#include "share/stat/ob_opt_column_stat.h"
|
||||
#include "share/stat/ob_stat_define.h"
|
||||
#include "storage/ddl/ob_direct_insert_sstable_ctx.h"
|
||||
#include "storage/direct_load/ob_direct_load_external_table.h"
|
||||
#include "storage/direct_load/ob_direct_load_insert_table_ctx.h"
|
||||
#include "storage/direct_load/ob_direct_load_multiple_heap_table.h"
|
||||
#include "storage/direct_load/ob_direct_load_merge_ctx.h"
|
||||
#include "share/stat/ob_opt_column_stat.h"
|
||||
#include "share/stat/ob_stat_define.h"
|
||||
#include "storage/direct_load/ob_direct_load_multiple_heap_table.h"
|
||||
#include "storage/direct_load/ob_direct_load_origin_table.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -822,5 +823,233 @@ int ObDirectLoadPartitionHeapTableMultipleMergeTask::construct_row_iter(
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask
|
||||
*/
|
||||
|
||||
ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask::RowIterator::RowIterator()
|
||||
: origin_iter_(nullptr),
|
||||
rowkey_column_num_(0),
|
||||
store_column_count_(0),
|
||||
heap_table_array_(nullptr),
|
||||
pos_(0),
|
||||
deserialize_datums_(nullptr),
|
||||
deserialize_datum_cnt_(0),
|
||||
result_info_(nullptr),
|
||||
is_inited_(false)
|
||||
{
|
||||
}
|
||||
|
||||
ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask::RowIterator::~RowIterator()
|
||||
{
|
||||
}
|
||||
|
||||
int ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask::RowIterator::init(
|
||||
const ObDirectLoadMergeParam &merge_param, const ObTabletID &tablet_id,
|
||||
ObDirectLoadOriginTable *origin_table,
|
||||
const ObIArray<ObDirectLoadMultipleHeapTable *> *heap_table_array,
|
||||
const ObTabletCacheInterval &pk_interval)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_INIT) {
|
||||
ret = OB_INIT_TWICE;
|
||||
LOG_WARN("ObDirectLoadPartitionHeapTableMultipleMergeTask::RowIterator init twice", KR(ret),
|
||||
KP(this));
|
||||
} else if (OB_UNLIKELY(!merge_param.is_valid() || !tablet_id.is_valid() ||
|
||||
nullptr == origin_table || nullptr == heap_table_array)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", KR(ret), K(merge_param), K(tablet_id), KP(origin_table),
|
||||
KP(heap_table_array));
|
||||
} else {
|
||||
range_.set_whole_range();
|
||||
if (OB_FAIL(origin_table->scan(range_, allocator_, origin_iter_))) {
|
||||
LOG_WARN("fail to scan origin table", KR(ret));
|
||||
}
|
||||
// init datum_row_
|
||||
else if (OB_FAIL(datum_row_.init(merge_param.store_column_count_ +
|
||||
ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt()))) {
|
||||
LOG_WARN("fail to init datum row", KR(ret));
|
||||
} else {
|
||||
datum_row_.row_flag_.set_flag(ObDmlFlag::DF_INSERT);
|
||||
datum_row_.mvcc_row_flag_.set_last_multi_version_row(true);
|
||||
datum_row_.storage_datums_[merge_param.rowkey_column_num_].set_int(
|
||||
-merge_param.snapshot_version_); // fill trans_version
|
||||
datum_row_.storage_datums_[merge_param.rowkey_column_num_ + 1].set_int(0); // fill sql_no
|
||||
deserialize_datums_ = datum_row_.storage_datums_ + merge_param.rowkey_column_num_ +
|
||||
ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt();
|
||||
deserialize_datum_cnt_ = merge_param.store_column_count_ - merge_param.rowkey_column_num_;
|
||||
rowkey_column_num_ = merge_param.rowkey_column_num_;
|
||||
store_column_count_ = merge_param.store_column_count_;
|
||||
tablet_id_ = tablet_id;
|
||||
table_data_desc_ = merge_param.table_data_desc_;
|
||||
heap_table_array_ = heap_table_array;
|
||||
pk_interval_ = pk_interval;
|
||||
result_info_ = merge_param.result_info_;
|
||||
is_inited_ = true;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask::RowIterator::get_next_row(
|
||||
const ObDatumRow *&result_row)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
result_row = nullptr;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask::RowIterator not init",
|
||||
KR(ret), KP(this));
|
||||
} else {
|
||||
if (pos_ == 0) {
|
||||
const ObDatumRow *datum_row = nullptr;
|
||||
if (OB_FAIL(origin_iter_->get_next_row(datum_row))) {
|
||||
if (OB_UNLIKELY(OB_ITER_END != ret)) {
|
||||
LOG_WARN("fail to get next row", KR(ret));
|
||||
} else {
|
||||
// switch heap table
|
||||
ret = OB_SUCCESS;
|
||||
if (OB_FAIL(switch_next_heap_table())) {
|
||||
if (OB_UNLIKELY(OB_ITER_END != ret)) {
|
||||
LOG_WARN("fail to switch next heap table", KR(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (OB_UNLIKELY(datum_row->count_ != store_column_count_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected column count", KR(ret), K(store_column_count_), KPC(datum_row));
|
||||
} else {
|
||||
// copy rowkey columns
|
||||
for (int64_t i = 0; i < rowkey_column_num_; ++i) {
|
||||
datum_row_.storage_datums_[i] = datum_row->storage_datums_[i];
|
||||
}
|
||||
// copy normal columns
|
||||
for (int64_t
|
||||
i = rowkey_column_num_,
|
||||
j = rowkey_column_num_ + ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt();
|
||||
i < datum_row->count_; ++i, ++j) {
|
||||
datum_row_.storage_datums_[j] = datum_row->storage_datums_[i];
|
||||
}
|
||||
result_row = &datum_row_;
|
||||
}
|
||||
}
|
||||
while (OB_SUCC(ret) && result_row == nullptr) {
|
||||
const ObDirectLoadMultipleExternalRow *external_row = nullptr;
|
||||
uint64_t pk_seq = OB_INVALID_ID;
|
||||
if (OB_FAIL(scanner_.get_next_row(external_row))) {
|
||||
if (OB_UNLIKELY(OB_ITER_END != ret)) {
|
||||
LOG_WARN("fail to get next row", KR(ret));
|
||||
} else {
|
||||
// switch next heap table
|
||||
ret = OB_SUCCESS;
|
||||
if (OB_FAIL(switch_next_heap_table())) {
|
||||
if (OB_UNLIKELY(OB_ITER_END != ret)) {
|
||||
LOG_WARN("fail to switch next heap table", KR(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (OB_FAIL(external_row->to_datums(deserialize_datums_, deserialize_datum_cnt_))) {
|
||||
LOG_WARN("fail to transfer datum row", KR(ret));
|
||||
} else if (OB_FAIL(pk_interval_.next_value(pk_seq))) {
|
||||
LOG_WARN("fail to get next pk seq", KR(ret));
|
||||
} else {
|
||||
// fill hide pk
|
||||
datum_row_.storage_datums_[0].set_int(pk_seq);
|
||||
result_row = &datum_row_;
|
||||
ATOMIC_INC(&result_info_->rows_affected_);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask::RowIterator::switch_next_heap_table()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (pos_ >= heap_table_array_->count()) {
|
||||
ret = OB_ITER_END;
|
||||
} else {
|
||||
ObDirectLoadMultipleHeapTable *heap_table = heap_table_array_->at(pos_);
|
||||
// restructure scanner
|
||||
scanner_.~ObDirectLoadMultipleHeapTableTabletWholeScanner();
|
||||
new (&scanner_) ObDirectLoadMultipleHeapTableTabletWholeScanner();
|
||||
if (OB_FAIL(scanner_.init(heap_table, tablet_id_, table_data_desc_))) {
|
||||
LOG_WARN("fail to init scanner", KR(ret));
|
||||
} else {
|
||||
++pos_;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask::
|
||||
ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask()
|
||||
: origin_table_(nullptr), heap_table_array_(nullptr)
|
||||
{
|
||||
}
|
||||
|
||||
ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask::
|
||||
~ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask()
|
||||
{
|
||||
}
|
||||
|
||||
int ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask::init(
|
||||
const ObDirectLoadMergeParam &merge_param, ObDirectLoadTabletMergeCtx *merge_ctx,
|
||||
ObDirectLoadOriginTable *origin_table,
|
||||
const ObIArray<ObDirectLoadMultipleHeapTable *> &heap_table_array,
|
||||
const ObTabletCacheInterval &pk_interval)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_INIT) {
|
||||
ret = OB_INIT_TWICE;
|
||||
LOG_WARN("ObDirectLoadPartitionHeapTableMultipleMergeTask init twice", KR(ret), KP(this));
|
||||
} else if (OB_UNLIKELY(!merge_param.is_valid() || nullptr == merge_ctx ||
|
||||
nullptr == origin_table || heap_table_array.empty())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", KR(ret), K(merge_param), KP(merge_ctx), KP(origin_table),
|
||||
K(heap_table_array));
|
||||
} else {
|
||||
merge_param_ = &merge_param;
|
||||
merge_ctx_ = merge_ctx;
|
||||
parallel_idx_ = 0;
|
||||
origin_table_ = origin_table;
|
||||
heap_table_array_ = &heap_table_array;
|
||||
pk_interval_ = pk_interval;
|
||||
is_inited_ = true;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask::construct_row_iter(
|
||||
ObIAllocator &allocator, ObIStoreRowIterator *&result_row_iter)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
result_row_iter = nullptr;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask not init", KR(ret),
|
||||
KP(this));
|
||||
} else {
|
||||
RowIterator *row_iter = nullptr;
|
||||
if (OB_ISNULL(row_iter = OB_NEWx(RowIterator, (&allocator)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to new RowIterator", KR(ret));
|
||||
} else if (OB_FAIL(row_iter->init(*merge_param_, merge_ctx_->get_tablet_id(), origin_table_,
|
||||
heap_table_array_, pk_interval_))) {
|
||||
LOG_WARN("fail to init row iter", KR(ret));
|
||||
} else {
|
||||
result_row_iter = row_iter;
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
if (nullptr != row_iter) {
|
||||
row_iter->~RowIterator();
|
||||
allocator.free(row_iter);
|
||||
row_iter = nullptr;
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
} // namespace storage
|
||||
} // namespace oceanbase
|
||||
|
||||
@ -206,5 +206,56 @@ private:
|
||||
share::ObTabletCacheInterval pk_interval_;
|
||||
};
|
||||
|
||||
class ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask
|
||||
: public ObDirectLoadPartitionMergeTask
|
||||
{
|
||||
public:
|
||||
ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask();
|
||||
virtual ~ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask();
|
||||
int init(const ObDirectLoadMergeParam &merge_param, ObDirectLoadTabletMergeCtx *merge_ctx,
|
||||
ObDirectLoadOriginTable *origin_table,
|
||||
const common::ObIArray<ObDirectLoadMultipleHeapTable *> &heap_table_array,
|
||||
const share::ObTabletCacheInterval &pk_interval);
|
||||
protected:
|
||||
int construct_row_iter(common::ObIAllocator &allocator, ObIStoreRowIterator *&row_iter) override;
|
||||
private:
|
||||
class RowIterator : public ObIStoreRowIterator
|
||||
{
|
||||
public:
|
||||
RowIterator();
|
||||
virtual ~RowIterator();
|
||||
int init(const ObDirectLoadMergeParam &merge_param, const common::ObTabletID &tablet_id,
|
||||
ObDirectLoadOriginTable *origin_table,
|
||||
const common::ObIArray<ObDirectLoadMultipleHeapTable *> *heap_table_array,
|
||||
const share::ObTabletCacheInterval &pk_interval);
|
||||
int get_next_row(const blocksstable::ObDatumRow *&datum_row) override;
|
||||
private:
|
||||
int switch_next_heap_table();
|
||||
private:
|
||||
// for iter origin table
|
||||
common::ObArenaAllocator allocator_;
|
||||
blocksstable::ObDatumRange range_;
|
||||
ObIStoreRowIterator *origin_iter_;
|
||||
int64_t rowkey_column_num_;
|
||||
int64_t store_column_count_;
|
||||
// for iter multiple heap table
|
||||
common::ObTabletID tablet_id_;
|
||||
ObDirectLoadTableDataDesc table_data_desc_;
|
||||
const common::ObIArray<ObDirectLoadMultipleHeapTable *> *heap_table_array_;
|
||||
int64_t pos_;
|
||||
ObDirectLoadMultipleHeapTableTabletWholeScanner scanner_;
|
||||
blocksstable::ObDatumRow datum_row_;
|
||||
blocksstable::ObStorageDatum *deserialize_datums_;
|
||||
int64_t deserialize_datum_cnt_;
|
||||
share::ObTabletCacheInterval pk_interval_;
|
||||
table::ObTableLoadResultInfo *result_info_;
|
||||
bool is_inited_;
|
||||
};
|
||||
private:
|
||||
ObDirectLoadOriginTable *origin_table_;
|
||||
const common::ObIArray<ObDirectLoadMultipleHeapTable *> *heap_table_array_;
|
||||
share::ObTabletCacheInterval pk_interval_;
|
||||
};
|
||||
|
||||
} // namespace storage
|
||||
} // namespace oceanbase
|
||||
|
||||
Reference in New Issue
Block a user