[CP] Fix direct load px write not master
This commit is contained in:
@ -961,6 +961,29 @@ int ObTableLoadStore::px_finish_trans(const ObTableLoadTransId &trans_id)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadStore::px_check_for_write(const ObTabletID &tablet_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadStore not init", KR(ret), KP(this));
|
||||
} else {
|
||||
bool is_exist = false;
|
||||
for (int64_t i = 0; i < store_ctx_->ls_partition_ids_.count(); ++i) {
|
||||
const ObTableLoadLSIdAndPartitionId &ls_part_id = store_ctx_->ls_partition_ids_.at(i);
|
||||
if (ls_part_id.part_tablet_id_.tablet_id_ == tablet_id) {
|
||||
is_exist = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (OB_UNLIKELY(!is_exist)) {
|
||||
ret = OB_NOT_MASTER;
|
||||
LOG_WARN("not partition master", KR(ret), K(tablet_id), K(store_ctx_->ls_partition_ids_));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadStore::px_write(const ObTableLoadTransId &trans_id,
|
||||
const ObTabletID &tablet_id, const ObIArray<ObNewRow> &row_array)
|
||||
{
|
||||
|
||||
@ -85,6 +85,7 @@ private:
|
||||
public:
|
||||
int px_start_trans(const table::ObTableLoadTransId &trans_id);
|
||||
int px_finish_trans(const table::ObTableLoadTransId &trans_id);
|
||||
int px_check_for_write(const ObTabletID &tablet_id);
|
||||
int px_write(const table::ObTableLoadTransId &trans_id,
|
||||
const ObTabletID &tablet_id,
|
||||
const common::ObIArray<common::ObNewRow> &row_array);
|
||||
|
||||
@ -885,7 +885,11 @@ int ObDirectLoadMultipleMergeRangeSplitter::get_rowkeys_by_multiple(
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret) && nullptr != last_rowkey_) {
|
||||
if (last_rowkey_->tablet_id_.compare(tablet_id) == 0) {
|
||||
const int cmp_ret = last_rowkey_->tablet_id_.compare(tablet_id);
|
||||
if (OB_UNLIKELY(cmp_ret < 0)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected tablet id", KR(ret), KPC(last_rowkey_), K(tablet_id));
|
||||
} else if (cmp_ret == 0) {
|
||||
// the same tablet
|
||||
ObDatumRowkey rowkey;
|
||||
ObDatumRowkey copied_rowkey;
|
||||
@ -916,9 +920,8 @@ int ObDirectLoadMultipleMergeRangeSplitter::get_rowkeys_by_multiple(
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// the different tablet
|
||||
abort_unless(last_rowkey_->tablet_id_.compare(tablet_id) > 0);
|
||||
} else { /* cmp_ret > 0 */
|
||||
// no rowkey for current tablet_id, do nothing
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
|
||||
@ -2697,6 +2697,8 @@ int ObLSTabletService::direct_insert_rows(
|
||||
ObTableLoadStore store(table_ctx);
|
||||
if (OB_FAIL(store.init())) {
|
||||
LOG_WARN("fail to init store", KR(ret));
|
||||
} else if (OB_FAIL(store.px_check_for_write(tablet_id))) {
|
||||
LOG_WARN("fail to check for write", KR(ret), K(tablet_id));
|
||||
}
|
||||
|
||||
while (OB_SUCC(ret) && OB_SUCC(get_next_rows(row_iter, rows, row_count))) {
|
||||
|
||||
Reference in New Issue
Block a user