diff --git a/src/observer/table_load/ob_table_load_obj_cast.cpp b/src/observer/table_load/ob_table_load_obj_cast.cpp index 5896f4e6b..29a2a15d8 100644 --- a/src/observer/table_load/ob_table_load_obj_cast.cpp +++ b/src/observer/table_load/ob_table_load_obj_cast.cpp @@ -9,6 +9,7 @@ #include "sql/engine/cmd/ob_load_data_utils.h" #include "sql/engine/expr/ob_expr_util.h" #include "sql/resolver/expr/ob_raw_expr_util.h" +#include "sql/engine/expr/ob_datum_cast.h" namespace oceanbase { @@ -20,6 +21,39 @@ using namespace sql; const ObObj ObTableLoadObjCaster::zero_obj(0); const ObObj ObTableLoadObjCaster::null_obj(ObObjType::ObNullType); +static int pad_obj(ObTableLoadCastObjCtx &cast_obj_ctx, const ObColumnSchemaV2 *column_schema, ObObj &obj) +{ + int ret = OB_SUCCESS; + bool is_pad = false; + bool is_fixed_string = obj.is_fixed_len_char_type() || obj.is_binary(); + //if (lib::is_mysql_mode()) { + // if (is_fixed_string && (SMO_PAD_CHAR_TO_FULL_LENGTH & cast_obj_ctx.param_.sql_mode_)) { + // is_pad = true; + // } + //} else { + // is_pad = is_fixed_string; + //} + if (is_fixed_string) { + int32_t fixed_len = column_schema->get_data_length(); + if (fixed_len > obj.val_len_) { + char *buf = (char *)cast_obj_ctx.cast_ctx_->allocator_v2_->alloc(fixed_len); + if (buf == nullptr) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to allocate buf", K(fixed_len), KR(ret)); + } + if (OB_SUCC(ret)) { + const ObCharsetType &cs = ObCharset::charset_type_by_coll(obj.get_collation_type()); + char padding_char = (CHARSET_BINARY == cs) ? 
OB_PADDING_BINARY : OB_PADDING_CHAR; + MEMCPY(buf, obj.v_.ptr_, obj.val_len_); + MEMSET(buf + obj.val_len_, padding_char, fixed_len - obj.val_len_); + obj.v_.ptr_ = buf; + obj.val_len_ = fixed_len; + } + } + } + return ret; +} + int ObTableLoadObjCaster::cast_obj(ObTableLoadCastObjCtx &cast_obj_ctx, const ObColumnSchemaV2 *column_schema, const ObObj &src, ObObj &dst) @@ -38,12 +72,18 @@ int ObTableLoadObjCaster::cast_obj(ObTableLoadCastObjCtx &cast_obj_ctx, LOG_WARN("fail to convert string to enum or set", KR(ret), K(src), K(dst)); } } else { - if (OB_FAIL(to_type(expect_type, cast_obj_ctx, accuracy, *convert_src_obj, dst))) { + if (OB_FAIL(to_type(expect_type, column_schema, cast_obj_ctx, accuracy, *convert_src_obj, dst))) { LOG_WARN("fail to do to type", KR(ret)); } } } + if (OB_SUCC(ret)) { + if (OB_FAIL(pad_obj(cast_obj_ctx, column_schema, dst))) { + LOG_WARN("fail to pad obj", KR(ret)); + } + } + if (OB_SUCC(ret)) { if (cast_obj_ctx.is_need_check_ && OB_FAIL(cast_obj_check(cast_obj_ctx, column_schema, dst))) { @@ -234,17 +274,15 @@ int ObTableLoadObjCaster::string_to_set(ObIAllocator &alloc, const ObObj &src, return ret; } -int ObTableLoadObjCaster::to_type(const ObObjType &expect_type, ObTableLoadCastObjCtx &cast_obj_ctx, +int ObTableLoadObjCaster::to_type(const ObObjType &expect_type, const share::schema::ObColumnSchemaV2 *column_schema, ObTableLoadCastObjCtx &cast_obj_ctx, const ObAccuracy &accuracy, const ObObj &src, ObObj &dst) { int ret = OB_SUCCESS; ObCastCtx cast_ctx = *cast_obj_ctx.cast_ctx_; + cast_ctx.dest_collation_ = column_schema->get_collation_type(); const ObTableLoadTimeConverter time_cvrt = *cast_obj_ctx.time_cvrt_; if (src.is_null()) { dst.set_null(); - } else if (src.get_type() == expect_type && expect_type != ObVarcharType && - expect_type != ObCharType && !ob_is_nstring_type(expect_type)) { - dst = src; } else if (src.get_type_class() == ObStringTC && (expect_type == ObNumberType || expect_type == ObUNumberType)) { ObNumberDesc d(0); @@ 
-353,4 +391,4 @@ int ObTableLoadObjCaster::string_datetime_oracle(const ObObjType expect_type, } } // namespace observer -} // namespace oceanbase \ No newline at end of file +} // namespace oceanbase diff --git a/src/observer/table_load/ob_table_load_obj_cast.h b/src/observer/table_load/ob_table_load_obj_cast.h index a54708391..d4c2eac1a 100644 --- a/src/observer/table_load/ob_table_load_obj_cast.h +++ b/src/observer/table_load/ob_table_load_obj_cast.h @@ -9,6 +9,7 @@ #include "observer/table_load/ob_table_load_time_convert.h" #include "share/object/ob_obj_cast.h" #include "share/schema/ob_column_schema.h" +#include "observer/table_load/ob_table_load_struct.h" namespace oceanbase { @@ -34,11 +35,12 @@ public: struct ObTableLoadCastObjCtx { public: - ObTableLoadCastObjCtx(const ObTableLoadTimeConverter *time_cvrt, common::ObCastCtx *cast_ctx, + ObTableLoadCastObjCtx(const ObTableLoadParam &param, const ObTableLoadTimeConverter *time_cvrt, common::ObCastCtx *cast_ctx, const bool is_need_check) - : time_cvrt_(time_cvrt), cast_ctx_(cast_ctx), is_need_check_(is_need_check){} + : param_(param), time_cvrt_(time_cvrt), cast_ctx_(cast_ctx), is_need_check_(is_need_check){} public: + const ObTableLoadParam &param_; const ObTableLoadTimeConverter *time_cvrt_; common::ObCastCtx *cast_ctx_; ObTableLoadNumberFastCtx number_fast_ctx_; @@ -56,6 +58,7 @@ public: const common::ObObj &src, common::ObObj &dst); private: + static int pad_column(const ObAccuracy accuracy, common::ObIAllocator &padding_alloc, common::ObObj &cell); static int convert_obj(const common::ObObjType &expect_type, const common::ObObj &src, const common::ObObj *&dest); static int handle_string_to_enum_set(ObTableLoadCastObjCtx &cast_obj_ctx, @@ -74,7 +77,7 @@ private: static int cast_obj_check(ObTableLoadCastObjCtx &cast_obj_ctx, const share::schema::ObColumnSchemaV2 *column_schema, common::ObObj &obj); - static int to_type(const common::ObObjType &expect_type, ObTableLoadCastObjCtx &cast_obj_ctx, + static int 
to_type(const common::ObObjType &expect_type, const share::schema::ObColumnSchemaV2 *column_schema, ObTableLoadCastObjCtx &cast_obj_ctx, const common::ObAccuracy &accuracy, const common::ObObj &src, common::ObObj &dst); static int string_datetime_oracle(const common::ObObjType expect_type, common::ObObjCastParams &params, const common::ObObj &in, @@ -235,4 +238,4 @@ private: }; } // namespace observer -} // namespace oceanbase \ No newline at end of file +} // namespace oceanbase diff --git a/src/observer/table_load/ob_table_load_partition_calc.cpp b/src/observer/table_load/ob_table_load_partition_calc.cpp index a49bba98b..c4a244b31 100644 --- a/src/observer/table_load/ob_table_load_partition_calc.cpp +++ b/src/observer/table_load/ob_table_load_partition_calc.cpp @@ -190,7 +190,7 @@ int ObTableLoadPartitionCalc::calc(ObTableLoadPartitionCalcContext &ctx) const LOG_WARN("ObTableLoadPartitionCalc not init", KR(ret), KP(this)); } else { const ObTableLoadObjRowArray &obj_rows = ctx.obj_rows_; - const int64_t column_count = ctx.column_count_; + const int64_t column_count = ctx.param_.column_count_; if (OB_UNLIKELY(obj_rows.empty())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(column_count), K(obj_rows.count())); @@ -208,7 +208,7 @@ int ObTableLoadPartitionCalc::calc(ObTableLoadPartitionCalcContext &ctx) const for (int64_t i = 0; OB_SUCC(ret) && i < row_count; ++i) { ObNewRow part_row; if (OB_FAIL( - get_row(obj_rows.at(i), column_count, part_row, ctx.allocator_))) { + get_row(ctx, obj_rows.at(i), column_count, part_row, ctx.allocator_))) { LOG_WARN("fail to get rowkey", KR(ret)); } else if (OB_FAIL(part_rows.push_back(part_row))) { LOG_WARN("failed to push back partition row", KR(ret), K(part_row)); @@ -226,7 +226,7 @@ int ObTableLoadPartitionCalc::calc(ObTableLoadPartitionCalcContext &ctx) const } // FIXME: TODO, 对于非分区表,不能进入到这里计算 -int ObTableLoadPartitionCalc::get_row(const ObTableLoadObjRow &obj_row, int32_t length, ObNewRow &part_row, +int 
ObTableLoadPartitionCalc::get_row(ObTableLoadPartitionCalcContext &ctx, const ObTableLoadObjRow &obj_row, int32_t length, ObNewRow &part_row, ObIAllocator &allocator) const { int ret = OB_SUCCESS; @@ -234,7 +234,7 @@ int ObTableLoadPartitionCalc::get_row(const ObTableLoadObjRow &obj_row, int32_t ObObj *rowkey_objs = static_cast<ObObj *>(allocator.alloc(sizeof(ObObj) * rowkey_obj_count)); ObDataTypeCastParams cast_params(&tz_info_); ObCastCtx cast_ctx(&allocator, &cast_params, CM_NONE, ObCharset::get_system_collation()); - ObTableLoadCastObjCtx cast_obj_ctx(&time_cvrt_, &cast_ctx, false); + ObTableLoadCastObjCtx cast_obj_ctx(ctx.param_, &time_cvrt_, &cast_ctx, false); if (OB_ISNULL(rowkey_objs)) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to alloc memory", KR(ret)); @@ -249,6 +249,7 @@ int ObTableLoadPartitionCalc::get_row(const ObTableLoadObjRow &obj_row, int32_t obj_row.cells_[obj_index], rowkey_objs[i]))) { LOG_WARN("fail to cast obj", KR(ret)); } + } if (OB_SUCC(ret)) { part_row.assign(rowkey_objs, rowkey_obj_count); diff --git a/src/observer/table_load/ob_table_load_partition_calc.h b/src/observer/table_load/ob_table_load_partition_calc.h index c6cbcb2d4..990c32dc8 100644 --- a/src/observer/table_load/ob_table_load_partition_calc.h +++ b/src/observer/table_load/ob_table_load_partition_calc.h @@ -12,6 +12,7 @@ #include "sql/optimizer/ob_table_location.h" #include "sql/engine/ob_exec_context.h" #include "observer/table_load/ob_table_load_time_convert.h" +#include "observer/table_load/ob_table_load_struct.h" namespace oceanbase { @@ -22,13 +23,13 @@ class ObTableLoadSchema; struct ObTableLoadPartitionCalcContext { ObTableLoadPartitionCalcContext(const table::ObTableLoadObjRowArray &obj_rows, - int64_t column_count, common::ObIAllocator &allocator) - : obj_rows_(obj_rows), column_count_(column_count), allocator_(allocator) + const ObTableLoadParam &param, common::ObIAllocator &allocator) + : obj_rows_(obj_rows), param_(param), allocator_(allocator) { 
partition_ids_.set_block_allocator(common::ModulePageAllocator(allocator_)); } const table::ObTableLoadObjRowArray &obj_rows_; - const int64_t column_count_; + const ObTableLoadParam &param_; common::ObIAllocator &allocator_; common::ObArray partition_ids_; }; @@ -45,7 +46,7 @@ public: private: int init_rowkey_index(const share::schema::ObTableSchema *table_schema, common::ObIAllocator &allocator); - int get_row(const table::ObTableLoadObjRow &obj_row, int32_t length, common::ObNewRow &part_row, + int get_row(ObTableLoadPartitionCalcContext &ctx, const table::ObTableLoadObjRow &obj_row, int32_t length, common::ObNewRow &part_row, common::ObIAllocator &allocator) const; int get_partition_by_row(common::ObIArray &part_rows, common::ObIArray &partition_ids) const; diff --git a/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp b/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp index 3c9084f46..505ab05bc 100644 --- a/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp +++ b/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp @@ -207,7 +207,7 @@ int ObTableLoadTransBucketWriter::handle_partition_with_autoinc_identity( ObDataTypeCastParams cast_params(&(coordinator_ctx_->partition_calc_.tz_info_)); ObCastCtx cast_ctx(&autoinc_allocator, &cast_params, CM_NONE, ObCharset::get_system_collation()); - ObTableLoadCastObjCtx cast_obj_ctx(&(coordinator_ctx_->partition_calc_.time_cvrt_), &cast_ctx, + ObTableLoadCastObjCtx cast_obj_ctx(param_, &(coordinator_ctx_->partition_calc_.time_cvrt_), &cast_ctx, false); ObObj out_obj; for (int64_t j = 0; OB_SUCC(ret) && j < row_count; ++j) { @@ -309,7 +309,7 @@ int ObTableLoadTransBucketWriter::write_for_partitioned(SessionContext &session_ { int ret = OB_SUCCESS; ObArenaAllocator allocator("TLD_Misc", OB_MALLOC_NORMAL_BLOCK_SIZE, param_.tenant_id_); - ObTableLoadPartitionCalcContext calc_ctx(obj_rows, param_.column_count_, allocator); + ObTableLoadPartitionCalcContext calc_ctx(obj_rows, param_, 
allocator); if (OB_FAIL(coordinator_ctx_->partition_calc_.calc(calc_ctx))) { LOG_WARN("fail to calc partition", KR(ret)); } diff --git a/src/observer/table_load/ob_table_load_trans_store.cpp b/src/observer/table_load/ob_table_load_trans_store.cpp index 7e9c8ec95..e17ad18ea 100644 --- a/src/observer/table_load/ob_table_load_trans_store.cpp +++ b/src/observer/table_load/ob_table_load_trans_store.cpp @@ -365,7 +365,7 @@ int ObTableLoadTransStoreWriter::cast_row(ObArenaAllocator &cast_allocator, out_obj.set_null(); const ObColumnSchemaV2 *column_schema = column_schemas_.at(i); ObCastCtx cast_ctx(&cast_allocator, &cast_params, CM_NONE, column_schema->get_collation_type()); - ObTableLoadCastObjCtx cast_obj_ctx(&time_cvrt_, &cast_ctx, true); + ObTableLoadCastObjCtx cast_obj_ctx(param_, &time_cvrt_, &cast_ctx, true); if ((!row.cells_[i].is_null() || (!column_schema->is_autoincrement() && !column_schema->is_identity_column())) && OB_FAIL(