Fix incorrect partition calculation in direct load

This commit is contained in:
obdev 2023-03-07 04:44:00 +00:00 committed by ob-robot
parent 4187cab988
commit f09c3eff3d
6 changed files with 64 additions and 21 deletions

View File

@ -9,6 +9,7 @@
#include "sql/engine/cmd/ob_load_data_utils.h"
#include "sql/engine/expr/ob_expr_util.h"
#include "sql/resolver/expr/ob_raw_expr_util.h"
#include "sql/engine/expr/ob_datum_cast.h"
namespace oceanbase
{
@ -20,6 +21,39 @@ using namespace sql;
const ObObj ObTableLoadObjCaster::zero_obj(0);
const ObObj ObTableLoadObjCaster::null_obj(ObObjType::ObNullType);
// Right-pads a fixed-length string value up to the declared length of its column.
//
// Only objects for which is_fixed_len_char_type() or is_binary() holds are touched; any other
// object is returned unchanged. When the value is shorter than column_schema->get_data_length(),
// a buffer of that length is taken from cast_obj_ctx.cast_ctx_->allocator_v2_, the current bytes
// are copied into it and the tail is filled with OB_PADDING_BINARY (binary charset) or
// OB_PADDING_CHAR (any other charset). obj is then re-pointed at the new buffer.
//
// @param cast_obj_ctx   supplies the allocator that backs the padded buffer
// @param column_schema  target column; dereferenced without a null check
// @param obj            value padded in place (payload pointer and length are rewritten)
// @return OB_SUCCESS, or OB_ALLOCATE_MEMORY_FAILED if the buffer cannot be allocated
static int pad_obj(ObTableLoadCastObjCtx &cast_obj_ctx, const ObColumnSchemaV2 *column_schema, ObObj &obj)
{
  int ret = OB_SUCCESS;
  // NOTE(review): padding is currently unconditional for fixed-length values. A disabled draft
  // restricted it, in MySQL mode, to loads whose sql_mode_ has SMO_PAD_CHAR_TO_FULL_LENGTH set
  // (and always padded otherwise); re-evaluate before enabling such a switch.
  if (obj.is_fixed_len_char_type() || obj.is_binary()) {
    const int32_t fixed_len = column_schema->get_data_length();
    if (fixed_len > obj.val_len_) {
      char *buf = static_cast<char *>(cast_obj_ctx.cast_ctx_->allocator_v2_->alloc(fixed_len));
      if (nullptr == buf) {
        ret = OB_ALLOCATE_MEMORY_FAILED;
        LOG_WARN("fail to allocate buf", K(fixed_len), KR(ret));
      } else {
        const ObCharsetType cs = ObCharset::charset_type_by_coll(obj.get_collation_type());
        const char padding_char = (CHARSET_BINARY == cs) ? OB_PADDING_BINARY : OB_PADDING_CHAR;
        // An empty value may carry a null payload pointer; copying from null is undefined even
        // for a zero length, so skip the copy when there is nothing to copy.
        if (obj.val_len_ > 0) {
          MEMCPY(buf, obj.v_.ptr_, obj.val_len_);
        }
        MEMSET(buf + obj.val_len_, padding_char, fixed_len - obj.val_len_);
        obj.v_.ptr_ = buf;
        obj.val_len_ = fixed_len;
      }
    }
  }
  return ret;
}
int ObTableLoadObjCaster::cast_obj(ObTableLoadCastObjCtx &cast_obj_ctx,
const ObColumnSchemaV2 *column_schema, const ObObj &src,
ObObj &dst)
@ -38,12 +72,18 @@ int ObTableLoadObjCaster::cast_obj(ObTableLoadCastObjCtx &cast_obj_ctx,
LOG_WARN("fail to convert string to enum or set", KR(ret), K(src), K(dst));
}
} else {
if (OB_FAIL(to_type(expect_type, cast_obj_ctx, accuracy, *convert_src_obj, dst))) {
if (OB_FAIL(to_type(expect_type, column_schema, cast_obj_ctx, accuracy, *convert_src_obj, dst))) {
LOG_WARN("fail to do to type", KR(ret));
}
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(pad_obj(cast_obj_ctx, column_schema, dst))) {
LOG_WARN("fail to pad obj", KR(ret));
}
}
if (OB_SUCC(ret)) {
if (cast_obj_ctx.is_need_check_ &&
OB_FAIL(cast_obj_check(cast_obj_ctx, column_schema, dst))) {
@ -234,17 +274,15 @@ int ObTableLoadObjCaster::string_to_set(ObIAllocator &alloc, const ObObj &src,
return ret;
}
int ObTableLoadObjCaster::to_type(const ObObjType &expect_type, ObTableLoadCastObjCtx &cast_obj_ctx,
int ObTableLoadObjCaster::to_type(const ObObjType &expect_type, const share::schema::ObColumnSchemaV2 *column_schema, ObTableLoadCastObjCtx &cast_obj_ctx,
const ObAccuracy &accuracy, const ObObj &src, ObObj &dst)
{
int ret = OB_SUCCESS;
ObCastCtx cast_ctx = *cast_obj_ctx.cast_ctx_;
cast_ctx.dest_collation_ = column_schema->get_collation_type();
const ObTableLoadTimeConverter time_cvrt = *cast_obj_ctx.time_cvrt_;
if (src.is_null()) {
dst.set_null();
} else if (src.get_type() == expect_type && expect_type != ObVarcharType &&
expect_type != ObCharType && !ob_is_nstring_type(expect_type)) {
dst = src;
} else if (src.get_type_class() == ObStringTC &&
(expect_type == ObNumberType || expect_type == ObUNumberType)) {
ObNumberDesc d(0);
@ -353,4 +391,4 @@ int ObTableLoadObjCaster::string_datetime_oracle(const ObObjType expect_type,
}
} // namespace observer
} // namespace oceanbase
} // namespace oceanbase

View File

@ -9,6 +9,7 @@
#include "observer/table_load/ob_table_load_time_convert.h"
#include "share/object/ob_obj_cast.h"
#include "share/schema/ob_column_schema.h"
#include "observer/table_load/ob_table_load_struct.h"
namespace oceanbase
{
@ -34,11 +35,12 @@ public:
struct ObTableLoadCastObjCtx
{
public:
ObTableLoadCastObjCtx(const ObTableLoadTimeConverter *time_cvrt, common::ObCastCtx *cast_ctx,
ObTableLoadCastObjCtx(const ObTableLoadParam &param, const ObTableLoadTimeConverter *time_cvrt, common::ObCastCtx *cast_ctx,
const bool is_need_check)
: time_cvrt_(time_cvrt), cast_ctx_(cast_ctx), is_need_check_(is_need_check){}
: param_(param), time_cvrt_(time_cvrt), cast_ctx_(cast_ctx), is_need_check_(is_need_check){}
public:
const ObTableLoadParam &param_;
const ObTableLoadTimeConverter *time_cvrt_;
common::ObCastCtx *cast_ctx_;
ObTableLoadNumberFastCtx number_fast_ctx_;
@ -56,6 +58,7 @@ public:
const common::ObObj &src, common::ObObj &dst);
private:
static int pad_column(const ObAccuracy accuracy, common::ObIAllocator &padding_alloc, common::ObObj &cell);
static int convert_obj(const common::ObObjType &expect_type, const common::ObObj &src,
const common::ObObj *&dest);
static int handle_string_to_enum_set(ObTableLoadCastObjCtx &cast_obj_ctx,
@ -74,7 +77,7 @@ private:
static int cast_obj_check(ObTableLoadCastObjCtx &cast_obj_ctx,
const share::schema::ObColumnSchemaV2 *column_schema,
common::ObObj &obj);
static int to_type(const common::ObObjType &expect_type, ObTableLoadCastObjCtx &cast_obj_ctx,
static int to_type(const common::ObObjType &expect_type, const share::schema::ObColumnSchemaV2 *column_schema, ObTableLoadCastObjCtx &cast_obj_ctx,
const common::ObAccuracy &accuracy, const common::ObObj &src, common::ObObj &dst);
static int string_datetime_oracle(const common::ObObjType expect_type,
common::ObObjCastParams &params, const common::ObObj &in,
@ -235,4 +238,4 @@ private:
};
} // namespace observer
} // namespace oceanbase
} // namespace oceanbase

View File

@ -190,7 +190,7 @@ int ObTableLoadPartitionCalc::calc(ObTableLoadPartitionCalcContext &ctx) const
LOG_WARN("ObTableLoadPartitionCalc not init", KR(ret), KP(this));
} else {
const ObTableLoadObjRowArray &obj_rows = ctx.obj_rows_;
const int64_t column_count = ctx.column_count_;
const int64_t column_count = ctx.param_.column_count_;
if (OB_UNLIKELY(obj_rows.empty())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(column_count), K(obj_rows.count()));
@ -208,7 +208,7 @@ int ObTableLoadPartitionCalc::calc(ObTableLoadPartitionCalcContext &ctx) const
for (int64_t i = 0; OB_SUCC(ret) && i < row_count; ++i) {
ObNewRow part_row;
if (OB_FAIL(
get_row(obj_rows.at(i), column_count, part_row, ctx.allocator_))) {
get_row(ctx, obj_rows.at(i), column_count, part_row, ctx.allocator_))) {
LOG_WARN("fail to get rowkey", KR(ret));
} else if (OB_FAIL(part_rows.push_back(part_row))) {
LOG_WARN("failed to push back partition row", KR(ret), K(part_row));
@ -226,7 +226,7 @@ int ObTableLoadPartitionCalc::calc(ObTableLoadPartitionCalcContext &ctx) const
}
// FIXME: TODO, non-partitioned tables must not reach this calculation
int ObTableLoadPartitionCalc::get_row(const ObTableLoadObjRow &obj_row, int32_t length, ObNewRow &part_row,
int ObTableLoadPartitionCalc::get_row(ObTableLoadPartitionCalcContext &ctx, const ObTableLoadObjRow &obj_row, int32_t length, ObNewRow &part_row,
ObIAllocator &allocator) const
{
int ret = OB_SUCCESS;
@ -234,7 +234,7 @@ int ObTableLoadPartitionCalc::get_row(const ObTableLoadObjRow &obj_row, int32_t
ObObj *rowkey_objs = static_cast<ObObj *>(allocator.alloc(sizeof(ObObj) * rowkey_obj_count));
ObDataTypeCastParams cast_params(&tz_info_);
ObCastCtx cast_ctx(&allocator, &cast_params, CM_NONE, ObCharset::get_system_collation());
ObTableLoadCastObjCtx cast_obj_ctx(&time_cvrt_, &cast_ctx, false);
ObTableLoadCastObjCtx cast_obj_ctx(ctx.param_, &time_cvrt_, &cast_ctx, false);
if (OB_ISNULL(rowkey_objs)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc memory", KR(ret));
@ -249,6 +249,7 @@ int ObTableLoadPartitionCalc::get_row(const ObTableLoadObjRow &obj_row, int32_t
obj_row.cells_[obj_index], rowkey_objs[i]))) {
LOG_WARN("fail to cast obj", KR(ret));
}
}
if (OB_SUCC(ret)) {
part_row.assign(rowkey_objs, rowkey_obj_count);

View File

@ -12,6 +12,7 @@
#include "sql/optimizer/ob_table_location.h"
#include "sql/engine/ob_exec_context.h"
#include "observer/table_load/ob_table_load_time_convert.h"
#include "observer/table_load/ob_table_load_struct.h"
namespace oceanbase
{
@ -22,13 +23,13 @@ class ObTableLoadSchema;
// Context object handed to ObTableLoadPartitionCalc::calc(): it carries the rows to process and
// the allocator used while building partition rows, together with the partition_ids_ array.
// All reference members are borrowed; the referenced objects must outlive this context.
struct ObTableLoadPartitionCalcContext
{
ObTableLoadPartitionCalcContext(const table::ObTableLoadObjRowArray &obj_rows,
int64_t column_count, common::ObIAllocator &allocator)
: obj_rows_(obj_rows), column_count_(column_count), allocator_(allocator)
const ObTableLoadParam &param, common::ObIAllocator &allocator)
: obj_rows_(obj_rows), param_(param), allocator_(allocator)
{
// Back the partition id array with the caller-supplied allocator.
partition_ids_.set_block_allocator(common::ModulePageAllocator(allocator_));
}
// Input rows; calc() rejects an empty array.
const table::ObTableLoadObjRowArray &obj_rows_;
const int64_t column_count_;
// Load parameters; calc() reads column_count_ from here.
const ObTableLoadParam &param_;
common::ObIAllocator &allocator_;
// NOTE(review): presumably filled by calc() with one partition id per input row -- confirm.
common::ObArray<table::ObTableLoadPartitionId> partition_ids_;
};
@ -45,7 +46,7 @@ public:
private:
int init_rowkey_index(const share::schema::ObTableSchema *table_schema,
common::ObIAllocator &allocator);
int get_row(const table::ObTableLoadObjRow &obj_row, int32_t length, common::ObNewRow &part_row,
int get_row(ObTableLoadPartitionCalcContext &ctx, const table::ObTableLoadObjRow &obj_row, int32_t length, common::ObNewRow &part_row,
common::ObIAllocator &allocator) const;
int get_partition_by_row(common::ObIArray<common::ObNewRow> &part_rows,
common::ObIArray<table::ObTableLoadPartitionId> &partition_ids) const;

View File

@ -207,7 +207,7 @@ int ObTableLoadTransBucketWriter::handle_partition_with_autoinc_identity(
ObDataTypeCastParams cast_params(&(coordinator_ctx_->partition_calc_.tz_info_));
ObCastCtx cast_ctx(&autoinc_allocator, &cast_params, CM_NONE,
ObCharset::get_system_collation());
ObTableLoadCastObjCtx cast_obj_ctx(&(coordinator_ctx_->partition_calc_.time_cvrt_), &cast_ctx,
ObTableLoadCastObjCtx cast_obj_ctx(param_, &(coordinator_ctx_->partition_calc_.time_cvrt_), &cast_ctx,
false);
ObObj out_obj;
for (int64_t j = 0; OB_SUCC(ret) && j < row_count; ++j) {
@ -309,7 +309,7 @@ int ObTableLoadTransBucketWriter::write_for_partitioned(SessionContext &session_
{
int ret = OB_SUCCESS;
ObArenaAllocator allocator("TLD_Misc", OB_MALLOC_NORMAL_BLOCK_SIZE, param_.tenant_id_);
ObTableLoadPartitionCalcContext calc_ctx(obj_rows, param_.column_count_, allocator);
ObTableLoadPartitionCalcContext calc_ctx(obj_rows, param_, allocator);
if (OB_FAIL(coordinator_ctx_->partition_calc_.calc(calc_ctx))) {
LOG_WARN("fail to calc partition", KR(ret));
}

View File

@ -365,7 +365,7 @@ int ObTableLoadTransStoreWriter::cast_row(ObArenaAllocator &cast_allocator,
out_obj.set_null();
const ObColumnSchemaV2 *column_schema = column_schemas_.at(i);
ObCastCtx cast_ctx(&cast_allocator, &cast_params, CM_NONE, column_schema->get_collation_type());
ObTableLoadCastObjCtx cast_obj_ctx(&time_cvrt_, &cast_ctx, true);
ObTableLoadCastObjCtx cast_obj_ctx(param_, &time_cvrt_, &cast_ctx, true);
if ((!row.cells_[i].is_null() ||
(!column_schema->is_autoincrement() && !column_schema->is_identity_column())) &&
OB_FAIL(