support strict replace

This commit is contained in:
yongshige 2023-08-08 05:12:15 +00:00 committed by ob-robot
parent 51bb12a3f6
commit a1fa978f84
45 changed files with 559 additions and 129 deletions

View File

@ -447,7 +447,10 @@ int ObTableDirectLoadInsertExecutor::process()
} else {
ObTableLoadCoordinator coordinator(table_ctx);
ObTableLoadTransId trans_id;
if (OB_FAIL(client_task->get_next_trans_id(trans_id))) {
int64_t batch_id = client_task->get_next_batch_id();
if (OB_FAIL(set_batch_seq_no(batch_id, obj_rows))) {
LOG_WARN("fail to set batch seq no", KR(ret));
} else if (OB_FAIL(client_task->get_next_trans_id(trans_id))) {
LOG_WARN("fail to get next trans id", KR(ret));
} else if (OB_FAIL(coordinator.init())) {
LOG_WARN("fail to init coordinator", KR(ret));
@ -497,5 +500,26 @@ int ObTableDirectLoadInsertExecutor::decode_payload(const ObString &payload,
return ret;
}
int ObTableDirectLoadInsertExecutor::set_batch_seq_no(int64_t batch_id,
ObTableLoadObjRowArray &obj_row_array)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(obj_row_array.empty())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(obj_row_array));
} else if (OB_UNLIKELY(batch_id > ObTableLoadSequenceNo::MAX_BATCH_ID ||
obj_row_array.count() > ObTableLoadSequenceNo::MAX_BATCH_SEQ_NO)) {
ret = OB_SIZE_OVERFLOW;
LOG_WARN("size is overflow", KR(ret), K(batch_id), K(obj_row_array.count()));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < obj_row_array.count(); ++i) {
ObTableLoadObjRow &row = obj_row_array.at(i);
row.seq_no_.batch_id_ = batch_id;
row.seq_no_.batch_seq_no_ = i;
}
}
return ret;
}
} // namespace observer
} // namespace oceanbase

View File

@ -158,6 +158,7 @@ protected:
private:
static int decode_payload(const common::ObString &payload,
table::ObTableLoadObjRowArray &obj_row_array);
int set_batch_seq_no(int64_t batch_id, table::ObTableLoadObjRowArray &obj_row_array);
};
} // namespace observer

View File

@ -37,6 +37,7 @@ ObTableLoadClientTask::ObTableLoadClientTask()
exec_ctx_(nullptr),
task_scheduler_(nullptr),
next_trans_idx_(0),
next_batch_id_(0),
table_ctx_(nullptr),
client_status_(ObTableLoadClientStatus::MAX_STATUS),
error_code_(OB_SUCCESS),

View File

@ -43,6 +43,7 @@ public:
OB_INLINE ObTableLoadClientExecCtx *get_exec_ctx() { return exec_ctx_; }
int add_trans_id(const table::ObTableLoadTransId &trans_id);
int get_next_trans_id(table::ObTableLoadTransId &trans_id);
int64_t get_next_batch_id() { return ATOMIC_FAA(&next_batch_id_, 1); }
OB_INLINE const common::ObIArray<table::ObTableLoadTransId> &get_trans_ids() const
{
return trans_ids_;
@ -82,6 +83,7 @@ private:
ObITableLoadTaskScheduler *task_scheduler_;
common::ObArray<table::ObTableLoadTransId> trans_ids_;
int64_t next_trans_idx_;
int64_t next_batch_id_ CACHE_ALIGNED;
mutable obsys::ObRWLock rw_lock_;
ObTableLoadTableCtx *table_ctx_;
table::ObTableLoadClientStatus client_status_;

View File

@ -1260,26 +1260,15 @@ public:
int set_objs(const ObTableLoadObjRowArray &obj_rows, const ObIArray<int64_t> &idx_array)
{
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret) && (i < obj_rows.count()); ++i) {
const ObTableLoadObjRow &src_obj_row = obj_rows.at(i);
ObTableLoadObjRow out_obj_row;
if (OB_FAIL(out_obj_row.init(src_obj_row.count_, src_obj_row.allocator_handle_))) {
LOG_WARN("failed to init out_obj_row", KR(ret), K(src_obj_row.count_));
} else {
for (int64_t j = 0; OB_SUCC(ret) && (j < src_obj_row.count_); ++j) {
out_obj_row.cells_[j] = src_obj_row.cells_[idx_array.at(j)];
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(obj_rows_.push_back(out_obj_row))) {
LOG_WARN("failed to add row to obj_rows_", KR(ret), K(out_obj_row));
}
if (src_obj_row.project(idx_array, out_obj_row)) {
LOG_WARN("failed to projecte out_obj_row", KR(ret), K(src_obj_row.count_));
} else if (OB_FAIL(obj_rows_.push_back(out_obj_row))) {
LOG_WARN("failed to add row to obj_rows_", KR(ret), K(out_obj_row));
}
}
return ret;
}
int process() override

View File

