fix partition calc bug

This commit is contained in:
yongshige 2023-08-07 12:48:34 +00:00 committed by ob-robot
parent e74be98c8a
commit 7d5247651d
5 changed files with 101 additions and 102 deletions

View File

@ -107,7 +107,7 @@ int ObTableLoadCoordinatorCtx::init(const ObIArray<int64_t> &idx_array,
}
// init partition_calc_
else if (OB_FAIL(
partition_calc_.init(ctx_->param_.tenant_id_, ctx_->param_.table_id_, ctx_->session_info_))) {
partition_calc_.init(ctx_->param_, ctx_->session_info_))) {
LOG_WARN("fail to init partition calc", KR(ret));
}
// init trans_allocator_

View File

@ -26,8 +26,7 @@ ObTableLoadPartitionCalc::ObTableLoadPartitionCalc()
: session_info_(nullptr),
is_partition_with_autoinc_(false),
partition_with_autoinc_idx_(OB_INVALID_INDEX),
tenant_id_(OB_INVALID_ID),
table_id_(OB_INVALID_ID),
param_(nullptr),
is_partitioned_(false),
allocator_("TLD_PartCalc"),
exec_ctx_(allocator_),
@ -35,13 +34,15 @@ ObTableLoadPartitionCalc::ObTableLoadPartitionCalc()
{
}
int ObTableLoadPartitionCalc::init(uint64_t tenant_id, uint64_t table_id, sql::ObSQLSessionInfo *session_info)
int ObTableLoadPartitionCalc::init(const ObTableLoadParam &param, sql::ObSQLSessionInfo *session_info)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("ObTableLoadPartitionCalc init twice", KR(ret), KP(this));
} else {
uint64_t tenant_id = param.tenant_id_;
uint64_t table_id = param.table_id_;
allocator_.set_tenant_id(tenant_id);
sql_ctx_.schema_guard_ = &schema_guard_;
exec_ctx_.set_sql_ctx(&sql_ctx_);
@ -71,8 +72,7 @@ int ObTableLoadPartitionCalc::init(uint64_t tenant_id, uint64_t table_id, sql::O
}
}
if (OB_SUCC(ret)) {
tenant_id_ = tenant_id;
table_id_ = table_id;
param_ = &param;
session_info_ = session_info;
is_partitioned_ = is_partitioned;
is_inited_ = true;
@ -134,78 +134,47 @@ int ObTableLoadPartitionCalc::init_part_key_index(const ObTableSchema *table_sch
return ret;
}
int ObTableLoadPartitionCalc::calc(ObTableLoadPartitionCalcContext &ctx)
int ObTableLoadPartitionCalc::get_part_key(const table::ObTableLoadObjRow &row, common::ObNewRow &part_key) const
{
OB_TABLE_LOAD_STATISTICS_TIME_COST(DEBUG, calc_part_time_us);
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObTableLoadPartitionCalc not init", KR(ret), KP(this));
} else {
const ObTableLoadObjRowArray &obj_rows = ctx.obj_rows_;
const int64_t column_count = ctx.param_.column_count_;
if (OB_UNLIKELY(obj_rows.empty())) {
if (OB_UNLIKELY(part_key.count_ != part_key_obj_index_.count())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid part key count", KR(ret), K(part_key.count_), K(part_key_obj_index_.count()));
}
for (int64_t i = 0; OB_SUCC(ret) && i < part_key_obj_index_.count(); ++i) {
const IndexAndType &index_and_type = part_key_obj_index_.at(i);
const int64_t obj_index = index_and_type.index_;
if (OB_UNLIKELY(obj_index >= row.count_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(column_count), K(obj_rows.count()));
LOG_WARN("invalid length", KR(ret), K(obj_index), K(row.count_));
} else {
const int64_t row_count = obj_rows.count();
if (!is_partitioned_) { // 非分区表
for (int64_t i = 0; OB_SUCC(ret) && i < row_count; ++i) {
if (OB_FAIL(ctx.partition_ids_.push_back(partition_id_))) {
LOG_WARN("failed to push back partition id", KR(ret));
}
}
} else { // 分区表
ObArray<ObNewRow> part_rows;
part_rows.set_block_allocator(ModulePageAllocator(ctx.allocator_));
for (int64_t i = 0; OB_SUCC(ret) && i < row_count; ++i) {
ObNewRow part_row;
if (OB_FAIL(
get_row(ctx, obj_rows.at(i), column_count, part_row, ctx.allocator_))) {
LOG_WARN("fail to get rowkey", KR(ret));
} else if (OB_FAIL(part_rows.push_back(part_row))) {
LOG_WARN("failed to push back partition row", KR(ret), K(part_row));
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(get_partition_by_row(part_rows, ctx.partition_ids_))) {
LOG_WARN("fail to get partition", KR(ret));
}
}
}
part_key.cells_[i] = row.cells_[obj_index];
}
}
return ret;
}
// FIXME: TODO, for non-partitioned tables, execution must not reach this calculation
int ObTableLoadPartitionCalc::get_row(ObTableLoadPartitionCalcContext &ctx, const ObTableLoadObjRow &obj_row, int32_t length, ObNewRow &part_row,
ObIAllocator &allocator) const
int ObTableLoadPartitionCalc::cast_part_key(common::ObNewRow &part_key, common::ObIAllocator &allocator) const
{
int ret = OB_SUCCESS;
const int64_t rowkey_obj_count = part_key_obj_index_.count();
ObObj *rowkey_objs = static_cast<ObObj *>(allocator.alloc(sizeof(ObObj) * rowkey_obj_count));
ObDataTypeCastParams cast_params(session_info_->get_timezone_info());
ObCastCtx cast_ctx(&allocator, &cast_params, CM_NONE, ObCharset::get_system_collation());
ObTableLoadCastObjCtx cast_obj_ctx(ctx.param_, &time_cvrt_, &cast_ctx, true);
if (OB_ISNULL(rowkey_objs)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc memory", KR(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < rowkey_obj_count; ++i) {
const IndexAndType &index_and_type = part_key_obj_index_.at(i);
const int64_t obj_index = index_and_type.index_;
if (OB_UNLIKELY(obj_index >= length)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid length", KR(ret), K(obj_index), K(length));
} else if (OB_FAIL(ObTableLoadObjCaster::cast_obj(cast_obj_ctx, index_and_type.column_schema_,
obj_row.cells_[obj_index], rowkey_objs[i]))) {
LOG_WARN("fail to cast obj", KR(ret));
if (OB_UNLIKELY(part_key.count_ != part_key_obj_index_.count())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid part key count", KR(ret), K(part_key.count_), K(part_key_obj_index_.count()));
} else {
ObDataTypeCastParams cast_params(session_info_->get_timezone_info());
ObCastCtx cast_ctx(&allocator, &cast_params, CM_NONE, ObCharset::get_system_collation());
ObTableLoadCastObjCtx cast_obj_ctx(*param_, &time_cvrt_, &cast_ctx, true);
ObObj obj;
for (int64_t i = 0; OB_SUCC(ret) && i < part_key_obj_index_.count(); ++i) {
const IndexAndType &index_and_type = part_key_obj_index_.at(i);
if (OB_FAIL(ObTableLoadObjCaster::cast_obj(cast_obj_ctx, index_and_type.column_schema_,
part_key.cells_[i], obj))) {
LOG_WARN("fail to cast obj", KR(ret));
} else {
part_key.cells_[i] = obj;
}
}
}
if (OB_SUCC(ret)) {
part_row.assign(rowkey_objs, rowkey_obj_count);
}
return ret;
}
@ -217,7 +186,7 @@ int ObTableLoadPartitionCalc::get_partition_by_row(
ObArray<ObTabletID> tablet_ids;
ObArray<ObObjectID> part_ids;
if (OB_FAIL(table_location_.calculate_partition_ids_by_rows2(
*session_info_, schema_guard_, table_id_, part_rows, tablet_ids, part_ids))) {
*session_info_, schema_guard_, param_->table_id_, part_rows, tablet_ids, part_ids))) {
LOG_WARN("fail to calc partition id", KR(ret));
} else if (OB_UNLIKELY(part_rows.count() != part_ids.count() ||
part_rows.count() != tablet_ids.count())) {
@ -225,11 +194,7 @@ int ObTableLoadPartitionCalc::get_partition_by_row(
LOG_WARN("invalid args", K(part_ids.count()), K(tablet_ids.count()));
}
for (int i = 0; OB_SUCC(ret) && i < part_rows.count(); i++) {
if (OB_INVALID_PARTITION_ID == part_ids.at(i)
|| ObTabletID::INVALID_TABLET_ID == tablet_ids.at(i).id()) {
ret = OB_NO_PARTITION_FOR_GIVEN_VALUE;
LOG_WARN("no partition matched", KR(ret), K(part_ids.at(i)), K(tablet_ids.at(i)));
} else if (OB_FAIL(
if (OB_FAIL(
partition_ids.push_back(ObTableLoadPartitionId(part_ids.at(i), tablet_ids.at(i))))) {
LOG_WARN("fail to push partition id", KR(ret));
}

View File

@ -21,33 +21,20 @@ namespace observer
{
class ObTableLoadSchema;
// Carries the inputs and the output of one partition calculation over a batch
// of rows. The inputs are held by reference, so the context does not own them.
struct ObTableLoadPartitionCalcContext
{
// Binds the three references and makes partition_ids_ use a ModulePageAllocator
// built on top of the supplied allocator.
ObTableLoadPartitionCalcContext(const table::ObTableLoadObjRowArray &obj_rows,
const ObTableLoadParam &param, common::ObIAllocator &allocator)
: obj_rows_(obj_rows), param_(param), allocator_(allocator)
{
partition_ids_.set_block_allocator(common::ModulePageAllocator(allocator_));
}
// Input: the rows whose partitions are to be calculated.
const table::ObTableLoadObjRowArray &obj_rows_;
// Input: load parameters (column_count_ is read from here by the calculation).
const ObTableLoadParam &param_;
// Allocator used for partition_ids_ and for temporary part-key rows.
common::ObIAllocator &allocator_;
// Output: the partition ids appended by the calculation.
common::ObArray<table::ObTableLoadPartitionId> partition_ids_;
};
class ObTableLoadPartitionCalc
{
public:
ObTableLoadPartitionCalc();
int init(uint64_t tenant_id, uint64_t table_id, sql::ObSQLSessionInfo *session_info);
int calc(ObTableLoadPartitionCalcContext &ctx);
int init(const ObTableLoadParam &param, sql::ObSQLSessionInfo *session_info);
int get_part_key(const table::ObTableLoadObjRow &row, common::ObNewRow &part_key) const;
int cast_part_key(common::ObNewRow &part_key, common::ObIAllocator &allocator) const;
int get_partition_by_row(common::ObIArray<common::ObNewRow> &part_rows,
common::ObIArray<table::ObTableLoadPartitionId> &partition_ids);
int64_t get_part_key_obj_count() const {return part_key_obj_index_.count();}
private:
int init_part_key_index(const share::schema::ObTableSchema *table_schema,
common::ObIAllocator &allocator);
int get_row(ObTableLoadPartitionCalcContext &ctx, const table::ObTableLoadObjRow &obj_row, int32_t length, common::ObNewRow &part_row,
common::ObIAllocator &allocator) const;
int get_partition_by_row(common::ObIArray<common::ObNewRow> &part_rows,
common::ObIArray<table::ObTableLoadPartitionId> &partition_ids);
public:
struct IndexAndType
{
@ -63,9 +50,8 @@ public:
bool is_partition_with_autoinc_;
int64_t partition_with_autoinc_idx_;
private:
const ObTableLoadParam *param_;
// data members
uint64_t tenant_id_;
uint64_t table_id_;
bool is_partitioned_;
// non-partitioned table
table::ObTableLoadPartitionId partition_id_;

View File

@ -8,6 +8,7 @@
#include "observer/table_load/ob_table_load_trans_bucket_writer.h"
#include "observer/table_load/ob_table_load_coordinator.h"
#include "observer/table_load/ob_table_load_coordinator_ctx.h"
#include "observer/table_load/ob_table_load_error_row_handler.h"
#include "observer/table_load/ob_table_load_obj_cast.h"
#include "observer/table_load/ob_table_load_partition_calc.h"
#include "observer/table_load/ob_table_load_stat.h"
@ -309,19 +310,56 @@ int ObTableLoadTransBucketWriter::write_for_partitioned(SessionContext &session_
{
int ret = OB_SUCCESS;
ObArenaAllocator allocator("TLD_Misc", OB_MALLOC_NORMAL_BLOCK_SIZE, param_.tenant_id_);
ObTableLoadPartitionCalcContext calc_ctx(obj_rows, param_, allocator);
if (OB_FAIL(coordinator_ctx_->partition_calc_.calc(calc_ctx))) {
LOG_WARN("fail to calc partition", KR(ret));
const int64_t part_key_obj_count = coordinator_ctx_->partition_calc_.get_part_key_obj_count();
ObArray<ObTableLoadPartitionId> partition_ids;
ObArray<ObNewRow> part_keys;
ObArray<int64_t> row_idxs;
ObTableLoadErrorRowHandler *error_row_handler =
trans_ctx_->ctx_->store_ctx_->error_row_handler_;
partition_ids.set_block_allocator(common::ModulePageAllocator(allocator));
for (int64_t i = 0; OB_SUCC(ret) && i < obj_rows.count(); ++i) {
ObNewRow part_key;
part_key.count_ = part_key_obj_count;
part_key.cells_ = static_cast<ObObj *>(allocator.alloc(sizeof(ObObj) * part_key_obj_count));
if (OB_ISNULL(part_key.cells_)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc memory", KR(ret));
} else if (OB_FAIL(coordinator_ctx_->partition_calc_.get_part_key(obj_rows.at(i), part_key))) {
LOG_WARN("fail to get part key", KR(ret));
} else if (OB_FAIL(coordinator_ctx_->partition_calc_.cast_part_key(part_key, allocator))) {
if (OB_FAIL(error_row_handler->handle_error_row(ret, part_key))) {
LOG_WARN("failed to handle error row", K(ret), K(part_key));
} else {
ret = OB_SUCCESS;
}
} else if (OB_FAIL(part_keys.push_back(part_key))) {
LOG_WARN("fail to push back part key", KR(ret));
} else if (OB_FAIL(row_idxs.push_back(i))) {
LOG_WARN("fail to push back row idx", KR(ret));
}
}
for (int64_t i = 0; OB_SUCC(ret) && i < calc_ctx.partition_ids_.count(); ++i) {
const ObTableLoadPartitionId &partition_id = calc_ctx.partition_ids_.at(i);
if (OB_SUCC(ret)) {
if (OB_FAIL(coordinator_ctx_->partition_calc_.get_partition_by_row(part_keys, partition_ids))) {
LOG_WARN("fail to calc partition", KR(ret));
}
}
for (int64_t i = 0; OB_SUCC(ret) && i < row_idxs.count(); ++i) {
const ObTableLoadPartitionId &partition_id = partition_ids.at(i);
const ObTableLoadObjRow &row = obj_rows.at(row_idxs.at(i));
ObTableLoadBucket *load_bucket = nullptr;
bool need_write = false;
if (OB_FAIL(get_load_bucket(session_ctx, partition_id, load_bucket))) {
if (OB_UNLIKELY(!partition_id.is_valid())) {
ret = OB_NO_PARTITION_FOR_GIVEN_VALUE;
if (OB_FAIL(error_row_handler->handle_error_row(ret, part_keys.at(i)))) {
LOG_WARN("failed to handle error row", K(ret), K(part_keys.at(i)));
} else {
ret = OB_SUCCESS;
}
} else if (OB_FAIL(get_load_bucket(session_ctx, partition_id, load_bucket))) {
LOG_WARN("fail to get partition bucket", KR(ret), K(session_ctx.session_id_),
K(partition_id));
} else if (OB_FAIL(load_bucket->add_row(
partition_id.tablet_id_, obj_rows.at(i),
partition_id.tablet_id_, row,
param_.column_count_, param_.batch_size_, need_write))) {
LOG_WARN("fail to add row", KR(ret));
} else if (need_write && OB_FAIL(write_load_bucket(session_ctx, load_bucket))) {

View File

@ -1505,12 +1505,22 @@ int ObTableLocation::calculate_partition_ids_by_rows2(ObSQLSessionInfo &session_
}
}
if (OB_SUCC(ret)) {
if ((tmp_tablet_ids.count() != 1) || (tmp_part_ids.count() != 1)) {
ObObjectID part_id;
ObTabletID tablet_id;
if (OB_UNLIKELY(tmp_tablet_ids.empty() && tmp_part_ids.empty())) {
part_id = OB_INVALID_PARTITION_ID;
tablet_id = ObTabletID::INVALID_TABLET_ID;
} else if (OB_UNLIKELY((tmp_tablet_ids.count() != 1) || (tmp_part_ids.count() != 1))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid tablet ids or partition ids", KR(ret), K(tmp_tablet_ids), K(tmp_part_ids));
} else if (OB_FAIL(tablet_ids.push_back(tmp_tablet_ids.at(0)))) {
} else {
part_id = tmp_part_ids.at(0);
tablet_id = tmp_tablet_ids.at(0);
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(tablet_ids.push_back(tablet_id))) {
LOG_WARN("fail to push tablet id", KR(ret));
} else if (OB_FAIL(part_ids.push_back(tmp_part_ids.at(0)))) {
} else if (OB_FAIL(part_ids.push_back(part_id))) {
LOG_WARN("fail to push object id", KR(ret));
}
}