fix bug, set ddl major sstable meta block start seq as slice_cnt * step size

This commit is contained in:
AnimationFan 2024-12-19 10:48:35 +00:00 committed by ob-robot
parent efc750aa8b
commit c4ea691231
10 changed files with 111 additions and 12 deletions

View File

@ -2528,6 +2528,56 @@ int64_t ObDDLUtil::get_default_ddl_tx_timeout()
}
/*
* return the map between tablet id & slice cnt;
* note that pair <0, 0> may exist when result is not partition table
*/
int ObDDLUtil::get_task_tablet_slice_count(const int64_t tenant_id, const int64_t ddl_task_id, bool &is_partitioned_table, common::hash::ObHashMap<int64_t, int64_t> &tablet_slice_cnt_map)
{
int ret = OB_SUCCESS;
bool use_idem_mode = false;
rootserver::ObDDLSliceInfo ddl_slice_info;
ObMySQLProxy *sql_proxy = GCTX.sql_proxy_;
ObArenaAllocator arena(ObMemAttr(tenant_id, "get_slice_info"));
bool is_use_idem_mode = false;
is_partitioned_table = true;
if (OB_ISNULL(sql_proxy)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sql proxy is null", K(ret));
} else if (OB_FAIL(rootserver::ObDDLTaskRecordOperator::get_schedule_info_for_update(
*sql_proxy, tenant_id, ddl_task_id, arena, ddl_slice_info, use_idem_mode))) {
LOG_WARN("fail to get schedule info", K(ret), K(tenant_id), K(ddl_task_id));
} else {
for (int64_t i = 0; i < ddl_slice_info.part_ranges_.count() && OB_SUCC(ret); i++) {
int64_t tablet_slice_cnt = 0;
const ObPxTabletRange &cur_part_range = ddl_slice_info.part_ranges_.at(i);
const int64_t cur_tablet_id = cur_part_range.tablet_id_;
if (0 == cur_tablet_id && 1 == ddl_slice_info.part_ranges_.count()) {
is_partitioned_table = false;
}
if (OB_FAIL(tablet_slice_cnt_map.get_refactored(cur_tablet_id, tablet_slice_cnt))) {
if (OB_HASH_NOT_EXIST == ret) {
ret = OB_SUCCESS;
if (OB_FAIL(tablet_slice_cnt_map.set_refactored(cur_tablet_id, 0))) {
LOG_WARN("failed to set refactor", K(ret));
}
} else {
LOG_WARN("failed to get slice cnt", K(ret));
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(tablet_slice_cnt_map.set_refactored(cur_tablet_id, tablet_slice_cnt + cur_part_range.range_cut_.count(), 1 /* over write*/))) {
LOG_WARN("failed to set slice cnt", K(ret), K(tablet_slice_cnt), K( cur_part_range.range_cut_.count()));
}
}
}
return ret;
}
int ObDDLUtil::get_data_information(
const uint64_t tenant_id,
const uint64_t task_id,

View File

@ -762,6 +762,8 @@ public:
static int64_t get_default_ddl_rpc_timeout();
static int64_t get_default_ddl_tx_timeout();
static int get_task_tablet_slice_count(const int64_t tenant_id, const int64_t task_id, bool &is_partition_table, common::hash::ObHashMap<int64_t, int64_t> &tablet_slice_count_map);
static int get_data_information(
const uint64_t tenant_id,
const uint64_t task_id,

View File

@ -166,6 +166,7 @@ int ObPxMultiPartSSTableInsertOp::inner_get_next_row()
ObSqlCtx *sql_ctx = NULL;
int64_t notify_idx = 0;
int64_t unused_row_scan_cnt = 0;
int64_t ddl_task_id = MY_SPEC.plan_->get_ddl_task_id();
ObInsertMonitor insert_monitor(unused_row_scan_cnt, op_monitor_info_.otherstat_2_value_, op_monitor_info_.otherstat_1_value_);
#ifdef ERRSIM
if (OB_SUCC(ret)) {
@ -228,6 +229,19 @@ int ObPxMultiPartSSTableInsertOp::inner_get_next_row()
} else if (need_idempotent_autoinc_val && OB_FAIL(build_table_slice_info())) {
LOG_WARN("fail to build table slice info", K(ret));
}
/* get tablet slice info */
bool is_partition_table = true;
common::hash::ObHashMap<int64_t, int64_t> tablet_slice_cnt_map;
if (OB_FAIL(ret)) {
} else if (!is_shared_storage_dempotent_mode(ddl_ctrl.direct_load_type_)) {
} else if (OB_FAIL(tablet_slice_cnt_map.create(participants_.count(), "SliceInfoM",
ObModIds::OB_HASH_NODE, tenant_id))) {
LOG_WARN("fail to create hash table", K(ret));
} else if (OB_FAIL(ObDDLUtil::get_task_tablet_slice_count(MTL_ID(), ddl_task_id, is_partition_table, tablet_slice_cnt_map))) {
LOG_WARN("failed to get tablet slice count", K(ret), K(ddl_task_id), K(MTL_ID()));
}
for (notify_idx = 0; OB_SUCC(ret) && notify_idx < participants_.count();) {
clear_evaluated_flag();
bool is_current_slice_empty = false;
@ -247,7 +261,19 @@ int ObPxMultiPartSSTableInsertOp::inner_get_next_row()
count_rows_finish_ && curr_tablet_idx_ < tablet_seq_caches_.count() && curr_tablet_idx_ >= 0 ?
&tablet_seq_caches_.at(curr_tablet_idx_) : nullptr;
ObDirectLoadMgrAgent ddl_agent;
if (OB_FAIL(ddl_agent.init(slice_info.context_id_, slice_info.ls_id_, slice_info.data_tablet_id_, ddl_ctrl.direct_load_type_))) {
if (!is_shared_storage_dempotent_mode(ddl_ctrl.direct_load_type_)) {
} else if (MY_SPEC.regenerate_heap_table_pk_) {
slice_info.total_slice_cnt_ = ctx_.get_sqc_handler()->get_sqc_ctx().get_task_count();
} else if (OB_FAIL(tablet_slice_cnt_map.get_refactored(is_partition_table ? slice_info.data_tablet_id_.id() : 0, slice_info.total_slice_cnt_))) {
LOG_WARN("failed to get tablet slice cnt", K(ret), K(is_partition_table), K(slice_info.data_tablet_id_));
} else if (slice_info.total_slice_cnt_ < 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("slice cnt should not less than 0", K(ret), K(is_partition_table), K(slice_info.data_tablet_id_));
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(ddl_agent.init(slice_info.context_id_, slice_info.ls_id_, slice_info.data_tablet_id_, ddl_ctrl.direct_load_type_))) {
LOG_WARN("init agent failed", K(ret), K(slice_info));
} else if (all_slices_empty || is_all_partition_finished_) {
is_current_slice_empty = true;
@ -291,7 +317,7 @@ int ObPxMultiPartSSTableInsertOp::inner_get_next_row()
ObDDLInsertRowIterator row_iter;
if (OB_FAIL(row_iter.init(tenant_id, ddl_agent, &slice_row_iter, notify_ls_id, notify_tablet_id,
slice_info.context_id_, tablet_slice_param,
table_schema->get_lob_columns_count(), is_vec_data_complement_))) {
table_schema->get_lob_columns_count(), slice_info.total_slice_cnt_, is_vec_data_complement_))) {
LOG_WARN("init ddl insert rot iterator failed", K(ret));
} else if (OB_FAIL(ddl_agent.fill_sstable_slice(slice_info, &row_iter, affected_rows,
&insert_monitor))) {

View File

@ -458,6 +458,8 @@ int ObComplementDataContext::init(
direct_load_param.runtime_only_param_.table_id_ = param.dest_table_id_;
direct_load_param.runtime_only_param_.schema_version_ = param.dest_schema_version_;
direct_load_param.runtime_only_param_.task_cnt_ = param.concurrent_cnt_; // real slice count.
total_slice_cnt_ = param.ranges_.count();
if (OB_ISNULL(tenant_direct_load_mgr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected err", K(ret));
@ -1339,12 +1341,13 @@ int ObComplementWriteTask::append_row(ObScan *scan)
slice_info.ls_id_ = param_->dest_ls_id_;
slice_info.data_tablet_id_ = param_->dest_tablet_id_;
slice_info.context_id_ = context_->context_id_;
slice_info.total_slice_cnt_ = context_->total_slice_cnt_;
ObInsertMonitor insert_monitor(context_->row_scanned_, context_->row_inserted_, context_->cg_row_inserted_);
ObDDLInsertRowIterator row_iter;
ObTabletSliceParam tablet_slice_param(context_->concurrent_cnt_, task_id_);
if (OB_FAIL(row_iter.init(param_->orig_tenant_id_, context_->ddl_agent_, scan,
param_->dest_ls_id_, param_->dest_tablet_id_, context_->context_id_, tablet_slice_param, context_->lob_cols_cnt_))) {
LOG_WARN("init ddl insert row iterator failed", K(ret));
param_->dest_ls_id_, param_->dest_tablet_id_, context_->context_id_, tablet_slice_param, context_->lob_cols_cnt_, context_->total_slice_cnt_))) {
LOG_WARN("init ddl insert row iterator failed", K(ret), K(context_->total_slice_cnt_));
} else if (OB_FAIL(context_->ddl_agent_.open_sstable_slice(macro_start_seq, slice_info))) {
LOG_WARN("open slice failed", K(ret), K(macro_start_seq), K(slice_info));
} else if (OB_FAIL(context_->ddl_agent_.fill_sstable_slice(slice_info, &row_iter, affected_rows, &insert_monitor))) {

View File

@ -166,7 +166,7 @@ public:
is_inited_(false), ddl_agent_(), direct_load_type_(ObDirectLoadType::DIRECT_LOAD_INVALID),
is_major_sstable_exist_(false), complement_data_ret_(common::OB_SUCCESS),
lock_(ObLatchIds::COMPLEMENT_DATA_CONTEXT_LOCK), concurrent_cnt_(0),
physical_row_count_(0), row_scanned_(0), row_inserted_(0), cg_row_inserted_(0), context_id_(0), lob_cols_cnt_(0)
physical_row_count_(0), row_scanned_(0), row_inserted_(0), cg_row_inserted_(0), context_id_(0), lob_cols_cnt_(0), total_slice_cnt_(-1)
{}
~ObComplementDataContext() { destroy(); }
int init(
@ -177,7 +177,7 @@ public:
int add_column_checksum(const ObIArray<int64_t> &report_col_checksums, const ObIArray<int64_t> &report_col_ids);
int get_column_checksum(ObIArray<int64_t> &report_col_checksums, ObIArray<int64_t> &report_col_ids);
TO_STRING_KV(K_(is_inited), K_(ddl_agent), K_(direct_load_type), K_(is_major_sstable_exist), K_(complement_data_ret), K_(concurrent_cnt),
K_(physical_row_count), K_(row_scanned), K_(row_inserted), K_(cg_row_inserted), K_(context_id), K_(lob_cols_cnt));
K_(physical_row_count), K_(row_scanned), K_(row_inserted), K_(cg_row_inserted), K_(context_id), K_(lob_cols_cnt), K_(total_slice_cnt));
public:
bool is_inited_;
ObDirectLoadMgrAgent ddl_agent_;
@ -194,6 +194,7 @@ public:
int64_t lob_cols_cnt_;
ObArray<int64_t> report_col_checksums_;
ObArray<int64_t> report_col_ids_;
int64_t total_slice_cnt_;
};
class ObComplementPrepareTask;

View File

@ -372,6 +372,7 @@ public:
blocksstable::ObMacroDataSeq &next_seq);
virtual int update_max_lob_id(const int64_t lob_id) { UNUSED(lob_id); return common::OB_SUCCESS; }
virtual int set_total_slice_cnt(const int64_t slice_cnt) { UNUSED(slice_cnt); return OB_NOT_SUPPORTED;}
// for ref_cnt
void inc_ref() { ATOMIC_INC(&ref_cnt_); }

View File

@ -181,6 +181,8 @@ int ObDirectLoadMgrAgent::open_sstable_slice_for_ss(
LOG_WARN("error sys", K(ret));
} else if (OB_FAIL(mgr_handle_.get_obj()->open_sstable_slice(slice_info.is_lob_slice_, start_seq, slice_info.context_id_, slice_info.slice_id_))) {
LOG_WARN("open sstable slice failed", K(ret), K(start_seq), K(slice_info));
} else if (OB_FAIL(mgr_handle_.get_obj()->set_total_slice_cnt(slice_info.total_slice_cnt_))) {
LOG_WARN("failed to set total slice cnt for direct load mgr", K(ret));
}
return ret;
}

View File

@ -182,7 +182,8 @@ ObDDLInsertRowIterator::ObDDLInsertRowIterator()
lob_allocator_(ObModIds::OB_LOB_ACCESS_BUFFER, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
lob_slice_id_(0),
lob_cols_cnt_(0),
is_skip_lob_(false)
is_skip_lob_(false),
total_slice_cnt_(-1)
{
lob_id_cache_.set(1/*start*/, 0/*end*/);
}
@ -200,6 +201,7 @@ int ObDDLInsertRowIterator::init(
const int64_t context_id,
const ObTabletSliceParam &tablet_slice_param,
const int64_t lob_cols_cnt,
const int64_t total_slice_cnt,
const bool is_skip_lob)
{
int ret = OB_SUCCESS;
@ -212,9 +214,10 @@ int ObDDLInsertRowIterator::init(
|| !tablet_id.is_valid()
|| context_id < 0
// no need check tablet slice param, invalid when slice empty
|| lob_cols_cnt < 0)) {
|| lob_cols_cnt < 0
|| (is_shared_storage_dempotent_mode(agent.get_direct_load_type()) && total_slice_cnt < 0))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(source_tenant_id), KP(slice_row_iter), K(ls_id), K(tablet_id), K(context_id), K(tablet_slice_param), K(lob_cols_cnt));
LOG_WARN("invalid argument", K(ret), K(source_tenant_id), KP(slice_row_iter), K(ls_id), K(tablet_id), K(context_id), K(tablet_slice_param), K(lob_cols_cnt), K(total_slice_cnt));
} else if (lob_cols_cnt > 0 && tablet_slice_param.is_valid()
&& OB_FAIL(lob_id_generator_.init(tablet_slice_param.slice_idx_ * ObTabletSliceParam::LOB_ID_SEQ_INTERVAL, // start
ObTabletSliceParam::LOB_ID_SEQ_INTERVAL, // interval
@ -230,6 +233,9 @@ int ObDDLInsertRowIterator::init(
const int64_t parallel_idx = tablet_slice_param.slice_idx_ >= 0 ? tablet_slice_param.slice_idx_ : 0;
is_skip_lob_ = is_skip_lob;
is_inited_ = true;
if ((is_shared_storage_dempotent_mode(ddl_agent_->get_direct_load_type()))) {
total_slice_cnt_ = total_slice_cnt;
}
if (OB_FAIL(macro_seq_.set_parallel_degree(parallel_idx))) {
LOG_WARN("set failed", K(ret), K(parallel_idx));
#ifdef OB_BUILD_SHARED_STORAGE
@ -343,6 +349,9 @@ int ObDDLInsertRowIterator::switch_to_new_lob_slice()
slice_info.data_tablet_id_ = current_tablet_id_;
slice_info.slice_id_ = lob_slice_id_;
slice_info.context_id_ = context_id_;
if (is_shared_storage_dempotent_mode(ddl_agent_->get_direct_load_type())) {
slice_info.total_slice_cnt_ = total_slice_cnt_;
}
ObTabletAutoincrementService &auto_inc = ObTabletAutoincrementService::get_instance();
ObTabletID lob_meta_tablet_id;
int64_t CACHE_SIZE_REQUESTED = AUTO_INC_CACHE_SIZE;

View File

@ -144,11 +144,11 @@ struct ObDirectLoadSliceInfo final
public:
ObDirectLoadSliceInfo()
: is_full_direct_load_(false), is_lob_slice_(false), ls_id_(), data_tablet_id_(), slice_id_(-1),
context_id_(0), src_tenant_id_(MTL_ID()), is_task_finish_(false)
context_id_(0), src_tenant_id_(MTL_ID()), is_task_finish_(false), total_slice_cnt_(-1)
{ }
~ObDirectLoadSliceInfo() = default;
bool is_valid() const { return ls_id_.is_valid() && data_tablet_id_.is_valid() && slice_id_ >= 0 && context_id_ >= 0 && src_tenant_id_ > 0; }
TO_STRING_KV(K_(is_full_direct_load), K_(is_lob_slice), K_(ls_id), K_(data_tablet_id), K_(slice_id), K_(context_id), K_(src_tenant_id), K_(is_task_finish));
TO_STRING_KV(K_(is_full_direct_load), K_(is_lob_slice), K_(ls_id), K_(data_tablet_id), K_(slice_id), K_(context_id), K_(src_tenant_id), K_(is_task_finish), K_(total_slice_cnt));
public:
bool is_full_direct_load_;
bool is_lob_slice_;
@ -158,6 +158,7 @@ public:
int64_t context_id_;
uint64_t src_tenant_id_;
bool is_task_finish_;
int64_t total_slice_cnt_;
DISALLOW_COPY_AND_ASSIGN(ObDirectLoadSliceInfo);
};
@ -383,6 +384,7 @@ public:
const int64_t context_id,
const ObTabletSliceParam &tablet_slice_param,
const int64_t lob_cols_cnt,
const int64_t total_slice_cnt,
const bool is_skip_lob = false);
virtual int get_next_row(const blocksstable::ObDatumRow *&row) override
{
@ -391,7 +393,7 @@ public:
}
int get_next_row(const bool skip_lob, const blocksstable::ObDatumRow *&row) override;
TO_STRING_KV(K_(is_inited), K_(ls_id), K_(current_tablet_id), K_(context_id), K_(macro_seq),
K_(lob_id_generator), K_(lob_id_cache), K_(lob_slice_id), K_(lob_cols_cnt), K_(is_skip_lob));
K_(lob_id_generator), K_(lob_id_cache), K_(lob_slice_id), K_(lob_cols_cnt), K_(is_skip_lob), K_(total_slice_cnt));
public:
int switch_to_new_lob_slice();
int close_lob_sstable_slice();
@ -413,6 +415,7 @@ private:
int64_t lob_slice_id_;
int64_t lob_cols_cnt_;
bool is_skip_lob_;
int64_t total_slice_cnt_;
};
class ObLobMetaRowIterator : public ObIStoreRowIterator

View File

@ -658,6 +658,7 @@ int ObDirectLoadInsertTabletContext::open_sstable_slice(const ObMacroDataSeq &st
slice_info.data_tablet_id_ = tablet_id_;
slice_info.slice_id_ = slice_id;
slice_info.context_id_ = context_id_;
slice_info.total_slice_cnt_ = param_->parallel_; //mock total slice cnt
if (OB_FAIL(open())) {
LOG_WARN("fail to open tablet direct load", KR(ret));
} else if (OB_FAIL(ddl_agent_.open_sstable_slice(start_seq, slice_info))) {
@ -687,6 +688,7 @@ int ObDirectLoadInsertTabletContext::open_lob_sstable_slice(const ObMacroDataSeq
slice_info.data_tablet_id_ = tablet_id_;
slice_info.slice_id_ = slice_id;
slice_info.context_id_ = context_id_;
slice_info.total_slice_cnt_ = param_->parallel_; //mock total slice cnt
if (OB_FAIL(open())) {
LOG_WARN("fail to open tablet direct load", KR(ret));
} else if (OB_FAIL(ddl_agent_.open_sstable_slice(start_seq, slice_info))) {