@ -26,6 +26,7 @@ using namespace common::hash;
using namespace storage;
using namespace table;
using namespace blocksstable;
using namespace sql;
class ObTableLoadMemCompactor::SampleTaskProcessor : public ObITableLoadTaskProcessor
{
@ -276,6 +277,7 @@ int ObTableLoadMemCompactor::inner_init()
mem_ctx_.column_count_ = param_->column_count_;
mem_ctx_.dml_row_handler_ = store_ctx_->error_row_handler_;
mem_ctx_.file_mgr_ = store_ctx_->tmp_file_mgr_;
mem_ctx_.dup_action_ = param_->dup_action_;
}
if (OB_SUCC(ret)) {
if (OB_FAIL(mem_ctx_.init())) {

View File

@ -321,6 +321,7 @@ int ObTableLoadMultipleHeapTableCompactor::inner_init()
mem_ctx_.column_count_ = param_->column_count_;
mem_ctx_.dml_row_handler_ = store_ctx_->error_row_handler_;
mem_ctx_.file_mgr_ = store_ctx_->tmp_file_mgr_;
mem_ctx_.dup_action_ = param_->dup_action_;
if (OB_SUCC(ret)) {
if (OB_FAIL(mem_ctx_.init())) {

View File

@ -294,7 +294,7 @@ int ObTableLoadTransStoreWriter::write(int32_t session_id,
} else {
ret = OB_SUCCESS;
}
} else if (OB_FAIL(write_row_to_table_store(session_ctx.table_store_, row.tablet_id_, session_ctx.datum_row_))) {
} else if (OB_FAIL(write_row_to_table_store(session_ctx.table_store_, row.tablet_id_, row.obj_row_.seq_no_, session_ctx.datum_row_))) {
LOG_WARN("fail to write row", KR(ret), K(session_id), K(row.tablet_id_), K(i));
}
}
@ -319,6 +319,7 @@ int ObTableLoadTransStoreWriter::write(int32_t session_id,
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(session_id), K(row_array.empty()));
} else {
ObTableLoadSequenceNo seq_no(0); // pdml导入的行目前不存在主键冲突,先都用一个默认的seq_no
SessionContext &session_ctx = session_ctx_array_[session_id - 1];
for (int64_t i = 0; OB_SUCC(ret) && i < row_array.count(); ++i) {
const ObNewRow &row = row_array.at(i);
@ -329,7 +330,7 @@ int ObTableLoadTransStoreWriter::write(int32_t session_id,
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(write_row_to_table_store(session_ctx.table_store_, tablet_id, session_ctx.datum_row_))) {
if (OB_FAIL(write_row_to_table_store(session_ctx.table_store_, tablet_id, seq_no, session_ctx.datum_row_))) {
LOG_WARN("fail to write row", KR(ret), K(session_id), K(tablet_id));
}
}
@ -463,10 +464,11 @@ int ObTableLoadTransStoreWriter::handle_identity_column(const ObColumnSchemaV2 *
int ObTableLoadTransStoreWriter::write_row_to_table_store(ObDirectLoadTableStore &table_store,
const ObTabletID &tablet_id,
const ObTableLoadSequenceNo &seq_no,
const ObDatumRow &datum_row)
{
int ret = OB_SUCCESS;
if (OB_FAIL(table_store.append_row(tablet_id, datum_row))) {
if (OB_FAIL(table_store.append_row(tablet_id, seq_no, datum_row))) {
LOG_WARN("fail to append row", KR(ret), K(datum_row));
}
if (OB_FAIL(ret)) {

View File

@ -83,6 +83,7 @@ private:
common::ObArenaAllocator &cast_allocator);
int write_row_to_table_store(storage::ObDirectLoadTableStore &table_store,
const common::ObTabletID &tablet_id,
const table::ObTableLoadSequenceNo &seq_no,
const blocksstable::ObDatumRow &datum_row);
private:
ObTableLoadTransStore *const trans_store_;

View File

@ -40,5 +40,7 @@ OB_SERIALIZE_MEMBER_SIMPLE(ObTableLoadResultInfo,
skipped_,
warnings_);
OB_SERIALIZE_MEMBER_SIMPLE(ObTableLoadSequenceNo, sequence_no_);
} // namespace table
} // namespace oceanbase

View File

@ -428,5 +428,66 @@ public:
uint64_t warnings_ CACHE_ALIGNED;
};
struct ObTableLoadSequenceNo
{
OB_UNIS_VERSION(1);
public:
static const uint64_t MAX_DATA_ID = (1LL << 16) - 1;
static const uint64_t MAX_CHUNK_ID = (1LL << 32) - 1;
static const uint64_t MAX_BATCH_ID = (1LL << 48) - 1;
static const uint64_t MAX_DATA_SEQ_NO = (1LL << 48) - 1;
static const uint64_t MAX_CHUNK_SEQ_NO = (1LL << 32) - 1;
static const uint64_t MAX_BATCH_SEQ_NO = (1LL << 16) - 1;
union {
// multi file
struct {
uint64_t data_id_ : 16;
uint64_t data_seq_no_ : 48;
};
// single file
struct {
uint64_t chunk_id_ : 32;
uint64_t chunk_seq_no_ : 32;
};
// client
struct {
uint64_t batch_id_ : 48;
uint64_t batch_seq_no_ : 16;
};
uint64_t sequence_no_;
};
ObTableLoadSequenceNo() : sequence_no_(OB_INVALID_ID) {}
ObTableLoadSequenceNo(uint64_t sequence_no) { sequence_no_ = sequence_no; }
void reset() { sequence_no_ = OB_INVALID_ID; }
bool is_valid () const {return sequence_no_ != OB_INVALID_ID; }
bool operator == (const ObTableLoadSequenceNo &other) const { return sequence_no_ == other.sequence_no_; }
bool operator < (const ObTableLoadSequenceNo &other) const { return sequence_no_ < other.sequence_no_; }
bool operator > (const ObTableLoadSequenceNo &other) const { return sequence_no_ > other.sequence_no_; }
ObTableLoadSequenceNo &operator=(const ObTableLoadSequenceNo &other) { sequence_no_ = other.sequence_no_; return *this; }
ObTableLoadSequenceNo &operator++()
{
sequence_no_++;
return *this;
}
ObTableLoadSequenceNo operator++(int)
{
ObTableLoadSequenceNo tmp = *this;
sequence_no_++;
return tmp;
}
ObTableLoadSequenceNo &operator--()
{
sequence_no_--;
return *this;
}
ObTableLoadSequenceNo operator--(int)
{
ObTableLoadSequenceNo tmp = *this;
sequence_no_--;
return tmp;
}
TO_STRING_KV(K_(sequence_no), K_(data_id), K_(data_seq_no), K_(chunk_id), K_(chunk_seq_no), K_(batch_id), K_(batch_seq_no));
};
} // namespace table
} // namespace oceanbase

View File

@ -11,6 +11,7 @@
#include "lib/utility/ob_print_utils.h"
#include "common/object/ob_object.h"
#include "common/ob_tablet_id.h"
#include "ob_table_load_define.h"
namespace oceanbase
{
@ -25,8 +26,8 @@ public:
virtual ~ObTableLoadRow() {}
void reset();
int init(int64_t count, const ObTableLoadSharedAllocatorHandle &allocator_handle);
int deep_copy_and_assign(const T *row, int64_t count,
const ObTableLoadSharedAllocatorHandle &allocator_handle);
int project(const ObIArray<int64_t> &idx_projector, ObTableLoadRow<T> &projected_row) const;
int deep_copy(const ObTableLoadRow<T> &other, const ObTableLoadSharedAllocatorHandle &allocator_handle);
// for deserialize()
void set_allocator(const ObTableLoadSharedAllocatorHandle &allocator_handle)
{
@ -36,6 +37,10 @@ public:
{
return allocator_handle_;
}
ObTableLoadSequenceNo& get_sequence_no()
{
return seq_no_;
}
TO_STRING_KV(K_(count));
private:
@ -44,6 +49,7 @@ private:
public:
ObTableLoadSharedAllocatorHandle allocator_handle_;
ObTableLoadSequenceNo seq_no_;
T *cells_;
int64_t count_;
};
@ -52,6 +58,7 @@ template<class T>
void ObTableLoadRow<T>::reset()
{
allocator_handle_.reset();
seq_no_.reset();
cells_ = nullptr;
count_ = 0;
}
@ -101,7 +108,7 @@ int ObTableLoadRow<T>::allocate_cells(T *&cells, int64_t count,
}
template<class T>
int ObTableLoadRow<T>::deep_copy_and_assign(const T *row, int64_t count,
int ObTableLoadRow<T>::deep_copy(const ObTableLoadRow<T> &other,
const ObTableLoadSharedAllocatorHandle &allocator_handle)
{
@ -109,21 +116,36 @@ int ObTableLoadRow<T>::deep_copy_and_assign(const T *row, int64_t count,
T *cells = nullptr;
reset();
if (OB_FAIL(allocate_cells(cells, count, allocator_handle))) {
OB_LOG(WARN, "failed to allocate cells", KR(ret), K(count));
if (OB_FAIL(allocate_cells(cells, other.count_, allocator_handle))) {
OB_LOG(WARN, "failed to allocate cells", KR(ret), K(other.count_));
}
for (int64_t i = 0; OB_SUCC(ret) && i < count; i ++) {
if (OB_FAIL(observer::ObTableLoadUtils::deep_copy(row[i],
for (int64_t i = 0; OB_SUCC(ret) && i < other.count_; i ++) {
if (OB_FAIL(observer::ObTableLoadUtils::deep_copy(other.cells_[i],
cells[i], *allocator_handle))) {
OB_LOG(WARN, "fail to deep copy object", KR(ret));
}
}
if (OB_SUCC(ret)) {
allocator_handle_ = allocator_handle;
seq_no_ = other.seq_no_;
cells_ = cells;
count_ = count;
count_ = other.count_;
}
return ret;
}
template<class T>
int ObTableLoadRow<T>::project(const ObIArray<int64_t> &idx_projector, ObTableLoadRow<T> &projected_row) const
{
int ret = OB_SUCCESS;
if (OB_FAIL(projected_row.init(count_, allocator_handle_))) {
OB_LOG(WARN, "failed to alloate cells", KR(ret), K(projected_row.count_));
} else {
for (int64_t j = 0; j < count_; ++j) {
projected_row.cells_[j] = cells_[idx_projector.at(j)];
}
projected_row.seq_no_ = seq_no_;
}
return ret;
}
@ -131,6 +153,7 @@ template<class T>
int ObTableLoadRow<T>::serialize(SERIAL_PARAMS) const
{
int ret = OB_SUCCESS;
OB_UNIS_ENCODE(seq_no_);
OB_UNIS_ENCODE_ARRAY(cells_, count_);
return ret;
}
@ -140,6 +163,7 @@ int ObTableLoadRow<T>::deserialize(DESERIAL_PARAMS)
{
int ret = OB_SUCCESS;
int64_t count = 0;
OB_UNIS_DECODE(seq_no_);
OB_UNIS_DECODE(count);
if (OB_SUCC(ret) && (count > 0)) {
T *cells = nullptr;
@ -162,6 +186,7 @@ template<class T>
int64_t ObTableLoadRow<T>::get_serialize_size() const
{
int64_t len = 0;
OB_UNIS_ADD_LEN(seq_no_);
OB_UNIS_ADD_LEN_ARRAY(cells_, count_);
return len;
}
@ -178,7 +203,6 @@ public:
obj_row_.set_allocator(allocator_handle);
}
TO_STRING_KV(K_(tablet_id), K_(obj_row));
public:
common::ObTabletID tablet_id_;
ObTableLoadObjRow obj_row_;

View File

@ -485,12 +485,13 @@ int ObLoadDataDirectImpl::DataDescIterator::add_data_desc(const DataDesc &data_d
return ret;
}
int ObLoadDataDirectImpl::DataDescIterator::get_next_data_desc(DataDesc &data_desc)
int ObLoadDataDirectImpl::DataDescIterator::get_next_data_desc(DataDesc &data_desc, int64_t &pos)
{
int ret = OB_SUCCESS;
if (pos_ >= data_descs_.count()) {
ret = OB_ITER_END;
} else {
pos = pos_;
data_desc = data_descs_.at(pos_++);
}
return ret;
@ -1340,9 +1341,13 @@ int ObLoadDataDirectImpl::FileLoadExecutor::process_task_handle(TaskHandle *hand
//此时row中的每个obj的内容指向的是data parser中的内存
//因此得把它们深拷贝一遍
ObTableLoadObjRow tmp_obj_row;
if (OB_FAIL(tmp_obj_row.deep_copy_and_assign(row.cells_, row.count_, allocator_handle))) {
tmp_obj_row.seq_no_= handle->get_next_seq_no();
tmp_obj_row.cells_ = row.cells_;
tmp_obj_row.count_ = row.count_;
ObTableLoadObjRow row;
if (OB_FAIL(row.deep_copy(tmp_obj_row, allocator_handle))) {
LOG_WARN("failed to deep copy add assign to tmp_obj_row", KR(ret));
} else if (OB_FAIL(obj_rows.push_back(tmp_obj_row))) {
} else if (OB_FAIL(obj_rows.push_back(row))) {
LOG_WARN("failed to add tmp_obj_row to obj_rows", KR(ret));
} else {
++processed_line_count;
@ -1422,6 +1427,9 @@ int ObLoadDataDirectImpl::LargeFileLoadTaskProcessor::process()
int64_t line_count = 0;
if (OB_FAIL(file_load_executor_->process_task_handle(handle_, line_count))) {
LOG_WARN("fail to process task handle", KR(ret));
} else if (OB_UNLIKELY(line_count > ObTableLoadSequenceNo::MAX_CHUNK_SEQ_NO)){
ret = OB_SIZE_OVERFLOW;
LOG_WARN("size is overflow", KR(ret), K(line_count));
}
return ret;
}
@ -1431,7 +1439,8 @@ int ObLoadDataDirectImpl::LargeFileLoadTaskProcessor::process()
*/
ObLoadDataDirectImpl::LargeFileLoadExecutor::LargeFileLoadExecutor()
: next_worker_idx_(0)
: next_worker_idx_(0),
next_chunk_id_(0)
{
}
@ -1452,6 +1461,7 @@ int ObLoadDataDirectImpl::LargeFileLoadExecutor::init(const LoadExecuteParam &ex
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(execute_param), K(execute_ctx), K(data_desc_iter));
} else {
int64_t data_id = 0;
DataDescIterator copy_data_desc_iter;
DataDesc data_desc;
if (OB_FAIL(inner_init(execute_param, execute_ctx, execute_param.thread_count_,
@ -1461,7 +1471,7 @@ int ObLoadDataDirectImpl::LargeFileLoadExecutor::init(const LoadExecuteParam &ex
// data_desc_
else if (OB_FAIL(copy_data_desc_iter.copy(data_desc_iter))) {
LOG_WARN("fail to copy data desc iter", KR(ret));
} else if (OB_FAIL(copy_data_desc_iter.get_next_data_desc(data_desc))) {
} else if (OB_FAIL(copy_data_desc_iter.get_next_data_desc(data_desc, data_id))) {
LOG_WARN("fail to get next data desc", KR(ret));
}
// expr_buffer_
@ -1493,8 +1503,12 @@ int ObLoadDataDirectImpl::LargeFileLoadExecutor::get_next_task_handle(TaskHandle
{
int ret = OB_SUCCESS;
int64_t current_line_count = 0;
const int64_t chunk_id = next_chunk_id_ ++;
expr_buffer_.reuse();
if (OB_FAIL(data_reader_.get_next_buffer(*expr_buffer_.file_buffer_, current_line_count))) {
if (OB_UNLIKELY(chunk_id > ObTableLoadSequenceNo::MAX_CHUNK_ID)) {
ret = OB_SIZE_OVERFLOW;
LOG_WARN("size is overflow", KR(ret), K(chunk_id));
} else if (OB_FAIL(data_reader_.get_next_buffer(*expr_buffer_.file_buffer_, current_line_count))) {
if (OB_UNLIKELY(OB_ITER_END != ret)) {
LOG_WARN("fail to get next buffer", KR(ret));
}
@ -1507,6 +1521,8 @@ int ObLoadDataDirectImpl::LargeFileLoadExecutor::get_next_task_handle(TaskHandle
handle->data_desc_ = data_desc_;
handle->start_line_no_ = total_line_count_ + 1;
handle->result_.created_ts_ = ObTimeUtil::current_time();
handle->sequence_no_.chunk_id_ = chunk_id;
handle->sequence_no_.chunk_seq_no_ = 0;
handle->data_buffer_.swap(expr_buffer_);
handle->data_buffer_.is_end_file_ = data_reader_.is_end_file();
}
@ -1601,6 +1617,7 @@ int ObLoadDataDirectImpl::MultiFilesLoadTaskProcessor::process()
int ret = OB_SUCCESS;
handle_->result_.start_process_ts_ = ObTimeUtil::current_time();
int64_t current_line_count = 0;
int64_t total_line_count = 0;
if (OB_FAIL(data_reader_.init(execute_param_->data_access_param_, *execute_ctx_,
handle_->data_desc_, true))) {
LOG_WARN("fail to init data reader", KR(ret));
@ -1617,6 +1634,12 @@ int ObLoadDataDirectImpl::MultiFilesLoadTaskProcessor::process()
current_line_count = 0;
if (OB_FAIL(file_load_executor_->process_task_handle(handle_, current_line_count))) {
LOG_WARN("fail to process task handle", KR(ret));
} else {
total_line_count += current_line_count;
if (OB_UNLIKELY(total_line_count > ObTableLoadSequenceNo::MAX_DATA_SEQ_NO)){
ret = OB_SIZE_OVERFLOW;
LOG_WARN("size is overflow", KR(ret), K(total_line_count));
}
}
}
}
@ -1640,6 +1663,12 @@ int ObLoadDataDirectImpl::MultiFilesLoadTaskProcessor::process()
ret = OB_NOT_SUPPORTED;
LOG_WARN("direct-load does not support big row", KR(ret), "size",
handle_->data_buffer_.get_data_length());
} else {
total_line_count += current_line_count;
if (OB_UNLIKELY(total_line_count > ObTableLoadSequenceNo::MAX_DATA_SEQ_NO)){
ret = OB_SIZE_OVERFLOW;
LOG_WARN("size is overflow", KR(ret), K(total_line_count));
}
}
}
}
@ -1741,10 +1770,14 @@ int ObLoadDataDirectImpl::MultiFilesLoadExecutor::get_next_task_handle(TaskHandl
{
int ret = OB_SUCCESS;
DataDesc data_desc;
if (OB_FAIL(data_desc_iter_.get_next_data_desc(data_desc))) {
int64_t data_id = 0;
if (OB_FAIL(data_desc_iter_.get_next_data_desc(data_desc, data_id))) {
if (OB_UNLIKELY(OB_ITER_END != ret)) {
LOG_WARN("fail to get next data desc", KR(ret));
}
} else if (OB_UNLIKELY(data_id > ObTableLoadSequenceNo::MAX_DATA_ID)) {
ret = OB_SIZE_OVERFLOW;
LOG_WARN("size is overflow", KR(ret), K(data_id));
} else if (OB_FAIL(fetch_task_handle(handle))) {
LOG_WARN("fail to fetch task handle", KR(ret));
} else {
@ -1752,6 +1785,8 @@ int ObLoadDataDirectImpl::MultiFilesLoadExecutor::get_next_task_handle(TaskHandl
handle->data_desc_ = data_desc;
handle->start_line_no_ = 0;
handle->result_.created_ts_ = ObTimeUtil::current_time();
handle->sequence_no_.data_id_ = data_id;
handle->sequence_no_.data_seq_no_ = 0;
}
return ret;
}

View File

@ -155,7 +155,7 @@ private:
int copy(const ObLoadFileIterator &file_iter);
int copy(const DataDescIterator &desc_iter);
int add_data_desc(const DataDesc &data_desc);
int get_next_data_desc(DataDesc &data_desc);
int get_next_data_desc(DataDesc &data_desc, int64_t &pos);
TO_STRING_KV(K_(data_descs), K_(pos));
private:
common::ObSEArray<DataDesc, 64> data_descs_;
@ -337,16 +337,20 @@ private:
struct TaskHandle
{
public:
TaskHandle()
: task_id_(common::OB_INVALID_ID), worker_idx_(-1), session_id_(0), start_line_no_(0)
{
}
table::ObTableLoadSequenceNo get_next_seq_no () { return sequence_no_ ++ ; }
public:
int64_t task_id_;
DataBuffer data_buffer_;
int64_t worker_idx_; // parse thread idx
int32_t session_id_; // table load session id
DataDesc data_desc_;
int64_t start_line_no_; // 从1开始
table::ObTableLoadSequenceNo sequence_no_;
TaskResult result_;
TO_STRING_KV(K_(task_id), K_(data_buffer), K_(worker_idx), K_(session_id), K_(data_desc),
K_(start_line_no), K_(result));
@ -429,6 +433,7 @@ private:
DataBuffer expr_buffer_;
DataReader data_reader_;
int64_t next_worker_idx_;
int64_t next_chunk_id_;
DISALLOW_COPY_AND_ASSIGN(LargeFileLoadExecutor);
};

View File

@ -28,8 +28,23 @@ int ObDirectLoadDatumRowkeyCompare::init(const ObStorageDatumUtils &datum_utils)
return ret;
}
bool ObDirectLoadDatumRowkeyCompare::operator()(const ObDatumRowkey *lhs,
const ObDatumRowkey *rhs)
int ObDirectLoadDatumRowkeyCompare::compare(const ObDatumRowkey *lhs, const ObDatumRowkey *rhs,
int &cmp_ret)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(datum_utils_) || OB_ISNULL(lhs) || OB_ISNULL(rhs) ||
OB_UNLIKELY(!lhs->is_valid() || !rhs->is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), KP(datum_utils_), KP(lhs), KP(rhs));
} else {
if (OB_FAIL(lhs->compare(*rhs, *datum_utils_, cmp_ret))) {
LOG_WARN("fail to compare rowkey", KR(ret), KP(lhs), K(rhs), K(datum_utils_));
}
}
return ret;
}
bool ObDirectLoadDatumRowkeyCompare::operator()(const ObDatumRowkey *lhs, const ObDatumRowkey *rhs)
{
int ret = OB_SUCCESS;
int cmp_ret = 0;
@ -73,6 +88,33 @@ int ObDirectLoadDatumRowCompare::init(const ObStorageDatumUtils &datum_utils, in
return ret;
}
int ObDirectLoadDatumRowCompare::compare(const blocksstable::ObDatumRow *lhs,
const blocksstable::ObDatumRow *rhs, int &cmp_ret)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObDirectLoadDatumRowCompare not init", KR(ret), KP(this));
} else if (OB_ISNULL(lhs) || OB_ISNULL(rhs) ||
OB_UNLIKELY(!lhs->is_valid() || !rhs->is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), KP(lhs), KP(rhs));
} else if (OB_UNLIKELY(lhs->get_column_count() < rowkey_size_ ||
rhs->get_column_count() < rowkey_size_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unexpected row column cnt", KR(ret), K(lhs), K(rhs), K_(rowkey_size));
} else {
if (OB_FAIL(lhs_rowkey_.assign(lhs->storage_datums_, rowkey_size_))) {
LOG_WARN("Failed to assign datum rowkey", KR(ret), K(lhs), K_(rowkey_size));
} else if (OB_FAIL(rhs_rowkey_.assign(rhs->storage_datums_, rowkey_size_))) {
LOG_WARN("Failed to assign datum rowkey", KR(ret), K(rhs), K_(rowkey_size));
} else if (OB_FAIL(rowkey_compare_.compare(&lhs_rowkey_, &rhs_rowkey_, cmp_ret))) {
LOG_WARN("fail to compare rowkey", KR(ret), KP(lhs), K(rhs), K(cmp_ret));
}
}
return ret;
}
bool ObDirectLoadDatumRowCompare::operator()(const ObDatumRow *lhs, const ObDatumRow *rhs)
{
int ret = OB_SUCCESS;
@ -125,6 +167,29 @@ int ObDirectLoadDatumArrayCompare::init(const ObStorageDatumUtils &datum_utils)
return ret;
}
int ObDirectLoadDatumArrayCompare::compare(const ObDirectLoadDatumArray *lhs,
const ObDirectLoadDatumArray *rhs, int &cmp_ret)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObDirectLoadDatumArrayCompare not init", KR(ret), KP(this));
} else if (OB_UNLIKELY(nullptr == lhs || nullptr == rhs || !lhs->is_valid() || !rhs->is_valid() ||
lhs->count_ != rhs->count_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), KP(lhs), KP(rhs));
} else if (lhs->count_ > 0) {
if (OB_FAIL(lhs_rowkey_.assign(lhs->datums_, lhs->count_))) {
LOG_WARN("fail to assign rowkey", KR(ret));
} else if (OB_FAIL(rhs_rowkey_.assign(rhs->datums_, rhs->count_))) {
LOG_WARN("fail to assign rowkey", KR(ret));
} else if (OB_FAIL(rowkey_compare_.compare(&lhs_rowkey_, &rhs_rowkey_, cmp_ret))) {
LOG_WARN("fail to compare rowkey", KR(ret), KP(lhs), K(rhs), K(cmp_ret));
}
}
return ret;
}
bool ObDirectLoadDatumArrayCompare::operator()(const ObDirectLoadDatumArray *lhs,
const ObDirectLoadDatumArray *rhs)
{
@ -154,6 +219,29 @@ bool ObDirectLoadDatumArrayCompare::operator()(const ObDirectLoadDatumArray *lhs
return bret;
}
int ObDirectLoadDatumArrayCompare::compare(const ObDirectLoadConstDatumArray *lhs,
const ObDirectLoadConstDatumArray *rhs, int &cmp_ret)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObDirectLoadDatumArrayCompare not init", KR(ret), KP(this));
} else if (OB_UNLIKELY(nullptr == lhs || nullptr == rhs || !lhs->is_valid() || !rhs->is_valid() ||
lhs->count_ != rhs->count_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), KP(lhs), KP(rhs));
} else if (lhs->count_ > 0) {
if (OB_FAIL(lhs_rowkey_.assign(lhs->datums_, lhs->count_))) {
LOG_WARN("fail to assign rowkey", KR(ret));
} else if (OB_FAIL(rhs_rowkey_.assign(rhs->datums_, rhs->count_))) {
LOG_WARN("fail to assign rowkey", KR(ret));
} else if (OB_FAIL(rowkey_compare_.compare(&lhs_rowkey_, &rhs_rowkey_, cmp_ret))) {
LOG_WARN("fail to compare rowkey", KR(ret), KP(lhs), K(rhs), K(cmp_ret));
}
}
return ret;
}
bool ObDirectLoadDatumArrayCompare::operator()(const ObDirectLoadConstDatumArray *lhs,
const ObDirectLoadConstDatumArray *rhs)
{
@ -187,7 +275,8 @@ bool ObDirectLoadDatumArrayCompare::operator()(const ObDirectLoadConstDatumArray
* ObDirectLoadExternalRowCompare
*/
int ObDirectLoadExternalRowCompare::init(const ObStorageDatumUtils &datum_utils)
int ObDirectLoadExternalRowCompare::init(const ObStorageDatumUtils &datum_utils,
sql::ObLoadDupActionType dup_action)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
@ -197,6 +286,7 @@ int ObDirectLoadExternalRowCompare::init(const ObStorageDatumUtils &datum_utils)
if (OB_FAIL(datum_array_compare_.init(datum_utils))) {
LOG_WARN("fail to init datum array compare", KR(ret));
} else {
dup_action_ = dup_action;
is_inited_ = true;
}
}
@ -207,7 +297,7 @@ bool ObDirectLoadExternalRowCompare::operator()(const ObDirectLoadExternalRow *l
const ObDirectLoadExternalRow *rhs)
{
int ret = OB_SUCCESS;
bool bret = false;
int cmp_ret = 0;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObDirectLoadDatumRowCompare not init", KR(ret), KP(this));
@ -215,22 +305,57 @@ bool ObDirectLoadExternalRowCompare::operator()(const ObDirectLoadExternalRow *l
!rhs->is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), KP(lhs), KP(rhs));
} else {
bret = datum_array_compare_.operator()(&lhs->rowkey_datum_array_, &rhs->rowkey_datum_array_);
} else if (OB_FAIL(compare(lhs, rhs, cmp_ret))) {
LOG_WARN("Fail to compare datum array", KR(ret), KP(lhs), KP(rhs));
}
if (OB_FAIL(ret)) {
result_code_ = ret;
} else if (OB_FAIL(datum_array_compare_.get_error_code())) {
result_code_ = datum_array_compare_.get_error_code();
}
return bret;
return cmp_ret < 0;
}
int ObDirectLoadExternalRowCompare::compare(const ObDirectLoadExternalRow *lhs,
const ObDirectLoadExternalRow *rhs, int &cmp_ret)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObDirectLoadDatumRowCompare not init", KR(ret), KP(this));
} else if (OB_UNLIKELY(nullptr == lhs || nullptr == rhs || !lhs->is_valid() ||
!rhs->is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), KP(lhs), KP(rhs));
} else if (OB_FAIL(datum_array_compare_.compare(&lhs->rowkey_datum_array_,
&rhs->rowkey_datum_array_, cmp_ret))) {
LOG_WARN("fail to compare rowkey", KR(ret), KP(lhs), K(rhs), K(cmp_ret));
} else {
if (cmp_ret == 0) {
if (lhs->seq_no_ == rhs->seq_no_) {
cmp_ret = 0;
} else if (lhs->seq_no_ > rhs->seq_no_) {
if (dup_action_ == sql::ObLoadDupActionType::LOAD_REPLACE) {
cmp_ret = -1;
} else {
cmp_ret = 1;
}
} else {
if (dup_action_ == sql::ObLoadDupActionType::LOAD_REPLACE) {
cmp_ret = 1;
} else {
cmp_ret = -1;
}
}
}
}
return ret;
}
/**
* ObDirectLoadExternalMultiPartitionRowCompare
*/
int ObDirectLoadExternalMultiPartitionRowCompare::init(const ObStorageDatumUtils &datum_utils)
int ObDirectLoadExternalMultiPartitionRowCompare::init(const ObStorageDatumUtils &datum_utils,
sql::ObLoadDupActionType dup_action)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
@ -240,6 +365,7 @@ int ObDirectLoadExternalMultiPartitionRowCompare::init(const ObStorageDatumUtils
if (OB_FAIL(datum_array_compare_.init(datum_utils))) {
LOG_WARN("fail to init datum array compare", KR(ret));
} else {
dup_action_ = dup_action;
is_inited_ = true;
}
}
@ -251,7 +377,7 @@ bool ObDirectLoadExternalMultiPartitionRowCompare::operator()(
const ObDirectLoadExternalMultiPartitionRow *rhs)
{
int ret = OB_SUCCESS;
bool bret = false;
int cmp_ret = 0;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObDirectLoadExternalMultiPartitionRowCompare not init", KR(ret), KP(this));
@ -259,25 +385,40 @@ bool ObDirectLoadExternalMultiPartitionRowCompare::operator()(
!rhs->is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), KP(lhs), KP(rhs));
} else {
if (lhs->tablet_id_ != rhs->tablet_id_) {
bret = lhs->tablet_id_ < rhs->tablet_id_;
} else {
bret = datum_array_compare_.operator()(&lhs->external_row_.rowkey_datum_array_,
&rhs->external_row_.rowkey_datum_array_);
}
} else if (OB_FAIL(compare(lhs, rhs, cmp_ret))) {
LOG_WARN("Fail to compare datum array", KR(ret), KP(lhs), KP(rhs));
}
if (OB_FAIL(ret)) {
result_code_ = ret;
} else if (OB_FAIL(datum_array_compare_.get_error_code())) {
result_code_ = datum_array_compare_.get_error_code();
}
return bret;
return cmp_ret < 0;
}
bool ObDirectLoadExternalMultiPartitionRowCompare::operator()(
const ObDirectLoadConstExternalMultiPartitionRow *lhs,
const ObDirectLoadConstExternalMultiPartitionRow *rhs)
{
int ret = OB_SUCCESS;
int cmp_ret = 0;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObDirectLoadExternalMultiPartitionRowCompare not init", KR(ret), KP(this));
} else if (OB_UNLIKELY(nullptr == lhs || nullptr == rhs || !lhs->is_valid() ||
!rhs->is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), KP(lhs), KP(rhs));
} else if (OB_FAIL(compare(lhs, rhs, cmp_ret))) {
LOG_WARN("Fail to compare datum array", KR(ret), KP(lhs), KP(rhs));
}
if (OB_FAIL(ret)) {
result_code_ = ret;
}
return cmp_ret < 0;
}
int ObDirectLoadExternalMultiPartitionRowCompare::compare(
const ObDirectLoadExternalMultiPartitionRow *lhs,
const ObDirectLoadExternalMultiPartitionRow *rhs, int &cmp_ret)
{
int ret = OB_SUCCESS;
bool bret = false;
@ -290,18 +431,70 @@ bool ObDirectLoadExternalMultiPartitionRowCompare::operator()(
LOG_WARN("invalid args", KR(ret), KP(lhs), KP(rhs));
} else {
if (lhs->tablet_id_ != rhs->tablet_id_) {
bret = lhs->tablet_id_ < rhs->tablet_id_;
} else {
bret = datum_array_compare_.operator()(&lhs->rowkey_datum_array_, &rhs->rowkey_datum_array_);
cmp_ret = lhs->tablet_id_ < rhs->tablet_id_ ? -1 : 1;
} else if (OB_FAIL(datum_array_compare_.compare(&lhs->external_row_.rowkey_datum_array_,
&rhs->external_row_.rowkey_datum_array_,
cmp_ret))) {
LOG_WARN("fail to compare rowkey", KR(ret), KP(lhs), K(rhs), K(cmp_ret));
} else if (cmp_ret == 0) {
if (lhs->external_row_.seq_no_ == rhs->external_row_.seq_no_) {
cmp_ret = 0;
} else if (lhs->external_row_.seq_no_ > rhs->external_row_.seq_no_) {
if (dup_action_ == sql::ObLoadDupActionType::LOAD_REPLACE) {
cmp_ret = -1;
} else {
cmp_ret = 1;
}
} else {
if (dup_action_ == sql::ObLoadDupActionType::LOAD_REPLACE) {
cmp_ret = 1;
} else {
cmp_ret = -1;
}
}
}
}
if (OB_FAIL(ret)) {
result_code_ = ret;
} else if (OB_FAIL(datum_array_compare_.get_error_code())) {
result_code_ = datum_array_compare_.get_error_code();
}
return bret;
return ret;
}
} // namespace storage
} // namespace oceanbase
int ObDirectLoadExternalMultiPartitionRowCompare::compare(
const ObDirectLoadConstExternalMultiPartitionRow *lhs,
const ObDirectLoadConstExternalMultiPartitionRow *rhs, int &cmp_ret)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObDirectLoadExternalMultiPartitionRowCompare not init", KR(ret), KP(this));
} else if (OB_UNLIKELY(nullptr == lhs || nullptr == rhs || !lhs->is_valid() ||
!rhs->is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), KP(lhs), KP(rhs));
} else {
if (lhs->tablet_id_ != rhs->tablet_id_) {
cmp_ret = lhs->tablet_id_ < rhs->tablet_id_ ? -1 : 1;
} else if (OB_FAIL(datum_array_compare_.compare(&lhs->rowkey_datum_array_,
&rhs->rowkey_datum_array_, cmp_ret))) {
LOG_WARN("fail to compare rowkey", KR(ret), KP(lhs), K(rhs), K(cmp_ret));
} else if (cmp_ret == 0) {
if (lhs->seq_no_ == rhs->seq_no_) {
cmp_ret = 0;
} else if (lhs->seq_no_ > rhs->seq_no_) {
if (dup_action_ == sql::ObLoadDupActionType::LOAD_REPLACE) {
cmp_ret = -1;
} else {
cmp_ret = 1;
}
} else {
if (dup_action_ == sql::ObLoadDupActionType::LOAD_REPLACE) {
cmp_ret = 1;
} else {
cmp_ret = -1;
}
}
}
}
return ret;
}
} // namespace storage
} // namespace oceanbase

