direct load replace sematics bug
This commit is contained in:
@ -531,8 +531,9 @@ int ObTableDirectLoadInsertExecutor::set_batch_seq_no(int64_t batch_id,
|
||||
} else {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < obj_row_array.count(); ++i) {
|
||||
ObTableLoadObjRow &row = obj_row_array.at(i);
|
||||
row.seq_no_.batch_id_ = batch_id;
|
||||
row.seq_no_.batch_seq_no_ = i;
|
||||
row.seq_no_.sequence_no_ = batch_id;
|
||||
row.seq_no_.sequence_no_ <<= ObTableLoadSequenceNo::BATCH_ID_SHIFT;
|
||||
row.seq_no_.sequence_no_ |= i;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
|
||||
@ -438,32 +438,20 @@ public:
|
||||
|
||||
struct ObTableLoadSequenceNo
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
OB_UNIS_VERSION(1);
|
||||
public:
|
||||
static const uint64_t MAX_DATA_ID = (1LL << 16) - 1;
|
||||
static const uint64_t MAX_CHUNK_ID = (1LL << 32) - 1;
|
||||
static const uint64_t MAX_BATCH_ID = (1LL << 48) - 1;
|
||||
static const uint64_t MAX_DATA_SEQ_NO = (1LL << 48) - 1;
|
||||
static const uint64_t MAX_CHUNK_SEQ_NO = (1LL << 32) - 1;
|
||||
static const uint64_t MAX_BATCH_SEQ_NO = (1LL << 16) - 1;
|
||||
union {
|
||||
// multi file
|
||||
struct {
|
||||
uint64_t data_id_ : 16;
|
||||
uint64_t data_seq_no_ : 48;
|
||||
};
|
||||
// single file
|
||||
struct {
|
||||
uint64_t chunk_id_ : 32;
|
||||
uint64_t chunk_seq_no_ : 32;
|
||||
};
|
||||
// client
|
||||
struct {
|
||||
uint64_t batch_id_ : 48;
|
||||
uint64_t batch_seq_no_ : 16;
|
||||
};
|
||||
uint64_t sequence_no_;
|
||||
};
|
||||
static const int32_t DATA_ID_SHIFT = 48; // multi file sequence_no_ = [data_id << 48 | data_seq_no]
|
||||
static const int32_t CHUNK_ID_SHIFT = 32; // single file sequence_no_ = [chunk_id << 32 | chunk_seq_no]
|
||||
static const int32_t BATCH_ID_SHIFT = 16; // java client sequence_no_ = [batch_id << 16 | batch_seq_no]
|
||||
|
||||
static const uint64_t MAX_DATA_ID = (1LL << (64 - DATA_ID_SHIFT)) - 1;
|
||||
static const uint64_t MAX_CHUNK_ID = (1LL << (64 - CHUNK_ID_SHIFT)) - 1;
|
||||
static const uint64_t MAX_BATCH_ID = (1LL << (64 - BATCH_ID_SHIFT)) - 1;
|
||||
static const uint64_t MAX_DATA_SEQ_NO = (1LL << DATA_ID_SHIFT) - 1;
|
||||
static const uint64_t MAX_CHUNK_SEQ_NO = (1LL << CHUNK_ID_SHIFT) - 1;
|
||||
static const uint64_t MAX_BATCH_SEQ_NO = (1LL << BATCH_ID_SHIFT) - 1;
|
||||
uint64_t sequence_no_;
|
||||
|
||||
ObTableLoadSequenceNo() : sequence_no_(OB_INVALID_ID) {}
|
||||
ObTableLoadSequenceNo(uint64_t sequence_no) { sequence_no_ = sequence_no; }
|
||||
void reset() { sequence_no_ = OB_INVALID_ID; }
|
||||
@ -494,7 +482,7 @@ public:
|
||||
sequence_no_--;
|
||||
return tmp;
|
||||
}
|
||||
TO_STRING_KV(K_(sequence_no), K_(data_id), K_(data_seq_no), K_(chunk_id), K_(chunk_seq_no), K_(batch_id), K_(batch_seq_no));
|
||||
TO_STRING_KV(K_(sequence_no));
|
||||
};
|
||||
|
||||
} // namespace table
|
||||
|
||||
@ -49,7 +49,13 @@ public:
|
||||
{
|
||||
return seq_no_;
|
||||
}
|
||||
TO_STRING_KV(K_(count));
|
||||
|
||||
int64_t to_string(char* buf, const int64_t buf_len) const {
|
||||
int64_t pos = 0;
|
||||
databuff_printf(buf, buf_len, pos, "seq=%lu ", seq_no_.sequence_no_);
|
||||
databuff_print_obj_array(buf, buf_len, pos, cells_, count_);
|
||||
return pos;
|
||||
}
|
||||
|
||||
private:
|
||||
static int allocate_cells(T *&cells, int64_t count,
|
||||
|
||||
@ -1524,8 +1524,8 @@ int ObLoadDataDirectImpl::LargeFileLoadExecutor::get_next_task_handle(TaskHandle
|
||||
handle->data_desc_ = data_desc_;
|
||||
handle->start_line_no_ = total_line_no_ ;
|
||||
handle->result_.created_ts_ = ObTimeUtil::current_time();
|
||||
handle->sequence_no_.chunk_id_ = chunk_id;
|
||||
handle->sequence_no_.chunk_seq_no_ = 0;
|
||||
handle->sequence_no_.sequence_no_ = chunk_id;
|
||||
handle->sequence_no_.sequence_no_ <<= ObTableLoadSequenceNo::CHUNK_ID_SHIFT;
|
||||
handle->data_buffer_.swap(expr_buffer_);
|
||||
handle->data_buffer_.is_end_file_ = data_reader_.is_end_file();
|
||||
total_line_no_ += current_line_count;
|
||||
@ -1792,8 +1792,8 @@ int ObLoadDataDirectImpl::MultiFilesLoadExecutor::get_next_task_handle(TaskHandl
|
||||
handle->data_desc_ = data_desc;
|
||||
handle->start_line_no_ = 0;
|
||||
handle->result_.created_ts_ = ObTimeUtil::current_time();
|
||||
handle->sequence_no_.data_id_ = data_id;
|
||||
handle->sequence_no_.data_seq_no_ = 0;
|
||||
handle->sequence_no_.sequence_no_ = data_id;
|
||||
handle->sequence_no_.sequence_no_ <<= ObTableLoadSequenceNo::DATA_ID_SHIFT;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user