fix direct load coordinator write lock contention

This commit is contained in:
suz-yang
2024-08-23 10:47:18 +00:00
committed by ob-robot
parent 8f7ce6d472
commit 058c8edbae
7 changed files with 103 additions and 133 deletions

View File

@ -69,7 +69,9 @@ ObLoadDataDirectImpl::LoadExecuteParam::LoadExecuteParam()
ignore_row_num_(-1),
dup_action_(ObLoadDupActionType::LOAD_INVALID_MODE),
method_(ObDirectLoadMethod::INVALID_METHOD),
insert_mode_(ObDirectLoadInsertMode::INVALID_INSERT_MODE)
insert_mode_(ObDirectLoadInsertMode::INVALID_INSERT_MODE),
compressor_type_(ObCompressorType::INVALID_COMPRESSOR),
online_sample_percent_(100.)
{
column_ids_.set_tenant_id(MTL_ID());
}
@ -92,7 +94,8 @@ bool ObLoadDataDirectImpl::LoadExecuteParam::is_valid() const
(storage::ObDirectLoadInsertMode::INC_REPLACE == insert_mode_
? sql::ObLoadDupActionType::LOAD_REPLACE == dup_action_
: true) &&
data_access_param_.is_valid() && !column_ids_.empty();
data_access_param_.is_valid() && !column_ids_.empty() &&
ObCompressorType::INVALID_COMPRESSOR != compressor_type_;
}
/**
@ -2326,6 +2329,8 @@ int ObLoadDataDirectImpl::init_execute_param()
const ObLoadDataHint &hint = load_stmt_->get_hints();
const ObIArray<ObLoadDataStmt::FieldOrVarStruct> &field_or_var_list =
load_stmt_->get_field_or_var_list();
ObSchemaGetterGuard *schema_guard = ctx_->get_sql_ctx()->schema_guard_;
const ObTableSchema *table_schema = nullptr;
const bool is_backup = ObLoadDataFormat::OB_BACKUP_1_4 == load_args.access_info_.get_load_data_format();
execute_param_.tenant_id_ = load_args.tenant_id_;
execute_param_.database_id_ = load_args.database_id_;
@ -2335,6 +2340,12 @@ int ObLoadDataDirectImpl::init_execute_param()
execute_param_.combined_name_ = load_args.combined_name_;
execute_param_.ignore_row_num_ = load_args.ignore_rows_;
execute_param_.dup_action_ = load_args.dupl_action_;
if (OB_FAIL(ObTableLoadSchema::get_table_schema(*schema_guard,
execute_param_.tenant_id_,
execute_param_.table_id_,
table_schema))) {
LOG_WARN("fail to get table schema", KR(ret), K(execute_param_));
}
// parallel_
if (OB_SUCC(ret)) {
ObTenant *tenant = nullptr;
@ -2423,33 +2434,19 @@ int ObLoadDataDirectImpl::init_execute_param()
}
// column_ids_
if (OB_SUCC(ret)) {
ObSchemaGetterGuard *schema_guard = ctx_->get_sql_ctx()->schema_guard_;
int64_t column_count = 0;
execute_param_.column_ids_.reset();
if (is_backup) { // 备份数据导入
if (OB_FAIL(ObTableLoadSchema::get_column_ids(*schema_guard,
execute_param_.tenant_id_,
execute_param_.table_id_,
execute_param_.column_ids_))) {
if (OB_FAIL(ObTableLoadSchema::get_column_ids(table_schema, execute_param_.column_ids_))) {
LOG_WARN("fail to get column ids for backup", KR(ret));
}
} else if (load_stmt_->get_default_table_columns()) { // 默认列导入
if (OB_FAIL(ObTableLoadSchema::get_user_column_ids(*schema_guard,
execute_param_.tenant_id_,
execute_param_.table_id_,
execute_param_.column_ids_))) {
if (OB_FAIL(ObTableLoadSchema::get_user_column_ids(table_schema, execute_param_.column_ids_))) {
LOG_WARN("fail to get user column ids", KR(ret));
}
} else { // 指定列导入
const static uint64_t INVALID_COLUMN_ID = UINT64_MAX;
const ObTableSchema *table_schema = nullptr;
ObArray<uint64_t> user_column_ids;
if (OB_FAIL(ObTableLoadSchema::get_table_schema(*schema_guard,
execute_param_.tenant_id_,
execute_param_.table_id_,
table_schema))) {
LOG_WARN("fail to get table schema", KR(ret), K(execute_param_));
} else if (OB_FAIL(ObTableLoadSchema::get_user_column_ids(table_schema, user_column_ids))) {
if (OB_FAIL(ObTableLoadSchema::get_user_column_ids(table_schema, user_column_ids))) {
LOG_WARN("fail to get user column ids", KR(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < field_or_var_list.count(); ++i) {
@ -2479,6 +2476,23 @@ int ObLoadDataDirectImpl::init_execute_param()
}
}
}
// compressor_type_
if (OB_SUCC(ret)) {
if (OB_FAIL(ObDDLUtil::get_temp_store_compress_type(
table_schema->get_compressor_type(), execute_param_.parallel_, execute_param_.compressor_type_))) {
LOG_WARN("fail to get tmp store compressor type", KR(ret));
}
}
// online_sample_percent_
if (OB_SUCC(ret)) {
if (execute_param_.online_opt_stat_gather_ &&
OB_FAIL(ObDbmsStatsUtils::get_sys_online_estimate_percent(*ctx_,
execute_param_.tenant_id_,
execute_param_.table_id_,
execute_param_.online_sample_percent_))) {
LOG_WARN("failed to get sys online sample percent", K(ret));
}
}
return ret;
}
@ -2489,7 +2503,6 @@ int ObLoadDataDirectImpl::init_execute_context()
const bool is_backup = ObLoadDataFormat::OB_BACKUP_1_4 == load_args.access_info_.get_load_data_format();
execute_ctx_.exec_ctx_.exec_ctx_ = ctx_;
execute_ctx_.allocator_ = &ctx_->get_allocator();
ObCompressorType table_compressor_type = ObCompressorType::NONE_COMPRESSOR;
ObTableLoadParam load_param;
load_param.tenant_id_ = execute_param_.tenant_id_;
load_param.table_id_ = execute_param_.table_id_;
@ -2499,40 +2512,20 @@ int ObLoadDataDirectImpl::init_execute_context()
load_param.max_error_row_count_ = execute_param_.max_error_rows_;
load_param.column_count_ = execute_param_.column_ids_.count();
load_param.need_sort_ = is_backup ? false : execute_param_.need_sort_;
load_param.dup_action_ = execute_param_.dup_action_;
load_param.px_mode_ = false;
load_param.online_opt_stat_gather_ = execute_param_.online_opt_stat_gather_;
load_param.dup_action_ = execute_param_.dup_action_;
load_param.method_ = execute_param_.method_;
load_param.insert_mode_ = execute_param_.insert_mode_;
load_param.load_mode_ = ObDirectLoadMode::LOAD_DATA;
load_param.compressor_type_ = execute_param_.compressor_type_;
load_param.online_sample_percent_ = execute_param_.online_sample_percent_;
load_param.load_level_ = ObDirectLoadLevel::TABLE;
double online_sample_percent = 100.;
if (OB_SUCC(ret)) {
if (execute_param_.online_opt_stat_gather_ &&
OB_FAIL(ObDbmsStatsUtils::get_sys_online_estimate_percent(*ctx_,
execute_param_.tenant_id_,
execute_param_.table_id_,
online_sample_percent))) {
LOG_WARN("failed to get sys online sample percent", K(ret));
} else {
load_param.online_sample_percent_ = online_sample_percent;
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(ObTableLoadSchema::get_table_compressor_type(
execute_param_.tenant_id_, execute_param_.table_id_, table_compressor_type))) {
LOG_WARN("fail to get table compressor type", KR(ret));
} else if (OB_FAIL(ObDDLUtil::get_temp_store_compress_type(
table_compressor_type, execute_param_.parallel_, load_param.compressor_type_))) {
LOG_WARN("fail to get tmp store compressor type", KR(ret));
} else if (OB_FAIL(direct_loader_.init(load_param, execute_param_.column_ids_,
&execute_ctx_.exec_ctx_))) {
LOG_WARN("fail to init direct loader", KR(ret));
} else if (OB_FAIL(init_logger())) {
LOG_WARN("fail to init logger", KR(ret));
}
if (OB_FAIL(
direct_loader_.init(load_param, execute_param_.column_ids_, &execute_ctx_.exec_ctx_))) {
LOG_WARN("fail to init direct loader", KR(ret));
} else if (OB_FAIL(init_logger())) {
LOG_WARN("fail to init logger", KR(ret));
}
if (OB_SUCC(ret)) {
execute_ctx_.direct_loader_ = &direct_loader_;