View File

@ -6,6 +6,7 @@
#include "lib/ob_define.h"
#include "lib/ob_errno.h"
#include "sql/resolver/cmd/ob_load_data_stmt.h"
#include "storage/blocksstable/ob_datum_rowkey.h"
namespace oceanbase
@ -15,7 +16,7 @@ namespace blocksstable
class ObStorageDatumUtils;
class ObDatumRowkey;
class ObDatumRow;
} // namespace blocksstable
} // namespace blocksstable
namespace storage
{
class ObDirectLoadDatumArray;
@ -29,8 +30,11 @@ class ObDirectLoadDatumRowkeyCompare
public:
ObDirectLoadDatumRowkeyCompare() : datum_utils_(nullptr), result_code_(common::OB_SUCCESS) {}
int init(const blocksstable::ObStorageDatumUtils &datum_utils);
int compare(const blocksstable::ObDatumRowkey *lhs, const blocksstable::ObDatumRowkey *rhs,
int &cmp_ret);
bool operator()(const blocksstable::ObDatumRowkey *lhs, const blocksstable::ObDatumRowkey *rhs);
int get_error_code() const { return result_code_; }
public:
const blocksstable::ObStorageDatumUtils *datum_utils_;
int result_code_;
@ -40,10 +44,15 @@ class ObDirectLoadDatumRowCompare
{
public:
ObDirectLoadDatumRowCompare()
: rowkey_size_(0), result_code_(common::OB_SUCCESS), is_inited_(false) {}
: rowkey_size_(0), result_code_(common::OB_SUCCESS), is_inited_(false)
{
}
int init(const blocksstable::ObStorageDatumUtils &datum_utils, int64_t rowkey_size);
int compare(const blocksstable::ObDatumRow *lhs, const blocksstable::ObDatumRow *rhs,
int &cmp_ret);
bool operator()(const blocksstable::ObDatumRow *lhs, const blocksstable::ObDatumRow *rhs);
int get_error_code() const { return result_code_; }
public:
blocksstable::ObDatumRowkey lhs_rowkey_;
blocksstable::ObDatumRowkey rhs_rowkey_;
@ -56,12 +65,15 @@ public:
class ObDirectLoadDatumArrayCompare
{
public:
ObDirectLoadDatumArrayCompare()
: result_code_(common::OB_SUCCESS), is_inited_(false) {}
ObDirectLoadDatumArrayCompare() : result_code_(common::OB_SUCCESS), is_inited_(false) {}
int init(const blocksstable::ObStorageDatumUtils &datum_utils);
bool operator()(const ObDirectLoadDatumArray *lhs, const ObDirectLoadDatumArray *rhs);
int compare(const ObDirectLoadDatumArray *lhs, const ObDirectLoadDatumArray *rhs, int &cmp_ret);
bool operator()(const ObDirectLoadConstDatumArray *lhs, const ObDirectLoadConstDatumArray *rhs);
int compare(const ObDirectLoadConstDatumArray *lhs, const ObDirectLoadConstDatumArray *rhs,
int &cmp_ret);
int get_error_code() const { return result_code_; }
public:
blocksstable::ObDatumRowkey lhs_rowkey_;
blocksstable::ObDatumRowkey rhs_rowkey_;
@ -74,11 +86,15 @@ class ObDirectLoadExternalRowCompare
{
public:
ObDirectLoadExternalRowCompare() : result_code_(common::OB_SUCCESS), is_inited_(false) {}
int init(const blocksstable::ObStorageDatumUtils &datum_utils);
int init(const blocksstable::ObStorageDatumUtils &datum_utils,
sql::ObLoadDupActionType dup_action);
int compare(const ObDirectLoadExternalRow *lhs, const ObDirectLoadExternalRow *rhs, int &cmp_ret);
bool operator()(const ObDirectLoadExternalRow *lhs, const ObDirectLoadExternalRow *rhs);
int get_error_code() const { return result_code_; }
public:
ObDirectLoadDatumArrayCompare datum_array_compare_;
sql::ObLoadDupActionType dup_action_;
int result_code_;
bool is_inited_;
};
@ -87,18 +103,27 @@ class ObDirectLoadExternalMultiPartitionRowCompare
{
public:
ObDirectLoadExternalMultiPartitionRowCompare()
: result_code_(common::OB_SUCCESS), is_inited_(false) {}
int init(const blocksstable::ObStorageDatumUtils &datum_utils);
: result_code_(common::OB_SUCCESS), is_inited_(false)
{
}
int init(const blocksstable::ObStorageDatumUtils &datum_utils,
sql::ObLoadDupActionType dup_action);
bool operator()(const ObDirectLoadExternalMultiPartitionRow *lhs,
const ObDirectLoadExternalMultiPartitionRow *rhs);
bool operator()(const ObDirectLoadConstExternalMultiPartitionRow *lhs,
const ObDirectLoadConstExternalMultiPartitionRow *rhs);
int compare(const ObDirectLoadExternalMultiPartitionRow *lhs,
const ObDirectLoadExternalMultiPartitionRow *rhs, int &cmp_ret);
int compare(const ObDirectLoadConstExternalMultiPartitionRow *lhs,
const ObDirectLoadConstExternalMultiPartitionRow *rhs, int &cmp_ret);
int get_error_code() const { return result_code_; }
public:
ObDirectLoadDatumArrayCompare datum_array_compare_;
sql::ObLoadDupActionType dup_action_;
int result_code_;
bool is_inited_;
};
} // namespace storage
} // namespace oceanbase
} // namespace storage
} // namespace oceanbase

