diff --git a/src/observer/table_load/ob_table_load_coordinator_ctx.cpp b/src/observer/table_load/ob_table_load_coordinator_ctx.cpp index d4c256ae51..f01d0b16f3 100644 --- a/src/observer/table_load/ob_table_load_coordinator_ctx.cpp +++ b/src/observer/table_load/ob_table_load_coordinator_ctx.cpp @@ -107,7 +107,7 @@ int ObTableLoadCoordinatorCtx::init(const ObIArray &idx_array, } // init partition_calc_ else if (OB_FAIL( - partition_calc_.init(ctx_->param_.tenant_id_, ctx_->param_.table_id_, ctx_->session_info_))) { + partition_calc_.init(ctx_->param_, ctx_->session_info_))) { LOG_WARN("fail to init partition calc", KR(ret)); } // init trans_allocator_ diff --git a/src/observer/table_load/ob_table_load_partition_calc.cpp b/src/observer/table_load/ob_table_load_partition_calc.cpp index af6915000d..8705d349fd 100644 --- a/src/observer/table_load/ob_table_load_partition_calc.cpp +++ b/src/observer/table_load/ob_table_load_partition_calc.cpp @@ -26,8 +26,7 @@ ObTableLoadPartitionCalc::ObTableLoadPartitionCalc() : session_info_(nullptr), is_partition_with_autoinc_(false), partition_with_autoinc_idx_(OB_INVALID_INDEX), - tenant_id_(OB_INVALID_ID), - table_id_(OB_INVALID_ID), + param_(nullptr), is_partitioned_(false), allocator_("TLD_PartCalc"), exec_ctx_(allocator_), @@ -35,13 +34,15 @@ ObTableLoadPartitionCalc::ObTableLoadPartitionCalc() { } -int ObTableLoadPartitionCalc::init(uint64_t tenant_id, uint64_t table_id, sql::ObSQLSessionInfo *session_info) +int ObTableLoadPartitionCalc::init(const ObTableLoadParam ¶m, sql::ObSQLSessionInfo *session_info) { int ret = OB_SUCCESS; if (IS_INIT) { ret = OB_INIT_TWICE; LOG_WARN("ObTableLoadPartitionCalc init twice", KR(ret), KP(this)); } else { + uint64_t tenant_id = param.tenant_id_; + uint64_t table_id = param.table_id_; allocator_.set_tenant_id(tenant_id); sql_ctx_.schema_guard_ = &schema_guard_; exec_ctx_.set_sql_ctx(&sql_ctx_); @@ -71,8 +72,7 @@ int ObTableLoadPartitionCalc::init(uint64_t tenant_id, uint64_t table_id, sql::O } } if (OB_SUCC(ret)) { - tenant_id_ = tenant_id; - table_id_ = table_id; + param_ = ¶m; session_info_ = session_info; is_partitioned_ = is_partitioned; is_inited_ = true; @@ -134,78 +134,47 @@ int ObTableLoadPartitionCalc::init_part_key_index(const ObTableSchema *table_sch return ret; } -int ObTableLoadPartitionCalc::calc(ObTableLoadPartitionCalcContext &ctx) +int ObTableLoadPartitionCalc::get_part_key(const table::ObTableLoadObjRow &row, common::ObNewRow &part_key) const { OB_TABLE_LOAD_STATISTICS_TIME_COST(DEBUG, calc_part_time_us); int ret = OB_SUCCESS; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - LOG_WARN("ObTableLoadPartitionCalc not init", KR(ret), KP(this)); - } else { - const ObTableLoadObjRowArray &obj_rows = ctx.obj_rows_; - const int64_t column_count = ctx.param_.column_count_; - if (OB_UNLIKELY(obj_rows.empty())) { + if (OB_UNLIKELY(part_key.count_ != part_key_obj_index_.count())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid part key count", KR(ret), K(part_key.count_), K(part_key_obj_index_.count())); + } + for (int64_t i = 0; OB_SUCC(ret) && i < part_key_obj_index_.count(); ++i) { + const IndexAndType &index_and_type = part_key_obj_index_.at(i); + const int64_t obj_index = index_and_type.index_; + if (OB_UNLIKELY(obj_index >= row.count_)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid args", KR(ret), K(column_count), K(obj_rows.count())); + LOG_WARN("invalid length", KR(ret), K(obj_index), K(row.count_)); } else { - const int64_t row_count = obj_rows.count(); - if (!is_partitioned_) { // 非分区表 - for (int64_t i = 0; OB_SUCC(ret) && i < row_count; ++i) { - if (OB_FAIL(ctx.partition_ids_.push_back(partition_id_))) { - LOG_WARN("failed to push back partition id", KR(ret)); - } - } - } else { // 分区表 - ObArray part_rows; - part_rows.set_block_allocator(ModulePageAllocator(ctx.allocator_)); - for (int64_t i = 0; OB_SUCC(ret) && i < row_count; ++i) { - ObNewRow part_row; - if (OB_FAIL( - get_row(ctx, obj_rows.at(i), column_count, part_row, ctx.allocator_))) { - LOG_WARN("fail to get rowkey", KR(ret)); - } else if (OB_FAIL(part_rows.push_back(part_row))) { - LOG_WARN("failed to push back partition row", KR(ret), K(part_row)); - } - } - if (OB_SUCC(ret)) { - if (OB_FAIL(get_partition_by_row(part_rows, ctx.partition_ids_))) { - LOG_WARN("fail to get partition", KR(ret)); - } - } - } + part_key.cells_[i] = row.cells_[obj_index]; } } return ret; } -// FIXME: TODO, 对于非分区表,不能进入到这里计算 -int ObTableLoadPartitionCalc::get_row(ObTableLoadPartitionCalcContext &ctx, const ObTableLoadObjRow &obj_row, int32_t length, ObNewRow &part_row, - ObIAllocator &allocator) const +int ObTableLoadPartitionCalc::cast_part_key(common::ObNewRow &part_key, common::ObIAllocator &allocator) const { int ret = OB_SUCCESS; - const int64_t rowkey_obj_count = part_key_obj_index_.count(); - ObObj *rowkey_objs = static_cast(allocator.alloc(sizeof(ObObj) * rowkey_obj_count)); - ObDataTypeCastParams cast_params(session_info_->get_timezone_info()); - ObCastCtx cast_ctx(&allocator, &cast_params, CM_NONE, ObCharset::get_system_collation()); - ObTableLoadCastObjCtx cast_obj_ctx(ctx.param_, &time_cvrt_, &cast_ctx, true); - if (OB_ISNULL(rowkey_objs)) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("fail to alloc memory", KR(ret)); - } - for (int64_t i = 0; OB_SUCC(ret) && i < rowkey_obj_count; ++i) { - const IndexAndType &index_and_type = part_key_obj_index_.at(i); - const int64_t obj_index = index_and_type.index_; - if (OB_UNLIKELY(obj_index >= length)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid length", KR(ret), K(obj_index), K(length)); - } else if (OB_FAIL(ObTableLoadObjCaster::cast_obj(cast_obj_ctx, index_and_type.column_schema_, - obj_row.cells_[obj_index], rowkey_objs[i]))) { - LOG_WARN("fail to cast obj", KR(ret)); + if (OB_UNLIKELY(part_key.count_ != part_key_obj_index_.count())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid part key count", KR(ret), K(part_key.count_), K(part_key_obj_index_.count())); + } else { + ObDataTypeCastParams cast_params(session_info_->get_timezone_info()); + ObCastCtx cast_ctx(&allocator, &cast_params, CM_NONE, ObCharset::get_system_collation()); + ObTableLoadCastObjCtx cast_obj_ctx(*param_, &time_cvrt_, &cast_ctx, true); + ObObj obj; + for (int64_t i = 0; OB_SUCC(ret) && i < part_key_obj_index_.count(); ++i) { + const IndexAndType &index_and_type = part_key_obj_index_.at(i); + if (OB_FAIL(ObTableLoadObjCaster::cast_obj(cast_obj_ctx, index_and_type.column_schema_, + part_key.cells_[i], obj))) { + LOG_WARN("fail to cast obj", KR(ret)); + } else { + part_key.cells_[i] = obj; + } } - - } - if (OB_SUCC(ret)) { - part_row.assign(rowkey_objs, rowkey_obj_count); } return ret; } @@ -217,7 +186,7 @@ int ObTableLoadPartitionCalc::get_partition_by_row( ObArray tablet_ids; ObArray part_ids; if (OB_FAIL(table_location_.calculate_partition_ids_by_rows2( - *session_info_, schema_guard_, table_id_, part_rows, tablet_ids, part_ids))) { + *session_info_, schema_guard_, param_->table_id_, part_rows, tablet_ids, part_ids))) { LOG_WARN("fail to calc partition id", KR(ret)); } else if (OB_UNLIKELY(part_rows.count() != part_ids.count() || part_rows.count() != tablet_ids.count())) { @@ -225,11 +194,7 @@ int ObTableLoadPartitionCalc::get_partition_by_row( LOG_WARN("invalid args", K(part_ids.count()), K(tablet_ids.count())); } for (int i = 0; OB_SUCC(ret) && i < part_rows.count(); i++) { - if (OB_INVALID_PARTITION_ID == part_ids.at(i) - || ObTabletID::INVALID_TABLET_ID == tablet_ids.at(i).id()) { - ret = OB_NO_PARTITION_FOR_GIVEN_VALUE; - LOG_WARN("no partition matched", KR(ret), K(part_ids.at(i)), K(tablet_ids.at(i))); - } else if (OB_FAIL( + if (OB_FAIL( partition_ids.push_back(ObTableLoadPartitionId(part_ids.at(i), tablet_ids.at(i))))) { LOG_WARN("fail to push partition id", KR(ret)); } diff --git a/src/observer/table_load/ob_table_load_partition_calc.h b/src/observer/table_load/ob_table_load_partition_calc.h index 20a680ab01..ecb692f867 100644 --- a/src/observer/table_load/ob_table_load_partition_calc.h +++ b/src/observer/table_load/ob_table_load_partition_calc.h @@ -21,33 +21,20 @@ namespace observer { class ObTableLoadSchema; -struct ObTableLoadPartitionCalcContext -{ - ObTableLoadPartitionCalcContext(const table::ObTableLoadObjRowArray &obj_rows, - const ObTableLoadParam ¶m, common::ObIAllocator &allocator) - : obj_rows_(obj_rows), param_(param), allocator_(allocator) - { - partition_ids_.set_block_allocator(common::ModulePageAllocator(allocator_)); - } - const table::ObTableLoadObjRowArray &obj_rows_; - const ObTableLoadParam ¶m_; - common::ObIAllocator &allocator_; - common::ObArray partition_ids_; -}; class ObTableLoadPartitionCalc { public: ObTableLoadPartitionCalc(); - int init(uint64_t tenant_id, uint64_t table_id, sql::ObSQLSessionInfo *session_info); - int calc(ObTableLoadPartitionCalcContext &ctx); + int init(const ObTableLoadParam ¶m, sql::ObSQLSessionInfo *session_info); + int get_part_key(const table::ObTableLoadObjRow &row, common::ObNewRow &part_key) const; + int cast_part_key(common::ObNewRow &part_key, common::ObIAllocator &allocator) const; + int get_partition_by_row(common::ObIArray &part_rows, + common::ObIArray &partition_ids); + int64_t get_part_key_obj_count() const {return part_key_obj_index_.count();} private: int init_part_key_index(const share::schema::ObTableSchema *table_schema, common::ObIAllocator &allocator); - int get_row(ObTableLoadPartitionCalcContext &ctx, const table::ObTableLoadObjRow &obj_row, int32_t length, common::ObNewRow &part_row, - common::ObIAllocator &allocator) const; - int get_partition_by_row(common::ObIArray &part_rows, - common::ObIArray &partition_ids); public: struct IndexAndType { @@ -63,9 +50,8 @@ public: bool is_partition_with_autoinc_; int64_t partition_with_autoinc_idx_; private: + const ObTableLoadParam *param_; // data members - uint64_t tenant_id_; - uint64_t table_id_; bool is_partitioned_; // 非分区表 table::ObTableLoadPartitionId partition_id_; diff --git a/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp b/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp index 1445ac21e9..2a5648b23c 100644 --- a/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp +++ b/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp @@ -8,6 +8,7 @@ #include "observer/table_load/ob_table_load_trans_bucket_writer.h" #include "observer/table_load/ob_table_load_coordinator.h" #include "observer/table_load/ob_table_load_coordinator_ctx.h" +#include "observer/table_load/ob_table_load_error_row_handler.h" #include "observer/table_load/ob_table_load_obj_cast.h" #include "observer/table_load/ob_table_load_partition_calc.h" #include "observer/table_load/ob_table_load_stat.h" @@ -309,19 +310,56 @@ int ObTableLoadTransBucketWriter::write_for_partitioned(SessionContext &session_ { int ret = OB_SUCCESS; ObArenaAllocator allocator("TLD_Misc", OB_MALLOC_NORMAL_BLOCK_SIZE, param_.tenant_id_); - ObTableLoadPartitionCalcContext calc_ctx(obj_rows, param_, allocator); - if (OB_FAIL(coordinator_ctx_->partition_calc_.calc(calc_ctx))) { - LOG_WARN("fail to calc partition", KR(ret)); + const int64_t part_key_obj_count = coordinator_ctx_->partition_calc_.get_part_key_obj_count(); + ObArray partition_ids; + ObArray part_keys; + ObArray row_idxs; + ObTableLoadErrorRowHandler *error_row_handler = + trans_ctx_->ctx_->store_ctx_->error_row_handler_; + partition_ids.set_block_allocator(common::ModulePageAllocator(allocator)); + for (int64_t i = 0; OB_SUCC(ret) && i < obj_rows.count(); ++i) { + ObNewRow part_key; + part_key.count_ = part_key_obj_count; + part_key.cells_ = static_cast(allocator.alloc(sizeof(ObObj) * part_key_obj_count)); + if (OB_ISNULL(part_key.cells_)) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to alloc memory", KR(ret)); + } else if (OB_FAIL(coordinator_ctx_->partition_calc_.get_part_key(obj_rows.at(i), part_key))) { + LOG_WARN("fail to get part key", KR(ret)); + } else if (OB_FAIL(coordinator_ctx_->partition_calc_.cast_part_key(part_key, allocator))) { + if (OB_FAIL(error_row_handler->handle_error_row(ret, part_key))) { + LOG_WARN("failed to handle error row", K(ret), K(part_key)); + } else { + ret = OB_SUCCESS; + } + } else if (OB_FAIL(part_keys.push_back(part_key))) { + LOG_WARN("fail to push back part key", KR(ret)); + } else if (OB_FAIL(row_idxs.push_back(i))) { + LOG_WARN("fail to push back row idx", KR(ret)); + } } - for (int64_t i = 0; OB_SUCC(ret) && i < calc_ctx.partition_ids_.count(); ++i) { - const ObTableLoadPartitionId &partition_id = calc_ctx.partition_ids_.at(i); + if (OB_SUCC(ret)) { + if (OB_FAIL(coordinator_ctx_->partition_calc_.get_partition_by_row(part_keys, partition_ids))) { + LOG_WARN("fail to calc partition", KR(ret)); + } + } + for (int64_t i = 0; OB_SUCC(ret) && i < row_idxs.count(); ++i) { + const ObTableLoadPartitionId &partition_id = partition_ids.at(i); + const ObTableLoadObjRow &row = obj_rows.at(row_idxs.at(i)); ObTableLoadBucket *load_bucket = nullptr; bool need_write = false; - if (OB_FAIL(get_load_bucket(session_ctx, partition_id, load_bucket))) { + if (OB_UNLIKELY(!partition_id.is_valid())) { + ret = OB_NO_PARTITION_FOR_GIVEN_VALUE; + if (OB_FAIL(error_row_handler->handle_error_row(ret, part_keys.at(i)))) { + LOG_WARN("failed to handle error row", K(ret), K(part_keys.at(i))); + } else { + ret = OB_SUCCESS; + } + } else if (OB_FAIL(get_load_bucket(session_ctx, partition_id, load_bucket))) { LOG_WARN("fail to get partition bucket", KR(ret), K(session_ctx.session_id_), K(partition_id)); } else if (OB_FAIL(load_bucket->add_row( - partition_id.tablet_id_, obj_rows.at(i), + partition_id.tablet_id_, row, param_.column_count_, param_.batch_size_, need_write))) { LOG_WARN("fail to add row", KR(ret)); } else if (need_write && OB_FAIL(write_load_bucket(session_ctx, load_bucket))) { diff --git a/src/sql/optimizer/ob_table_location.cpp b/src/sql/optimizer/ob_table_location.cpp index 58051ee18d..f1516b6b14 100644 --- a/src/sql/optimizer/ob_table_location.cpp +++ b/src/sql/optimizer/ob_table_location.cpp @@ -1505,12 +1505,22 @@ int ObTableLocation::calculate_partition_ids_by_rows2(ObSQLSessionInfo &session_ } } if (OB_SUCC(ret)) { - if ((tmp_tablet_ids.count() != 1) || (tmp_part_ids.count() != 1)) { + ObObjectID part_id; + ObTabletID tablet_id; + if (OB_UNLIKELY(tmp_tablet_ids.empty() && tmp_part_ids.empty())) { + part_id = OB_INVALID_PARTITION_ID; + tablet_id = ObTabletID::INVALID_TABLET_ID; + } else if (OB_UNLIKELY((tmp_tablet_ids.count() != 1) || (tmp_part_ids.count() != 1))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid tablet ids or partition ids", KR(ret), K(tmp_tablet_ids), K(tmp_part_ids)); - } else if (OB_FAIL(tablet_ids.push_back(tmp_tablet_ids.at(0)))) { + } else { + part_id = tmp_part_ids.at(0); + tablet_id = tmp_tablet_ids.at(0); + } + if (OB_FAIL(ret)) { + } else if (OB_FAIL(tablet_ids.push_back(tablet_id))) { LOG_WARN("fail to push tablet id", KR(ret)); - } else if (OB_FAIL(part_ids.push_back(tmp_part_ids.at(0)))) { + } else if (OB_FAIL(part_ids.push_back(part_id))) { LOG_WARN("fail to push object id", KR(ret)); } }