diff --git a/src/observer/table_load/ob_table_load_coordinator.cpp b/src/observer/table_load/ob_table_load_coordinator.cpp index b1b16f0902..26564f7adf 100644 --- a/src/observer/table_load/ob_table_load_coordinator.cpp +++ b/src/observer/table_load/ob_table_load_coordinator.cpp @@ -1753,12 +1753,12 @@ int ObTableLoadCoordinator::write(const ObTableLoadTransId &trans_id, int32_t se // 取出bucket_writer else if (OB_FAIL(trans->get_bucket_writer_for_write(bucket_writer))) { LOG_WARN("fail to get bucket writer", KR(ret)); - } else if (OB_FAIL(bucket_writer->advance_sequence_no(session_id, sequence_no, guard))) { - if (OB_UNLIKELY(OB_ENTRY_EXIST != ret)) { - LOG_WARN("fail to advance sequence no", KR(ret), K(session_id)); - } else { - ret = OB_SUCCESS; - } + // } else if (OB_FAIL(bucket_writer->advance_sequence_no(session_id, sequence_no, guard))) { + // if (OB_UNLIKELY(OB_ENTRY_EXIST != ret)) { + // LOG_WARN("fail to advance sequence no", KR(ret), K(session_id)); + // } else { + // ret = OB_SUCCESS; + // } } else { ObTableLoadTask *task = nullptr; WriteTaskProcessor *processor = nullptr; diff --git a/src/observer/table_load/ob_table_load_instance.cpp b/src/observer/table_load/ob_table_load_instance.cpp index f451cacbfb..506c9b0fa1 100644 --- a/src/observer/table_load/ob_table_load_instance.cpp +++ b/src/observer/table_load/ob_table_load_instance.cpp @@ -440,6 +440,10 @@ int ObTableLoadInstance::start_redef_table( LOG_WARN("failed to assign tablet ids", KR(ret), K(param.load_level_), K(tablet_ids)); } else if (OB_FAIL(ObTableLoadRedefTable::start(start_arg, start_res, *stmt_ctx_.session_info_))) { LOG_WARN("fail to start redef table", KR(ret), K(start_arg)); + // rewrite error code for concurrency of direct load and offline ddl + if (OB_TABLE_NOT_EXIST == ret) { + ret = OB_SCHEMA_NOT_UPTODATE; + } } else { ddl_param.dest_table_id_ = start_res.dest_table_id_; ddl_param.task_id_ = start_res.task_id_; diff --git a/src/observer/table_load/ob_table_load_schema.cpp b/src/observer/table_load/ob_table_load_schema.cpp index 21b3bb57a2..77c040d931 100644 --- a/src/observer/table_load/ob_table_load_schema.cpp +++ b/src/observer/table_load/ob_table_load_schema.cpp @@ -240,22 +240,6 @@ int ObTableLoadSchema::get_user_column_id_and_names(const ObTableSchema *table_s return ret; } -int ObTableLoadSchema::get_user_column_count(ObSchemaGetterGuard &schema_guard, - uint64_t tenant_id, - uint64_t table_id, - int64_t &column_count) -{ - int ret = OB_SUCCESS; - column_count = 0; - ObArray column_schemas; - if (OB_FAIL(get_user_column_schemas(schema_guard, tenant_id, table_id, column_schemas))) { - LOG_WARN("fail to get user column schemas", KR(ret)); - } else { - column_count = column_schemas.count(); - } - return ret; -} - int ObTableLoadSchema::get_column_ids(const ObTableSchema *table_schema, ObIArray &column_ids, bool contain_hidden_pk_column) @@ -324,50 +308,6 @@ int ObTableLoadSchema::check_has_udt_column(const ObTableSchema *table_schema, b return ret; } -int ObTableLoadSchema::get_tenant_optimizer_gather_stats_on_load(const uint64_t tenant_id, - bool &value) -{ - int ret = OB_SUCCESS; - value = false; - ObSqlString sql; - SMART_VAR(ObMySQLProxy::MySQLResult, res) - { - sqlclient::ObMySQLResult *result = nullptr; - // TODO(suzhi.yt) 这里为啥是带zone纬度的? 如果查询结果中有多个zone的, 选哪个作为返回值呢? - if (OB_FAIL(sql.assign_fmt( - "SELECT value FROM %s WHERE tenant_id = %ld and (zone, name, schema_version) in (select " - "zone, name, max(schema_version) FROM %s group by zone, name) and name = '%s'", - OB_ALL_SYS_VARIABLE_HISTORY_TNAME, - ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id), - OB_ALL_SYS_VARIABLE_HISTORY_TNAME, OB_SV__OPTIMIZER_GATHER_STATS_ON_LOAD))) { - LOG_WARN("fail to append sql", KR(ret), K(tenant_id)); - } else if (OB_FAIL(GCTX.sql_proxy_->read(res, tenant_id, sql.ptr()))) { - LOG_WARN("fail to execute sql", KR(ret), K(sql), K(tenant_id)); - } else if (OB_ISNULL(result = res.get_result())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("fail to get sql result", KR(ret), K(sql), K(tenant_id)); - } else { - while (OB_SUCC(ret)) { - if (OB_FAIL(result->next())) { - if (OB_ITER_END != ret) { - LOG_WARN("fail to get next row", KR(ret), K(tenant_id)); - } else { - ret = OB_SUCCESS; - break; - } - } else { - ObString data; - EXTRACT_VARCHAR_FIELD_MYSQL(*result, "value", data); - if (0 == strcmp(data.ptr(), "1")) { - value = true; - } - } - } - } - } - return ret; -} - int ObTableLoadSchema::check_has_invisible_column(const ObTableSchema *table_schema, bool &bret) { int ret = OB_SUCCESS; @@ -440,16 +380,46 @@ int ObTableLoadSchema::check_has_lob_column(const ObTableSchema *table_schema, b return ret; } -int ObTableLoadSchema::get_table_compressor_type(uint64_t tenant_id, uint64_t table_id, - ObCompressorType &compressor_type) +int ObTableLoadSchema::get_tenant_optimizer_gather_stats_on_load(const uint64_t tenant_id, + bool &value) { int ret = OB_SUCCESS; - ObSchemaGetterGuard schema_guard; - const share::schema::ObTableSchema *table_schema = nullptr; - if (OB_FAIL(get_table_schema(tenant_id, table_id, schema_guard, table_schema))) { - LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id)); - } else { - compressor_type = table_schema->get_compressor_type(); + value = false; + ObSqlString sql; + SMART_VAR(ObMySQLProxy::MySQLResult, res) + { + sqlclient::ObMySQLResult *result = nullptr; + // TODO(suzhi.yt) 这里为啥是带zone纬度的? 如果查询结果中有多个zone的, 选哪个作为返回值呢? + if (OB_FAIL(sql.assign_fmt( + "SELECT value FROM %s WHERE tenant_id = %ld and (zone, name, schema_version) in (select " + "zone, name, max(schema_version) FROM %s group by zone, name) and name = '%s'", + OB_ALL_SYS_VARIABLE_HISTORY_TNAME, + ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id), + OB_ALL_SYS_VARIABLE_HISTORY_TNAME, OB_SV__OPTIMIZER_GATHER_STATS_ON_LOAD))) { + LOG_WARN("fail to append sql", KR(ret), K(tenant_id)); + } else if (OB_FAIL(GCTX.sql_proxy_->read(res, tenant_id, sql.ptr()))) { + LOG_WARN("fail to execute sql", KR(ret), K(sql), K(tenant_id)); + } else if (OB_ISNULL(result = res.get_result())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to get sql result", KR(ret), K(sql), K(tenant_id)); + } else { + while (OB_SUCC(ret)) { + if (OB_FAIL(result->next())) { + if (OB_ITER_END != ret) { + LOG_WARN("fail to get next row", KR(ret), K(tenant_id)); + } else { + ret = OB_SUCCESS; + break; + } + } else { + ObString data; + EXTRACT_VARCHAR_FIELD_MYSQL(*result, "value", data); + if (0 == strcmp(data.ptr(), "1")) { + value = true; + } + } + } + } } return ret; } diff --git a/src/observer/table_load/ob_table_load_schema.h b/src/observer/table_load/ob_table_load_schema.h index 6a06360791..fb9539bbab 100644 --- a/src/observer/table_load/ob_table_load_schema.h +++ b/src/observer/table_load/ob_table_load_schema.h @@ -42,6 +42,7 @@ public: static int get_table_schema(share::schema::ObSchemaGetterGuard &schema_guard, uint64_t tenant_id, uint64_t table_id, const share::schema::ObTableSchema *&table_schema); + static int get_user_column_schemas(const share::schema::ObTableSchema *table_schema, ObIArray &column_schemas); static int get_user_column_schemas(share::schema::ObSchemaGetterGuard &schema_guard, @@ -59,10 +60,7 @@ public: static int get_user_column_id_and_names(const share::schema::ObTableSchema *table_schema, common::ObIArray &column_ids, common::ObIArray &column_names); - static int get_user_column_count(share::schema::ObSchemaGetterGuard &schema_guard, - uint64_t tenant_id, - uint64_t table_id, - int64_t &column_count); + static int get_column_ids(const share::schema::ObTableSchema *table_schema, common::ObIArray &column_ids, bool contain_hidden_pk_column = false); @@ -71,14 +69,15 @@ public: uint64_t table_id, common::ObIArray &column_ids, bool contain_hidden_pk_column = false); + static int check_has_udt_column(const share::schema::ObTableSchema *table_schema, bool &bret); - static int get_tenant_optimizer_gather_stats_on_load(const uint64_t tenant_id, bool &value); static int check_has_invisible_column(const share::schema::ObTableSchema *table_schema, bool &bret); static int check_has_unused_column(const share::schema::ObTableSchema *table_schema, bool &bret); static int check_has_roaringbitmap_column(const share::schema::ObTableSchema *table_schema, bool &bret); static int check_has_lob_column(const share::schema::ObTableSchema *table_schema, bool &bret); - static int get_table_compressor_type(uint64_t tenant_id, uint64_t table_id, - ObCompressorType &compressor_type); + + static int get_tenant_optimizer_gather_stats_on_load(const uint64_t tenant_id, bool &value); + public: ObTableLoadSchema(); ~ObTableLoadSchema(); diff --git a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp index f17dcfac8d..2340a48d24 100644 --- a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp +++ b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp @@ -69,7 +69,9 @@ ObLoadDataDirectImpl::LoadExecuteParam::LoadExecuteParam() ignore_row_num_(-1), dup_action_(ObLoadDupActionType::LOAD_INVALID_MODE), method_(ObDirectLoadMethod::INVALID_METHOD), - insert_mode_(ObDirectLoadInsertMode::INVALID_INSERT_MODE) + insert_mode_(ObDirectLoadInsertMode::INVALID_INSERT_MODE), + compressor_type_(ObCompressorType::INVALID_COMPRESSOR), + online_sample_percent_(100.) { column_ids_.set_tenant_id(MTL_ID()); } @@ -92,7 +94,8 @@ bool ObLoadDataDirectImpl::LoadExecuteParam::is_valid() const (storage::ObDirectLoadInsertMode::INC_REPLACE == insert_mode_ ? sql::ObLoadDupActionType::LOAD_REPLACE == dup_action_ : true) && - data_access_param_.is_valid() && !column_ids_.empty(); + data_access_param_.is_valid() && !column_ids_.empty() && + ObCompressorType::INVALID_COMPRESSOR != compressor_type_; } /** @@ -2326,6 +2329,8 @@ int ObLoadDataDirectImpl::init_execute_param() const ObLoadDataHint &hint = load_stmt_->get_hints(); const ObIArray &field_or_var_list = load_stmt_->get_field_or_var_list(); + ObSchemaGetterGuard *schema_guard = ctx_->get_sql_ctx()->schema_guard_; + const ObTableSchema *table_schema = nullptr; const bool is_backup = ObLoadDataFormat::OB_BACKUP_1_4 == load_args.access_info_.get_load_data_format(); execute_param_.tenant_id_ = load_args.tenant_id_; execute_param_.database_id_ = load_args.database_id_; @@ -2335,6 +2340,12 @@ int ObLoadDataDirectImpl::init_execute_param() execute_param_.combined_name_ = load_args.combined_name_; execute_param_.ignore_row_num_ = load_args.ignore_rows_; execute_param_.dup_action_ = load_args.dupl_action_; + if (OB_FAIL(ObTableLoadSchema::get_table_schema(*schema_guard, + execute_param_.tenant_id_, + execute_param_.table_id_, + table_schema))) { + LOG_WARN("fail to get table schema", KR(ret), K(execute_param_)); + } // parallel_ if (OB_SUCC(ret)) { ObTenant *tenant = nullptr; @@ -2423,33 +2434,19 @@ int ObLoadDataDirectImpl::init_execute_param() } // column_ids_ if (OB_SUCC(ret)) { - ObSchemaGetterGuard *schema_guard = ctx_->get_sql_ctx()->schema_guard_; - int64_t column_count = 0; execute_param_.column_ids_.reset(); if (is_backup) { // 备份数据导入 - if (OB_FAIL(ObTableLoadSchema::get_column_ids(*schema_guard, - execute_param_.tenant_id_, - execute_param_.table_id_, - execute_param_.column_ids_))) { + if (OB_FAIL(ObTableLoadSchema::get_column_ids(table_schema, execute_param_.column_ids_))) { LOG_WARN("fail to get column ids for backup", KR(ret)); } } else if (load_stmt_->get_default_table_columns()) { // 默认列导入 - if (OB_FAIL(ObTableLoadSchema::get_user_column_ids(*schema_guard, - execute_param_.tenant_id_, - execute_param_.table_id_, - execute_param_.column_ids_))) { + if (OB_FAIL(ObTableLoadSchema::get_user_column_ids(table_schema, execute_param_.column_ids_))) { LOG_WARN("fail to get user column ids", KR(ret)); } } else { // 指定列导入 const static uint64_t INVALID_COLUMN_ID = UINT64_MAX; - const ObTableSchema *table_schema = nullptr; ObArray user_column_ids; - if (OB_FAIL(ObTableLoadSchema::get_table_schema(*schema_guard, - execute_param_.tenant_id_, - execute_param_.table_id_, - table_schema))) { - LOG_WARN("fail to get table schema", KR(ret), K(execute_param_)); - } else if (OB_FAIL(ObTableLoadSchema::get_user_column_ids(table_schema, user_column_ids))) { + if (OB_FAIL(ObTableLoadSchema::get_user_column_ids(table_schema, user_column_ids))) { LOG_WARN("fail to get user column ids", KR(ret)); } for (int64_t i = 0; OB_SUCC(ret) && i < field_or_var_list.count(); ++i) { @@ -2479,6 +2476,23 @@ int ObLoadDataDirectImpl::init_execute_param() } } } + // compressor_type_ + if (OB_SUCC(ret)) { + if (OB_FAIL(ObDDLUtil::get_temp_store_compress_type( + table_schema->get_compressor_type(), execute_param_.parallel_, execute_param_.compressor_type_))) { + LOG_WARN("fail to get tmp store compressor type", KR(ret)); + } + } + // online_sample_percent_ + if (OB_SUCC(ret)) { + if (execute_param_.online_opt_stat_gather_ && + OB_FAIL(ObDbmsStatsUtils::get_sys_online_estimate_percent(*ctx_, + execute_param_.tenant_id_, + execute_param_.table_id_, + execute_param_.online_sample_percent_))) { + LOG_WARN("failed to get sys online sample percent", K(ret)); + } + } return ret; } @@ -2489,7 +2503,6 @@ int ObLoadDataDirectImpl::init_execute_context() const bool is_backup = ObLoadDataFormat::OB_BACKUP_1_4 == load_args.access_info_.get_load_data_format(); execute_ctx_.exec_ctx_.exec_ctx_ = ctx_; execute_ctx_.allocator_ = &ctx_->get_allocator(); - ObCompressorType table_compressor_type = ObCompressorType::NONE_COMPRESSOR; ObTableLoadParam load_param; load_param.tenant_id_ = execute_param_.tenant_id_; load_param.table_id_ = execute_param_.table_id_; @@ -2499,40 +2512,20 @@ int ObLoadDataDirectImpl::init_execute_context() load_param.max_error_row_count_ = execute_param_.max_error_rows_; load_param.column_count_ = execute_param_.column_ids_.count(); load_param.need_sort_ = is_backup ? false : execute_param_.need_sort_; - load_param.dup_action_ = execute_param_.dup_action_; load_param.px_mode_ = false; load_param.online_opt_stat_gather_ = execute_param_.online_opt_stat_gather_; + load_param.dup_action_ = execute_param_.dup_action_; load_param.method_ = execute_param_.method_; load_param.insert_mode_ = execute_param_.insert_mode_; load_param.load_mode_ = ObDirectLoadMode::LOAD_DATA; + load_param.compressor_type_ = execute_param_.compressor_type_; + load_param.online_sample_percent_ = execute_param_.online_sample_percent_; load_param.load_level_ = ObDirectLoadLevel::TABLE; - - double online_sample_percent = 100.; - if (OB_SUCC(ret)) { - if (execute_param_.online_opt_stat_gather_ && - OB_FAIL(ObDbmsStatsUtils::get_sys_online_estimate_percent(*ctx_, - execute_param_.tenant_id_, - execute_param_.table_id_, - online_sample_percent))) { - LOG_WARN("failed to get sys online sample percent", K(ret)); - } else { - load_param.online_sample_percent_ = online_sample_percent; - } - } - - if (OB_SUCC(ret)) { - if (OB_FAIL(ObTableLoadSchema::get_table_compressor_type( - execute_param_.tenant_id_, execute_param_.table_id_, table_compressor_type))) { - LOG_WARN("fail to get table compressor type", KR(ret)); - } else if (OB_FAIL(ObDDLUtil::get_temp_store_compress_type( - table_compressor_type, execute_param_.parallel_, load_param.compressor_type_))) { - LOG_WARN("fail to get tmp store compressor type", KR(ret)); - } else if (OB_FAIL(direct_loader_.init(load_param, execute_param_.column_ids_, - &execute_ctx_.exec_ctx_))) { - LOG_WARN("fail to init direct loader", KR(ret)); - } else if (OB_FAIL(init_logger())) { - LOG_WARN("fail to init logger", KR(ret)); - } + if (OB_FAIL( + direct_loader_.init(load_param, execute_param_.column_ids_, &execute_ctx_.exec_ctx_))) { + LOG_WARN("fail to init direct loader", KR(ret)); + } else if (OB_FAIL(init_logger())) { + LOG_WARN("fail to init logger", KR(ret)); } if (OB_SUCC(ret)) { execute_ctx_.direct_loader_ = &direct_loader_; diff --git a/src/sql/engine/cmd/ob_load_data_direct_impl.h b/src/sql/engine/cmd/ob_load_data_direct_impl.h index 8247693537..0c2d484f10 100644 --- a/src/sql/engine/cmd/ob_load_data_direct_impl.h +++ b/src/sql/engine/cmd/ob_load_data_direct_impl.h @@ -95,7 +95,9 @@ private: "method", storage::ObDirectLoadMethod::get_type_string(method_), "insert_mode", storage::ObDirectLoadInsertMode::get_type_string(insert_mode_), K_(data_access_param), - K_(column_ids)); + K_(column_ids), + K_(compressor_type), + K_(online_sample_percent)); public: uint64_t tenant_id_; uint64_t database_id_; @@ -116,6 +118,8 @@ private: storage::ObDirectLoadInsertMode::Type insert_mode_; DataAccessParam data_access_param_; ObArray column_ids_; + ObCompressorType compressor_type_; + double online_sample_percent_; }; struct LoadExecuteContext diff --git a/src/sql/engine/cmd/ob_table_direct_insert_ctx.cpp b/src/sql/engine/cmd/ob_table_direct_insert_ctx.cpp index 52900383fc..8fad5983f5 100644 --- a/src/sql/engine/cmd/ob_table_direct_insert_ctx.cpp +++ b/src/sql/engine/cmd/ob_table_direct_insert_ctx.cpp @@ -118,14 +118,14 @@ int ObTableDirectInsertCtx::init( tablet_ids.reset(); param.tenant_id_ = MTL_ID(); param.table_id_ = table_id; - param.batch_size_ = 100; param.parallel_ = parallel; param.session_count_ = parallel; + param.batch_size_ = 100; + param.max_error_row_count_ = 0; param.column_count_ = column_ids.count(); + param.need_sort_ = table_schema->is_heap_table() ? phy_plan.get_direct_load_need_sort() : true; param.px_mode_ = true; param.online_opt_stat_gather_ = is_online_gather_statistics_; - param.need_sort_ = table_schema->is_heap_table() ? phy_plan.get_direct_load_need_sort() : true; - param.max_error_row_count_ = 0; param.dup_action_ = (enable_inc_replace ? sql::ObLoadDupActionType::LOAD_REPLACE : sql::ObLoadDupActionType::LOAD_STOP_ON_DUP); param.method_ = method;