View File

@ -95,6 +95,7 @@ void ObDirectLoadConstExternalMultiPartitionRow::reset()
{
tablet_id_.reset();
rowkey_datum_array_.reset();
seq_no_.reset();
buf_size_ = 0;
buf_ = nullptr;
}
@ -107,6 +108,7 @@ ObDirectLoadConstExternalMultiPartitionRow &ObDirectLoadConstExternalMultiPartit
tablet_id_ = other.tablet_id_;
rowkey_datum_array_ = other.rowkey_datum_array_;
buf_size_ = other.buf_size_;
seq_no_ = other.seq_no_;
buf_ = other.buf_;
}
return *this;
@ -118,6 +120,7 @@ ObDirectLoadConstExternalMultiPartitionRow &ObDirectLoadConstExternalMultiPartit
tablet_id_ = other.tablet_id_;
rowkey_datum_array_ = other.external_row_.rowkey_datum_array_;
buf_size_ = other.external_row_.buf_size_;
seq_no_ = other.external_row_.seq_no_;
buf_ = other.external_row_.buf_;
return *this;
}
@ -147,6 +150,7 @@ int ObDirectLoadConstExternalMultiPartitionRow::deep_copy(
LOG_WARN("fail to deep copy datum array", KR(ret));
} else {
buf_size_ = src.buf_size_;
seq_no_ = src.seq_no_;
buf_ = buf + pos;
MEMCPY(buf + pos, src.buf_, buf_size_);
pos += buf_size_;

View File

@ -5,6 +5,7 @@
#pragma once
#include "storage/direct_load/ob_direct_load_external_row.h"
#include "share/table/ob_table_load_define.h"
namespace oceanbase
{
@ -47,13 +48,14 @@ public:
int to_datums(blocksstable::ObStorageDatum *datums, int64_t column_count) const;
bool is_valid() const
{
return tablet_id_.is_valid() && rowkey_datum_array_.is_valid() && buf_size_ > 0 &&
nullptr != buf_;
return tablet_id_.is_valid() && rowkey_datum_array_.is_valid() && seq_no_.is_valid() &&
buf_size_ > 0 && nullptr != buf_;
}
TO_STRING_KV(K_(tablet_id), K_(rowkey_datum_array), K_(buf_size), KP_(buf));
TO_STRING_KV(K_(tablet_id), K_(rowkey_datum_array), K_(seq_no), K_(buf_size), KP_(buf));
public:
common::ObTabletID tablet_id_;
ObDirectLoadConstDatumArray rowkey_datum_array_;
table::ObTableLoadSequenceNo seq_no_;
int64_t buf_size_;
const char *buf_;
};

View File

@ -7,6 +7,7 @@
#include "storage/direct_load/ob_direct_load_external_multi_partition_table.h"
#include "observer/table_load/ob_table_load_stat.h"
#include "storage/direct_load/ob_direct_load_external_table.h"
#include "share/table/ob_table_load_define.h"
namespace oceanbase
{
@ -84,6 +85,7 @@ int ObDirectLoadExternalMultiPartitionTableBuilder::init(
}
int ObDirectLoadExternalMultiPartitionTableBuilder::append_row(const ObTabletID &tablet_id,
const table::ObTableLoadSequenceNo &seq_no,
const ObDatumRow &datum_row)
{
int ret = OB_SUCCESS;
@ -101,7 +103,7 @@ int ObDirectLoadExternalMultiPartitionTableBuilder::append_row(const ObTabletID
OB_TABLE_LOAD_STATISTICS_TIME_COST(DEBUG, external_append_row_time_us);
row_.tablet_id_ = tablet_id;
if (OB_FAIL(row_.external_row_.from_datums(datum_row.storage_datums_, datum_row.count_,
param_.table_data_desc_.rowkey_column_num_))) {
param_.table_data_desc_.rowkey_column_num_, seq_no))) {
LOG_WARN("fail to from datums", KR(ret));
} else if (OB_FAIL(external_writer_.write_item(row_))) {
LOG_WARN("fail to write item", KR(ret));

View File

@ -41,6 +41,7 @@ public:
virtual ~ObDirectLoadExternalMultiPartitionTableBuilder();
int init(const ObDirectLoadExternalMultiPartitionTableBuildParam &param);
int append_row(const common::ObTabletID &tablet_id,
const table::ObTableLoadSequenceNo &seq_no,
const blocksstable::ObDatumRow &datum_row) override;
int close() override;
int64_t get_row_count() const override { return total_row_count_; }

View File

@ -14,6 +14,7 @@ namespace storage
{
using namespace common;
using namespace blocksstable;
using namespace table;
ObDirectLoadExternalRow::ObDirectLoadExternalRow()
: allocator_("TLD_ext_row"), buf_size_(0), buf_(nullptr)
@ -24,6 +25,7 @@ ObDirectLoadExternalRow::ObDirectLoadExternalRow()
void ObDirectLoadExternalRow::reset()
{
rowkey_datum_array_.reset();
seq_no_.reset();
buf_size_ = 0;
buf_ = nullptr;
allocator_.reset();
@ -32,6 +34,7 @@ void ObDirectLoadExternalRow::reset()
void ObDirectLoadExternalRow::reuse()
{
rowkey_datum_array_.reuse();
seq_no_.reset();
buf_size_ = 0;
buf_ = nullptr;
allocator_.reuse();
@ -62,6 +65,7 @@ int ObDirectLoadExternalRow::deep_copy(const ObDirectLoadExternalRow &src, char
LOG_WARN("fail to deep copy datum array", KR(ret));
} else {
buf_size_ = src.buf_size_;
seq_no_ = src.seq_no_;
buf_ = buf + pos;
MEMCPY(buf + pos, src.buf_, buf_size_);
pos += buf_size_;
@ -71,7 +75,7 @@ int ObDirectLoadExternalRow::deep_copy(const ObDirectLoadExternalRow &src, char
}
int ObDirectLoadExternalRow::from_datums(ObStorageDatum *datums, int64_t column_count,
int64_t rowkey_column_count)
int64_t rowkey_column_count, const ObTableLoadSequenceNo &seq_no)
{
OB_TABLE_LOAD_STATISTICS_TIME_COST(DEBUG, transfer_external_row_time_us);
int ret = OB_SUCCESS;
@ -98,6 +102,7 @@ int ObDirectLoadExternalRow::from_datums(ObStorageDatum *datums, int64_t column_
} else {
buf_ = buf;
buf_size_ = buf_size;
seq_no_ = seq_no;
}
}
}
@ -153,7 +158,7 @@ OB_DEF_SERIALIZE_SIMPLE(ObDirectLoadExternalRow)
{
OB_TABLE_LOAD_STATISTICS_TIME_COST(DEBUG, external_row_serialize_time_us);
int ret = OB_SUCCESS;
LST_DO_CODE(OB_UNIS_ENCODE, rowkey_datum_array_, buf_size_);
LST_DO_CODE(OB_UNIS_ENCODE, rowkey_datum_array_, seq_no_, buf_size_);
if (OB_SUCC(ret) && OB_NOT_NULL(buf_)) {
MEMCPY(buf + pos, buf_, buf_size_);
pos += buf_size_;
@ -166,7 +171,7 @@ OB_DEF_DESERIALIZE_SIMPLE(ObDirectLoadExternalRow)
OB_TABLE_LOAD_STATISTICS_TIME_COST(DEBUG, external_row_deserialize_time_us);
int ret = OB_SUCCESS;
reuse();
LST_DO_CODE(OB_UNIS_DECODE, rowkey_datum_array_, buf_size_);
LST_DO_CODE(OB_UNIS_DECODE, rowkey_datum_array_, seq_no_, buf_size_);
if (OB_SUCC(ret)) {
buf_ = buf + pos;
pos += buf_size_;
@ -178,7 +183,7 @@ OB_DEF_SERIALIZE_SIZE_SIMPLE(ObDirectLoadExternalRow)
{
OB_TABLE_LOAD_STATISTICS_TIME_COST(DEBUG, external_row_serialize_time_us);
int64_t len = 0;
LST_DO_CODE(OB_UNIS_ADD_LEN, rowkey_datum_array_, buf_size_);
LST_DO_CODE(OB_UNIS_ADD_LEN, rowkey_datum_array_, seq_no_, buf_size_);
len += buf_size_;
return len;
}

View File

@ -5,6 +5,7 @@
#pragma once
#include "lib/allocator/page_arena.h"
#include "share/table/ob_table_load_define.h"
#include "storage/blocksstable/ob_datum_rowkey.h"
#include "storage/direct_load/ob_direct_load_datum.h"
@ -24,18 +25,20 @@ public:
int deep_copy(const ObDirectLoadExternalRow &src, char *buf, const int64_t len, int64_t &pos);
// not deep copy
int from_datums(blocksstable::ObStorageDatum *datums, int64_t column_count,
int64_t rowkey_column_count);
int64_t rowkey_column_count, const table::ObTableLoadSequenceNo &seq_no);
int to_datums(blocksstable::ObStorageDatum *datums, int64_t column_count) const;
int get_rowkey(blocksstable::ObDatumRowkey &rowkey) const;
bool is_valid() const
{
return rowkey_datum_array_.is_valid() && buf_size_ > 0 && nullptr != buf_;
return rowkey_datum_array_.is_valid() && seq_no_.is_valid() && buf_size_ > 0 && nullptr != buf_;
}
OB_INLINE int64_t get_raw_size() const { return buf_size_; }
TO_STRING_KV(K_(rowkey_datum_array), K_(buf_size), KP_(buf));
TO_STRING_KV(K_(rowkey_datum_array), K_(seq_no), K_(buf_size), KP_(buf));
public:
common::ObArenaAllocator allocator_;
ObDirectLoadDatumArray rowkey_datum_array_;
table::ObTableLoadSequenceNo seq_no_;
int64_t buf_size_;
const char *buf_;
};

View File

@ -79,6 +79,7 @@ int ObDirectLoadExternalTableBuilder::init(const ObDirectLoadExternalTableBuildP
}
int ObDirectLoadExternalTableBuilder::append_row(const ObTabletID &tablet_id,
const table::ObTableLoadSequenceNo &seq_no,
const ObDatumRow &datum_row)
{
int ret = OB_SUCCESS;
@ -96,7 +97,7 @@ int ObDirectLoadExternalTableBuilder::append_row(const ObTabletID &tablet_id,
} else {
OB_TABLE_LOAD_STATISTICS_TIME_COST(DEBUG, external_append_row_time_us);
if (OB_FAIL(external_row_.from_datums(datum_row.storage_datums_, datum_row.count_,
build_param_.table_data_desc_.rowkey_column_num_))) {
build_param_.table_data_desc_.rowkey_column_num_, seq_no))) {
LOG_WARN("fail to from datums", KR(ret));
} else if (OB_FAIL(external_writer_.write_item(external_row_))) {
LOG_WARN("fail to write item", KR(ret));

View File

@ -42,6 +42,7 @@ public:
virtual ~ObDirectLoadExternalTableBuilder();
int init(const ObDirectLoadExternalTableBuildParam &build_param);
int append_row(const common::ObTabletID &tablet_id,
const table::ObTableLoadSequenceNo &seq_no,
const blocksstable::ObDatumRow &datum_row) override;
int close() override;
int64_t get_row_count() const override { return row_count_; }

View File

@ -191,9 +191,11 @@ int ObDirectLoadFastHeapTableBuilder::switch_sstable_slice()
}
int ObDirectLoadFastHeapTableBuilder::append_row(const ObTabletID &tablet_id,
const table::ObTableLoadSequenceNo &seq_no,
const ObDatumRow &datum_row)
{
UNUSED(tablet_id);
UNUSED(seq_no);
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;

View File

@ -56,6 +56,7 @@ public:
virtual ~ObDirectLoadFastHeapTableBuilder();
int init(const ObDirectLoadFastHeapTableBuildParam &param);
int append_row(const common::ObTabletID &tablet_id,
const table::ObTableLoadSequenceNo &seq_no,
const blocksstable::ObDatumRow &datum_row) override;
int close() override;
int64_t get_row_count() const override { return row_count_; }

View File

@ -5,6 +5,7 @@
#pragma once
#include "storage/blocksstable/ob_datum_row.h"
#include "share/table/ob_table_load_define.h"
namespace oceanbase
{
@ -28,6 +29,7 @@ public:
ObIDirectLoadPartitionTableBuilder() = default;
virtual ~ObIDirectLoadPartitionTableBuilder() = default;
virtual int append_row(const common::ObTabletID &tablet_id,
const table::ObTableLoadSequenceNo &seq_no,
const blocksstable::ObDatumRow &datum_row) = 0;
virtual int close() = 0;
virtual int64_t get_row_count() const = 0;

View File

@ -78,6 +78,7 @@ public:
int32_t column_count_;
ObDirectLoadDMLRowHandler *dml_row_handler_;
ObDirectLoadTmpFileManager *file_mgr_;
sql::ObLoadDupActionType dup_action_;
ObDirectLoadEasyQueue<storage::ObDirectLoadExternalMultiPartitionRowChunk *> mem_chunk_queue_;
int64_t fly_mem_chunk_count_;

View File

@ -215,7 +215,7 @@ int ObDirectLoadMemDump::dump_tables()
if (OB_ISNULL(extra_buf_ = static_cast<char *>(allocator_.alloc(extra_buf_size_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to allocate memory", KR(ret));
} else if (OB_FAIL(compare.init(*(mem_ctx_->datum_utils_)))) {
} else if (OB_FAIL(compare.init(*(mem_ctx_->datum_utils_), mem_ctx_->dup_action_))) {
LOG_WARN("fail to init compare", KR(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < context_ptr_->mem_chunk_array_.count(); i++) {
@ -263,7 +263,7 @@ int ObDirectLoadMemDump::dump_tables()
if (OB_SUCC(ret)) {
if (OB_FAIL(external_row->to_datums(datum_row.storage_datums_, datum_row.count_))) {
LOG_WARN("fail to transfer dataum row", KR(ret));
} else if (OB_FAIL(table_builder->append_row(external_row->tablet_id_, datum_row))) {
} else if (OB_FAIL(table_builder->append_row(external_row->tablet_id_, external_row->seq_no_, datum_row))) {
if (OB_LIKELY(OB_ERR_PRIMARY_KEY_DUPLICATE == ret)) {
if (OB_FAIL(mem_ctx_->dml_row_handler_->handle_update_row(datum_row))) {
LOG_WARN("fail to handle update row", KR(ret), K(datum_row));

View File

@ -142,7 +142,7 @@ int ObDirectLoadMemLoader::close_chunk(ChunkType *&chunk)
{
int ret = OB_SUCCESS;
CompareType compare;
if (OB_FAIL(compare.init(*(mem_ctx_->datum_utils_)))) {
if (OB_FAIL(compare.init(*(mem_ctx_->datum_utils_), mem_ctx_->dup_action_))) {
LOG_WARN("fail to init compare", KR(ret));
} else if (OB_FAIL(chunk->sort(compare))) {
LOG_WARN("fail to sort chunk", KR(ret));

View File

@ -38,7 +38,7 @@ int ObDirectLoadMemSample::gen_ranges(ObIArray<ChunkType *> &chunks, ObIArray<Ra
}
if (OB_SUCC(ret)) {
CompareType compare;
if (OB_FAIL(compare.init(*(mem_ctx_->datum_utils_)))) {
if (OB_FAIL(compare.init(*(mem_ctx_->datum_utils_), mem_ctx_->dup_action_))) {
LOG_WARN("fail to init compare", KR(ret));
} else {
std::sort(sample_rows.begin(), sample_rows.end(), compare);

View File

@ -14,6 +14,7 @@ namespace storage
{
using namespace common;
using namespace blocksstable;
using namespace table;
ObDirectLoadMultipleDatumRow::ObDirectLoadMultipleDatumRow()
: allocator_("TLD_MultiRow"), buf_size_(0), buf_(nullptr)
@ -28,6 +29,7 @@ ObDirectLoadMultipleDatumRow::~ObDirectLoadMultipleDatumRow()
void ObDirectLoadMultipleDatumRow::reset()
{
rowkey_.reset();
seq_no_.reset();
buf_size_ = 0;
buf_ = nullptr;
allocator_.reset();
@ -36,6 +38,7 @@ void ObDirectLoadMultipleDatumRow::reset()
void ObDirectLoadMultipleDatumRow::reuse()
{
rowkey_.reuse();
seq_no_.reset();
buf_size_ = 0;
buf_ = nullptr;
allocator_.reuse();
@ -66,6 +69,7 @@ int ObDirectLoadMultipleDatumRow::deep_copy(const ObDirectLoadMultipleDatumRow &
LOG_WARN("fail to deep copy rowkey", KR(ret));
} else {
buf_size_ = src.buf_size_;
seq_no_ = src.seq_no_;
buf_ = buf + pos;
MEMCPY(buf + pos, src.buf_, buf_size_);
pos += buf_size_;
@ -75,7 +79,7 @@ int ObDirectLoadMultipleDatumRow::deep_copy(const ObDirectLoadMultipleDatumRow &
}
int ObDirectLoadMultipleDatumRow::from_datums(const ObTabletID &tablet_id, ObStorageDatum *datums,
int64_t column_count, int64_t rowkey_column_count)
int64_t column_count, int64_t rowkey_column_count, const ObTableLoadSequenceNo &seq_no)
{
OB_TABLE_LOAD_STATISTICS_TIME_COST(DEBUG, transfer_external_row_time_us);
int ret = OB_SUCCESS;
@ -104,6 +108,7 @@ int ObDirectLoadMultipleDatumRow::from_datums(const ObTabletID &tablet_id, ObSto
} else {
buf_ = buf;
buf_size_ = buf_size;
seq_no_ = seq_no;
}
}
}
@ -147,7 +152,7 @@ OB_DEF_SERIALIZE_SIMPLE(ObDirectLoadMultipleDatumRow)
{
OB_TABLE_LOAD_STATISTICS_TIME_COST(DEBUG, external_row_serialize_time_us);
int ret = OB_SUCCESS;
LST_DO_CODE(OB_UNIS_ENCODE, rowkey_, buf_size_);
LST_DO_CODE(OB_UNIS_ENCODE, rowkey_, seq_no_, buf_size_);
if (OB_SUCC(ret) && OB_NOT_NULL(buf_)) {
MEMCPY(buf + pos, buf_, buf_size_);
pos += buf_size_;
@ -160,7 +165,7 @@ OB_DEF_DESERIALIZE_SIMPLE(ObDirectLoadMultipleDatumRow)
OB_TABLE_LOAD_STATISTICS_TIME_COST(DEBUG, external_row_deserialize_time_us);
int ret = OB_SUCCESS;
reuse();
LST_DO_CODE(OB_UNIS_DECODE, rowkey_, buf_size_);
LST_DO_CODE(OB_UNIS_DECODE, rowkey_, seq_no_, buf_size_);
if (OB_SUCC(ret)) {
buf_ = buf + pos;
pos += buf_size_;
@ -172,7 +177,7 @@ OB_DEF_SERIALIZE_SIZE_SIMPLE(ObDirectLoadMultipleDatumRow)
{
OB_TABLE_LOAD_STATISTICS_TIME_COST(DEBUG, external_row_serialize_time_us);
int64_t len = 0;
LST_DO_CODE(OB_UNIS_ADD_LEN, rowkey_, buf_size_);
LST_DO_CODE(OB_UNIS_ADD_LEN, rowkey_, seq_no_, buf_size_);
len += buf_size_;
return len;
}

View File

@ -4,6 +4,7 @@
#pragma once
#include "share/table/ob_table_load_define.h"
#include "storage/direct_load/ob_direct_load_multiple_datum_rowkey.h"
namespace oceanbase
@ -24,14 +25,20 @@ public:
int64_t &pos);
// not deep copy
int from_datums(const common::ObTabletID &tablet_id, blocksstable::ObStorageDatum *datums,
int64_t column_count, int64_t rowkey_column_count);
int64_t column_count, int64_t rowkey_column_count,
const table::ObTableLoadSequenceNo &seq_no);
int to_datums(blocksstable::ObStorageDatum *datums, int64_t column_count) const;
OB_INLINE bool is_valid() const { return rowkey_.is_valid() && buf_size_ > 0 && nullptr != buf_; }
OB_INLINE bool is_valid() const
{
return rowkey_.is_valid() && seq_no_.is_valid() && buf_size_ > 0 && nullptr != buf_;
}
OB_INLINE int64_t get_raw_size() const { return buf_size_; }
TO_STRING_KV(K_(rowkey), K_(buf_size), KP_(buf));
TO_STRING_KV(K_(rowkey), K_(seq_no), K_(buf_size), KP_(buf));
public:
common::ObArenaAllocator allocator_;
ObDirectLoadMultipleDatumRowkey rowkey_;
table::ObTableLoadSequenceNo seq_no_;
int64_t buf_size_;
const char *buf_;
};

View File

@ -14,6 +14,7 @@ namespace storage
{
using namespace common;
using namespace blocksstable;
using namespace table;
ObDirectLoadMultipleExternalRow::ObDirectLoadMultipleExternalRow()
: allocator_("TLD_ME_Row"), buf_size_(0), buf_(nullptr)
@ -23,6 +24,7 @@ ObDirectLoadMultipleExternalRow::ObDirectLoadMultipleExternalRow()
void ObDirectLoadMultipleExternalRow::reset()
{
seq_no_.reset();
buf_size_ = 0;
buf_ = nullptr;
allocator_.reset();
@ -30,6 +32,7 @@ void ObDirectLoadMultipleExternalRow::reset()
void ObDirectLoadMultipleExternalRow::reuse()
{
seq_no_.reset();
buf_size_ = 0;
buf_ = nullptr;
allocator_.reuse();
@ -54,6 +57,7 @@ int ObDirectLoadMultipleExternalRow::deep_copy(const ObDirectLoadMultipleExterna
} else {
reuse();
buf_size_ = src.buf_size_;
seq_no_ = src.seq_no_;
buf_ = buf + pos;
MEMCPY(buf + pos, src.buf_, buf_size_);
pos += buf_size_;
@ -61,7 +65,7 @@ int ObDirectLoadMultipleExternalRow::deep_copy(const ObDirectLoadMultipleExterna
return ret;
}
int ObDirectLoadMultipleExternalRow::from_datums(ObStorageDatum *datums, int64_t column_count)
int ObDirectLoadMultipleExternalRow::from_datums(ObStorageDatum *datums, int64_t column_count, const ObTableLoadSequenceNo &seq_no)
{
OB_TABLE_LOAD_STATISTICS_TIME_COST(DEBUG, transfer_external_row_time_us);
int ret = OB_SUCCESS;
@ -84,6 +88,7 @@ int ObDirectLoadMultipleExternalRow::from_datums(ObStorageDatum *datums, int64_t
LOG_WARN("fail to serialize datum array", KR(ret));
} else {
buf_ = buf;
seq_no_ = seq_no;
buf_size_ = buf_size;
}
}
@ -121,7 +126,7 @@ OB_DEF_SERIALIZE_SIMPLE(ObDirectLoadMultipleExternalRow)
{
OB_TABLE_LOAD_STATISTICS_TIME_COST(DEBUG, external_row_serialize_time_us);
int ret = OB_SUCCESS;
LST_DO_CODE(OB_UNIS_ENCODE, tablet_id_.id(), buf_size_);
LST_DO_CODE(OB_UNIS_ENCODE, tablet_id_.id(), seq_no_, buf_size_);
if (OB_SUCC(ret) && OB_NOT_NULL(buf_)) {
MEMCPY(buf + pos, buf_, buf_size_);
pos += buf_size_;
@ -135,7 +140,7 @@ OB_DEF_DESERIALIZE_SIMPLE(ObDirectLoadMultipleExternalRow)
int ret = OB_SUCCESS;
reset();
uint64_t id = 0;
LST_DO_CODE(OB_UNIS_DECODE, id, buf_size_);
LST_DO_CODE(OB_UNIS_DECODE, id, seq_no_, buf_size_);
if (OB_SUCC(ret)) {
tablet_id_ = id;
buf_ = buf + pos;
@ -148,7 +153,7 @@ OB_DEF_SERIALIZE_SIZE_SIMPLE(ObDirectLoadMultipleExternalRow)
{
OB_TABLE_LOAD_STATISTICS_TIME_COST(DEBUG, external_row_serialize_time_us);
int64_t len = 0;
LST_DO_CODE(OB_UNIS_ADD_LEN, tablet_id_.id(), buf_size_);
LST_DO_CODE(OB_UNIS_ADD_LEN, tablet_id_.id(), seq_no_, buf_size_);
len += buf_size_;
return len;
}

View File

@ -5,6 +5,7 @@
#pragma once
#include "lib/allocator/page_arena.h"
#include "share/table/ob_table_load_define.h"
#include "storage/blocksstable/ob_datum_rowkey.h"
#include "storage/direct_load/ob_direct_load_datum.h"
@ -12,7 +13,6 @@ namespace oceanbase
{
namespace storage
{
class ObDirectLoadMultipleExternalRow
{
OB_UNIS_VERSION(1);
@ -23,16 +23,19 @@ public:
int64_t get_deep_copy_size() const;
int deep_copy(const ObDirectLoadMultipleExternalRow &src, char *buf, const int64_t len,
int64_t &pos);
int from_datums(blocksstable::ObStorageDatum *datums, int64_t column_count);
int from_datums(blocksstable::ObStorageDatum *datums, int64_t column_count,
const table::ObTableLoadSequenceNo &seq_no);
int to_datums(blocksstable::ObStorageDatum *datums, int64_t column_count) const;
OB_INLINE bool is_valid() const
{
return tablet_id_.is_valid() && buf_size_ > 0 && nullptr != buf_;
return tablet_id_.is_valid() && seq_no_.is_valid() && buf_size_ > 0 && nullptr != buf_;
}
TO_STRING_KV(K_(tablet_id), K_(buf_size), KP_(buf));
TO_STRING_KV(K_(tablet_id), K_(seq_no), K_(buf_size), KP_(buf));
public:
common::ObArenaAllocator allocator_;
common::ObTabletID tablet_id_;
table::ObTableLoadSequenceNo seq_no_;
int64_t buf_size_;
const char *buf_;
};

View File

@ -81,6 +81,7 @@ int ObDirectLoadMultipleHeapTableBuilder::init(const ObDirectLoadMultipleHeapTab
}
int ObDirectLoadMultipleHeapTableBuilder::append_row(const ObTabletID &tablet_id,
const table::ObTableLoadSequenceNo &seq_no,
const ObDatumRow &datum_row)
{
int ret = OB_SUCCESS;
@ -96,7 +97,7 @@ int ObDirectLoadMultipleHeapTableBuilder::append_row(const ObTabletID &tablet_id
LOG_WARN("invalid args", KR(ret), K(param_), K(datum_row));
} else {
row_.tablet_id_ = tablet_id;
if (OB_FAIL(row_.from_datums(datum_row.storage_datums_, datum_row.count_))) {
if (OB_FAIL(row_.from_datums(datum_row.storage_datums_, datum_row.count_, seq_no))) {
LOG_WARN("fail to from datum row", KR(ret));
} else if (OB_FAIL(append_row(row_))) {
LOG_WARN("fail to append row", KR(ret), K(row_));

View File

@ -46,6 +46,7 @@ public:
virtual ~ObDirectLoadMultipleHeapTableBuilder();
int init(const ObDirectLoadMultipleHeapTableBuildParam &param);
int append_row(const common::ObTabletID &tablet_id,
const table::ObTableLoadSequenceNo &seq_no,
const blocksstable::ObDatumRow &datum_row) override;
int append_row(const RowType &row);
int close() override;

View File

@ -102,7 +102,7 @@ int ObDirectLoadMultipleHeapTableSorter::close_chunk(ObDirectLoadMultipleHeapTab
for (int64_t j = 0; OB_SUCC(ret) && j < bag.count(); j ++) {
if (OB_FAIL(bag.at(j)->to_datums(datum_row.storage_datums_, datum_row.count_))) {
LOG_WARN("fail to transfer dataum row", KR(ret));
} else if (OB_FAIL(table_builder.append_row(keys.at(i), datum_row))) {
} else if (OB_FAIL(table_builder.append_row(keys.at(i), bag.at(j)->seq_no_, datum_row))) {
LOG_WARN("fail to append row", KR(ret));
}
}

View File

@ -142,6 +142,7 @@ int ObDirectLoadMultipleSSTableBuilder::init(const ObDirectLoadMultipleSSTableBu
}
int ObDirectLoadMultipleSSTableBuilder::append_row(const ObTabletID &tablet_id,
const table::ObTableLoadSequenceNo &seq_no,
const ObDatumRow &datum_row)
{
int ret = OB_SUCCESS;
@ -157,7 +158,7 @@ int ObDirectLoadMultipleSSTableBuilder::append_row(const ObTabletID &tablet_id,
LOG_WARN("invalid args", KR(ret), K(param_), K(datum_row));
} else {
if (OB_FAIL(row_.from_datums(tablet_id, datum_row.storage_datums_, datum_row.count_,
param_.table_data_desc_.rowkey_column_num_))) {
param_.table_data_desc_.rowkey_column_num_, seq_no))) {
LOG_WARN("fail to from datum row", KR(ret));
} else if (OB_FAIL(append_row(row_))) {
LOG_WARN("fail to append row", KR(ret), K(row_));

View File

@ -41,6 +41,7 @@ public:
virtual ~ObDirectLoadMultipleSSTableBuilder();
int init(const ObDirectLoadMultipleSSTableBuildParam &param);
int append_row(const common::ObTabletID &tablet_id,
const table::ObTableLoadSequenceNo &seq_no,
const blocksstable::ObDatumRow &datum_row) override;
int append_row(const RowType &row);
int close() override;

View File

@ -77,7 +77,7 @@ int ObDirectLoadSSTableBuilder::init(const ObDirectLoadSSTableBuildParam &param)
return ret;
}
int ObDirectLoadSSTableBuilder::append_row(const ObTabletID &tablet_id, const ObDatumRow &datum_row)
int ObDirectLoadSSTableBuilder::append_row(const ObTabletID &tablet_id, const table::ObTableLoadSequenceNo &seq_no, const ObDatumRow &datum_row)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
@ -97,7 +97,7 @@ int ObDirectLoadSSTableBuilder::append_row(const ObTabletID &tablet_id, const Ob
if (OB_FAIL(check_rowkey_order(key))) {
LOG_WARN("fail to check rowkey order", KR(ret), K(datum_row));
} else if (OB_FAIL(external_row.from_datums(datum_row.storage_datums_, datum_row.count_,
param_.table_data_desc_.rowkey_column_num_))) {
param_.table_data_desc_.rowkey_column_num_, seq_no))) {
LOG_WARN("fail to from datum row", KR(ret));
} else if (OB_FAIL(data_block_writer_.append_row(external_row))) {
LOG_WARN("fail to append row to data block writer", KR(ret), K(external_row));

View File

@ -161,6 +161,7 @@ public:
virtual ~ObDirectLoadSSTableBuilder() = default;
int init(const ObDirectLoadSSTableBuildParam &param);
int append_row(const common::ObTabletID &tablet_id,
const table::ObTableLoadSequenceNo &seq_no,
const blocksstable::ObDatumRow &datum_row) override;
int append_row(const ObDirectLoadExternalRow &external_row);
int close() override;

View File

@ -148,6 +148,7 @@ int ObDirectLoadTableStoreBucket::init(const ObDirectLoadTableStoreParam &param,
}
int ObDirectLoadTableStoreBucket::append_row(const ObTabletID &tablet_id,
const ObTableLoadSequenceNo &seq_no,
const ObDatumRow &datum_row)
{
OB_TABLE_LOAD_STATISTICS_TIME_COST(DEBUG, table_store_bucket_append_row);
@ -160,7 +161,7 @@ int ObDirectLoadTableStoreBucket::append_row(const ObTabletID &tablet_id,
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(tablet_id), K(datum_row), KPC(param_));
} else {
if (OB_FAIL(table_builder_->append_row(tablet_id, datum_row))) {
if (OB_FAIL(table_builder_->append_row(tablet_id, seq_no, datum_row))) {
LOG_WARN("fail to append row", KR(ret));
}
}
@ -291,7 +292,7 @@ int ObDirectLoadTableStore::get_bucket(const ObTabletID &tablet_id,
return ret;
}
int ObDirectLoadTableStore::append_row(const ObTabletID &tablet_id, const ObDatumRow &datum_row)
int ObDirectLoadTableStore::append_row(const ObTabletID &tablet_id, const ObTableLoadSequenceNo &seq_no, const ObDatumRow &datum_row)
{
OB_TABLE_LOAD_STATISTICS_TIME_COST(DEBUG, table_store_append_row);
OB_TABLE_LOAD_STATISTICS_COUNTER(table_store_row_count);
@ -306,7 +307,7 @@ int ObDirectLoadTableStore::append_row(const ObTabletID &tablet_id, const ObDatu
ObDirectLoadTableStoreBucket *bucket = nullptr;
if (OB_FAIL(get_bucket(tablet_id, bucket))) {
LOG_WARN("fail to get bucket", KR(ret), K(tablet_id));
} else if (OB_FAIL(bucket->append_row(tablet_id, datum_row))) {
} else if (OB_FAIL(bucket->append_row(tablet_id, seq_no, datum_row))) {
LOG_WARN("fail to append row to bucket", KR(ret), K(tablet_id), K(datum_row));
}
}

View File

@ -54,7 +54,7 @@ public:
ObDirectLoadTableStoreBucket();
~ObDirectLoadTableStoreBucket();
int init(const ObDirectLoadTableStoreParam &param, const common::ObTabletID &tablet_id);
int append_row(const common::ObTabletID &tablet_id, const blocksstable::ObDatumRow &datum_row);
int append_row(const common::ObTabletID &tablet_id, const table::ObTableLoadSequenceNo &seq_no, const blocksstable::ObDatumRow &datum_row);
int close();
int get_tables(common::ObIArray<ObIDirectLoadPartitionTable *> &table_array,
common::ObIAllocator &allocator);
@ -74,7 +74,7 @@ public:
ObDirectLoadTableStore() : allocator_("TLD_TSBucket"), is_inited_(false) {}
~ObDirectLoadTableStore();
int init(const ObDirectLoadTableStoreParam &param);
int append_row(const common::ObTabletID &tablet_id, const blocksstable::ObDatumRow &datum_row);
int append_row(const common::ObTabletID &tablet_id, const table::ObTableLoadSequenceNo &seq_no, const blocksstable::ObDatumRow &datum_row);
int close();
void clean_up();
int get_tables(common::ObIArray<ObIDirectLoadPartitionTable *> &table_array,

View File

@ -12,6 +12,7 @@
#include "../unittest/storage/blocksstable/ob_row_generate.h"
#include "observer/table_load/ob_table_load_partition_location.h"
#include "share/ob_simple_mem_limit_getter.h"
#include "share/table/ob_table_load_define.h"
#include "storage/blocksstable/ob_tmp_file.h"
#include "storage/direct_load/ob_direct_load_sstable_scanner.h"
#include "storage/direct_load/ob_direct_load_sstable_compactor.h"
@ -24,6 +25,7 @@ using namespace blocksstable;
using namespace storage;
using namespace share::schema;
using namespace share;
using namespace table;
static ObSimpleMemLimitGetter getter;
@ -264,6 +266,7 @@ TEST_F(TestDataBlockWriter, test_write_and_scan)
ret = file_mgr->init(table_schema_.get_tenant_id());
ObArray<ObColDesc> col_descs;
ObStorageDatumUtils datum_utils;
ObTableLoadSequenceNo seq_no(0);
ASSERT_EQ(OB_SUCCESS, table_schema_.get_column_ids(col_descs));
ret = datum_utils.init(col_descs, rowkey_column_count, lib::is_oracle_mode(), allocator_);
param.tablet_id_ = table_schema_.get_tablet_id();
@ -278,7 +281,7 @@ TEST_F(TestDataBlockWriter, test_write_and_scan)
ASSERT_EQ(OB_SUCCESS, row->init(allocator_, column_num));
ASSERT_EQ(OB_SUCCESS, row_generate_.get_next_row(*row));
array.push_back(row);
ret = sstable_builder.append_row(table_schema_.get_tablet_id(), *row);
ret = sstable_builder.append_row(table_schema_.get_tablet_id(), seq_no, *row);
ASSERT_EQ(OB_SUCCESS, ret);
}
ret = sstable_builder.close();
@ -404,6 +407,7 @@ TEST_F(TestDataBlockWriter, test_write_and_scan_range)
ObDirectLoadSSTableBuildParam param;
ObArray<ObColDesc> col_descs;
ObStorageDatumUtils datum_utils;
ObTableLoadSequenceNo seq_no(0);
ASSERT_EQ(OB_SUCCESS, table_schema_.get_column_ids(col_descs));
ret = datum_utils.init(col_descs, rowkey_column_count, lib::is_oracle_mode(), allocator_);
param.tablet_id_ = table_schema_.get_tablet_id();
@ -419,7 +423,7 @@ TEST_F(TestDataBlockWriter, test_write_and_scan_range)
ASSERT_EQ(OB_SUCCESS, row_generate_.get_next_row(*row));
array.push_back(row);
if (i < 5000) {
ret = sstable_builder.append_row(table_schema_.get_tablet_id(), *row);
ret = sstable_builder.append_row(table_schema_.get_tablet_id(), seq_no, *row);
ASSERT_EQ(OB_SUCCESS, ret);
}
}
@ -516,6 +520,7 @@ TEST_F(TestDataBlockWriter, test_scan_less_range)
ObDirectLoadSSTableBuildParam param;
ObArray<ObColDesc> col_descs;
ObStorageDatumUtils datum_utils;
ObTableLoadSequenceNo seq_no(0);
ASSERT_EQ(OB_SUCCESS, table_schema_.get_column_ids(col_descs));
ret = datum_utils.init(col_descs, rowkey_column_count, lib::is_oracle_mode(), allocator_);
param.tablet_id_ = table_schema_.get_tablet_id();
@ -531,7 +536,7 @@ TEST_F(TestDataBlockWriter, test_scan_less_range)
ASSERT_EQ(OB_SUCCESS, row_generate_.get_next_row(*row));
array.push_back(row);
if (i >= 5000) {
ret = sstable_builder.append_row(table_schema_.get_tablet_id(), *row);
ret = sstable_builder.append_row(table_schema_.get_tablet_id(), seq_no, *row);
ASSERT_EQ(OB_SUCCESS, ret);
}
}
@ -632,6 +637,7 @@ TEST_F(TestDataBlockWriter, test_scan_range)
ObDirectLoadSSTableBuildParam param;
ObArray<ObColDesc> col_descs;
ObStorageDatumUtils datum_utils;
ObTableLoadSequenceNo seq_no(0);
ASSERT_EQ(OB_SUCCESS, table_schema_.get_column_ids(col_descs));
ret = datum_utils.init(col_descs, rowkey_column_count, lib::is_oracle_mode(), allocator_);
param.tablet_id_ = table_schema_.get_tablet_id();
@ -647,7 +653,7 @@ TEST_F(TestDataBlockWriter, test_scan_range)
ASSERT_EQ(OB_SUCCESS, row_generate_.get_next_row(*row));
array.push_back(row);
if (i < 5000) {
ret = sstable_builder.append_row(table_schema_.get_tablet_id(), *row);
ret = sstable_builder.append_row(table_schema_.get_tablet_id(), seq_no, *row);
ASSERT_EQ(OB_SUCCESS, ret);
}
}
@ -768,6 +774,7 @@ TEST_F(TestDataBlockWriter, test_write_and_scan_large_low)
ObDirectLoadSSTableBuildParam param;
ObArray<ObColDesc> col_descs;
ObStorageDatumUtils datum_utils;
ObTableLoadSequenceNo seq_no(0);
ASSERT_EQ(OB_SUCCESS, table_schema_.get_column_ids(col_descs));
ret = datum_utils.init(col_descs, rowkey_column_count, lib::is_oracle_mode(), allocator_);
param.tablet_id_ = table_schema_.get_tablet_id();
@ -789,7 +796,7 @@ TEST_F(TestDataBlockWriter, test_write_and_scan_large_low)
row->storage_datums_[24].set_string(ObString(value1_size, ptr1));
}
array.push_back(row);
ret = sstable_builder.append_row(table_schema_.get_tablet_id(), *row);
ret = sstable_builder.append_row(table_schema_.get_tablet_id(), seq_no, *row);
ASSERT_EQ(OB_SUCCESS, ret);
}
ret = sstable_builder.close();
@ -912,6 +919,7 @@ TEST_F(TestDataBlockWriter, test_write_and_scan_range_large_low)
ObDirectLoadSSTableBuildParam param;
ObArray<ObColDesc> col_descs;
ObStorageDatumUtils datum_utils;
ObTableLoadSequenceNo seq_no(0);
ASSERT_EQ(OB_SUCCESS, table_schema_.get_column_ids(col_descs));
ret = datum_utils.init(col_descs, rowkey_column_count, lib::is_oracle_mode(), allocator_);
param.tablet_id_ = table_schema_.get_tablet_id();
@ -934,7 +942,7 @@ TEST_F(TestDataBlockWriter, test_write_and_scan_range_large_low)
}
array.push_back(row);
if (i < 5000) {
ret = sstable_builder.append_row(table_schema_.get_tablet_id(), *row);
ret = sstable_builder.append_row(table_schema_.get_tablet_id(), seq_no, *row);
ASSERT_EQ(OB_SUCCESS, ret);
}
}
@ -1010,6 +1018,7 @@ TEST_F(TestDataBlockWriter, test_scan_range_large_low)
ObDirectLoadSSTableBuildParam param;
ObArray<ObColDesc> col_descs;
ObStorageDatumUtils datum_utils;
ObTableLoadSequenceNo seq_no(0);
ASSERT_EQ(OB_SUCCESS, table_schema_.get_column_ids(col_descs));
ret = datum_utils.init(col_descs, rowkey_column_count, lib::is_oracle_mode(), allocator_);
param.tablet_id_ = table_schema_.get_tablet_id();
@ -1032,7 +1041,7 @@ TEST_F(TestDataBlockWriter, test_scan_range_large_low)
}
array.push_back(row);
if (i < 5000) {
ret = sstable_builder.append_row(table_schema_.get_tablet_id(), *row);
ret = sstable_builder.append_row(table_schema_.get_tablet_id(), seq_no, *row);
ASSERT_EQ(OB_SUCCESS, ret);
}
}
@ -1112,6 +1121,7 @@ TEST_F(TestDataBlockWriter, test_write_and_compact)
ObDirectLoadSSTableBuildParam param;
ObArray<ObColDesc> col_descs;
ObStorageDatumUtils datum_utils;
ObTableLoadSequenceNo seq_no(0);
ASSERT_EQ(OB_SUCCESS, table_schema_.get_column_ids(col_descs));
ret = datum_utils.init(col_descs, rowkey_column_count, lib::is_oracle_mode(), allocator_);
param.tablet_id_ = table_schema_.get_tablet_id();
@ -1130,7 +1140,7 @@ TEST_F(TestDataBlockWriter, test_write_and_compact)
ASSERT_EQ(OB_SUCCESS, row->init(allocator_, column_num));
ASSERT_EQ(OB_SUCCESS, row_generate_.get_next_row(*row));
array1.push_back(row);
ret = sstable_builder1.append_row(table_schema_.get_tablet_id(), *row);
ret = sstable_builder1.append_row(table_schema_.get_tablet_id(), seq_no, *row);
ASSERT_EQ(OB_SUCCESS, ret);
}
for (int64_t i = 0; i < test_row_num; ++i) {
@ -1138,7 +1148,7 @@ TEST_F(TestDataBlockWriter, test_write_and_compact)
ASSERT_EQ(OB_SUCCESS, row->init(allocator_, column_num));
ASSERT_EQ(OB_SUCCESS, row_generate_.get_next_row(*row));
array2.push_back(row);
ret = sstable_builder2.append_row(table_schema_.get_tablet_id(), *row);
ret = sstable_builder2.append_row(table_schema_.get_tablet_id(), seq_no, *row);
ASSERT_EQ(OB_SUCCESS, ret);
}
@ -1310,6 +1320,7 @@ TEST_F(TestDataBlockWriter, test_write_and_compact_large_row)
ObDirectLoadSSTableBuildParam param;
ObArray<ObColDesc> col_descs;
ObStorageDatumUtils datum_utils;
ObTableLoadSequenceNo seq_no(0);
ASSERT_EQ(OB_SUCCESS, table_schema_.get_column_ids(col_descs));
ret = datum_utils.init(col_descs, rowkey_column_count, lib::is_oracle_mode(), allocator_);
param.tablet_id_ = table_schema_.get_tablet_id();
@ -1335,7 +1346,7 @@ TEST_F(TestDataBlockWriter, test_write_and_compact_large_row)
row->storage_datums_[24].set_string(ObString(value1_size, ptr1));
}
array1.push_back(row);
ret = sstable_builder1.append_row(table_schema_.get_tablet_id(), *row);
ret = sstable_builder1.append_row(table_schema_.get_tablet_id(), seq_no, *row);
ASSERT_EQ(OB_SUCCESS, ret);
}
for (int64_t i = 0; i < test_row_num; ++i) {
@ -1350,7 +1361,7 @@ TEST_F(TestDataBlockWriter, test_write_and_compact_large_row)
row->storage_datums_[24].set_string(ObString(value1_size, ptr1));
}
array2.push_back(row);
ret = sstable_builder2.append_row(table_schema_.get_tablet_id(), *row);
ret = sstable_builder2.append_row(table_schema_.get_tablet_id(), seq_no, *row);
ASSERT_EQ(OB_SUCCESS, ret);
}