fix duplicated logic id
This commit is contained in:
@ -205,14 +205,6 @@ int ObPxMultiPartSSTableInsertOp::inner_get_next_row()
|
|||||||
write_sstable_param.task_cnt_ = ctx_.get_sqc_handler()->get_sqc_ctx().get_task_count();
|
write_sstable_param.task_cnt_ = ctx_.get_sqc_handler()->get_sqc_ctx().get_task_count();
|
||||||
write_sstable_param.schema_version_ = MY_SPEC.plan_->get_ddl_schema_version();
|
write_sstable_param.schema_version_ = MY_SPEC.plan_->get_ddl_schema_version();
|
||||||
write_sstable_param.execution_id_ = MY_SPEC.plan_->get_ddl_execution_id();
|
write_sstable_param.execution_id_ = MY_SPEC.plan_->get_ddl_execution_id();
|
||||||
const ObTabletCacheInterval *curr_tablet_seq_cache =
|
|
||||||
count_rows_finish_ && curr_tablet_idx_ < tablet_seq_caches_.count() ? &tablet_seq_caches_.at(curr_tablet_idx_) : nullptr;
|
|
||||||
int64_t parallel_idx = curr_tablet_seq_cache ? curr_tablet_seq_cache->task_id_ : ctx_.get_px_task_id();
|
|
||||||
if (OB_FAIL(block_start_seq.set_parallel_degree(parallel_idx))) {
|
|
||||||
LOG_WARN("set parallel index failed", K(ret), K(parallel_idx));
|
|
||||||
}
|
|
||||||
FLOG_INFO("update ddl parallel id", K(ret), K(parallel_idx), K(block_start_seq), K(ctx_.get_px_task_id()),
|
|
||||||
K(count_rows_finish_), K(curr_tablet_idx_), K(tablet_seq_caches_.count()), KPC(curr_tablet_seq_cache));
|
|
||||||
}
|
}
|
||||||
while (OB_SUCC(ret) && notify_idx < tablet_ids.count()) {
|
while (OB_SUCC(ret) && notify_idx < tablet_ids.count()) {
|
||||||
ObTabletID ¬ify_tablet_id = tablet_ids.at(notify_idx);
|
ObTabletID ¬ify_tablet_id = tablet_ids.at(notify_idx);
|
||||||
@ -227,7 +219,14 @@ int ObPxMultiPartSSTableInsertOp::inner_get_next_row()
|
|||||||
write_sstable_param.tablet_id_ = row_tablet_id;
|
write_sstable_param.tablet_id_ = row_tablet_id;
|
||||||
int64_t affected_rows = 0;
|
int64_t affected_rows = 0;
|
||||||
ObSSTableInsertRowIterator row_iter(ctx_, this);
|
ObSSTableInsertRowIterator row_iter(ctx_, this);
|
||||||
if (OB_FAIL(ObSSTableInsertManager::get_instance().add_sstable_slice(
|
const ObTabletCacheInterval *curr_tablet_seq_cache =
|
||||||
|
count_rows_finish_ && curr_tablet_idx_ < tablet_seq_caches_.count() ? &tablet_seq_caches_.at(curr_tablet_idx_) : nullptr;
|
||||||
|
int64_t parallel_idx = curr_tablet_seq_cache ? curr_tablet_seq_cache->task_id_ : ctx_.get_px_task_id();
|
||||||
|
FLOG_INFO("update ddl parallel id", K(ret), K(parallel_idx), K(ctx_.get_px_task_id()),
|
||||||
|
K(count_rows_finish_), K(curr_tablet_idx_), K(tablet_seq_caches_.count()), KPC(curr_tablet_seq_cache));
|
||||||
|
if (OB_FAIL(block_start_seq.set_parallel_degree(parallel_idx))) {
|
||||||
|
LOG_WARN("set parallel index failed", K(ret), K(parallel_idx));
|
||||||
|
} else if (OB_FAIL(ObSSTableInsertManager::get_instance().add_sstable_slice(
|
||||||
write_sstable_param, block_start_seq, row_iter, affected_rows))) {
|
write_sstable_param, block_start_seq, row_iter, affected_rows))) {
|
||||||
if (OB_ITER_END != ret) {
|
if (OB_ITER_END != ret) {
|
||||||
LOG_WARN("failed to write sstable rows to storage layer", K(ret),
|
LOG_WARN("failed to write sstable rows to storage layer", K(ret),
|
||||||
|
Reference in New Issue
Block a user