fix partition calc bug
This commit is contained in:
@ -102,7 +102,6 @@
|
||||
#include "logservice/palf/election/interface/election.h"
|
||||
#include "storage/ddl/ob_ddl_redo_log_writer.h"
|
||||
#include "observer/ob_server_utils.h"
|
||||
#include "observer/table_load/ob_table_load_partition_calc.h"
|
||||
#include "share/detect/ob_detect_manager.h"
|
||||
|
||||
using namespace oceanbase::lib;
|
||||
@ -267,8 +266,6 @@ int ObServer::init(const ObServerOptions &opts, const ObPLogWriterCfg &log_cfg)
|
||||
LOG_ERROR("init retry ctrl failed", KR(ret));
|
||||
} else if (OB_FAIL(ObTableApiProcessorBase::init_session())) {
|
||||
LOG_ERROR("init static session failed", KR(ret));
|
||||
} else if (OB_FAIL(ObTableLoadPartitionCalc::init_session())) {
|
||||
LOG_ERROR("failed to init static session", KR(ret));
|
||||
} else if (OB_FAIL(init_loaddata_global_stat())) {
|
||||
LOG_ERROR("init global load data stat map failed", KR(ret));
|
||||
} else if (OB_FAIL(init_pre_setting())) {
|
||||
|
||||
@ -72,7 +72,7 @@ int ObTableLoadCoordinatorCtx::init(const ObIArray<int64_t> &idx_array, uint64_t
|
||||
}
|
||||
// init partition_calc_
|
||||
else if (OB_FAIL(
|
||||
partition_calc_.init(ctx_->param_.tenant_id_, ctx_->param_.table_id_))) {
|
||||
partition_calc_.init(ctx_->param_.tenant_id_, ctx_->param_.table_id_, ctx_->session_info_))) {
|
||||
LOG_WARN("fail to init partition calc", KR(ret));
|
||||
}
|
||||
// init trans_allocator_
|
||||
|
||||
@ -22,44 +22,6 @@ using namespace share::schema;
|
||||
using namespace sql;
|
||||
using namespace table;
|
||||
|
||||
static ObSQLSessionInfo &session()
|
||||
{
|
||||
static ObSQLSessionInfo SESSION;
|
||||
return SESSION;
|
||||
}
|
||||
|
||||
ObArenaAllocator &session_alloc()
|
||||
{
|
||||
static ObArenaAllocator SESSION_ALLOC;
|
||||
return SESSION_ALLOC;
|
||||
}
|
||||
|
||||
ObSQLSessionInfo &ObTableLoadPartitionCalc::get_session()
|
||||
{
|
||||
return session();
|
||||
}
|
||||
|
||||
int ObTableLoadPartitionCalc::init_session()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
static const uint32_t sess_version = 0;
|
||||
static const uint32_t sess_id = 1;
|
||||
static const uint64_t proxy_sess_id = 1;
|
||||
|
||||
// ensure allocator is constructed before session to
|
||||
// avoid coredump at observer exit
|
||||
//
|
||||
ObArenaAllocator *allocator = &session_alloc();
|
||||
ObSQLSessionInfo &sess = session();
|
||||
|
||||
if (OB_FAIL(sess.test_init(sess_version, sess_id, proxy_sess_id, allocator))) {
|
||||
LOG_WARN("init session failed", KR(ret));
|
||||
} else if (OB_FAIL(sess.load_default_sys_variable(false, true))) {
|
||||
LOG_WARN("failed to load default sys var", KR(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
ObTableLoadPartitionCalc::ObTableLoadPartitionCalc()
|
||||
: is_partition_with_autoinc_(false),
|
||||
partition_with_autoinc_idx_(OB_INVALID_INDEX),
|
||||
@ -72,7 +34,7 @@ ObTableLoadPartitionCalc::ObTableLoadPartitionCalc()
|
||||
{
|
||||
}
|
||||
|
||||
int ObTableLoadPartitionCalc::init(uint64_t tenant_id, uint64_t table_id)
|
||||
int ObTableLoadPartitionCalc::init(uint64_t tenant_id, uint64_t table_id, sql::ObSQLSessionInfo *session_info)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_INIT) {
|
||||
@ -80,41 +42,38 @@ int ObTableLoadPartitionCalc::init(uint64_t tenant_id, uint64_t table_id)
|
||||
LOG_WARN("ObTableLoadPartitionCalc init twice", KR(ret), KP(this));
|
||||
} else {
|
||||
allocator_.set_tenant_id(tenant_id);
|
||||
if (OB_FAIL(OTTZ_MGR.get_tenant_tz(tenant_id, tz_info_.get_tz_map_wrap()))) {
|
||||
LOG_WARN("fail to get tenant time zone", KR(ret), K(tenant_id_));
|
||||
const ObTableSchema *table_schema = nullptr;
|
||||
ObDataTypeCastParams cast_params(session_info->get_timezone_info());
|
||||
if (OB_FAIL(time_cvrt_.init(cast_params.get_nls_format(ObDateTimeType)))) {
|
||||
LOG_WARN("fail to init time converter", KR(ret));
|
||||
} else if (OB_FAIL(ObTableLoadSchema::get_table_schema(tenant_id, table_id, schema_guard_,
|
||||
table_schema))) {
|
||||
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id));
|
||||
} else {
|
||||
const ObTableSchema *table_schema = nullptr;
|
||||
ObDataTypeCastParams cast_params(&tz_info_);
|
||||
if (OB_FAIL(time_cvrt_.init(cast_params.get_nls_format(ObDateTimeType)))) {
|
||||
LOG_WARN("fail to init time converter", KR(ret));
|
||||
} else if (OB_FAIL(ObTableLoadSchema::get_table_schema(tenant_id, table_id, schema_guard_,
|
||||
table_schema))) {
|
||||
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id));
|
||||
} else {
|
||||
const bool is_partitioned = table_schema->is_partitioned_table();
|
||||
if (!is_partitioned) { // 非分区表
|
||||
if (OB_FAIL(table_schema->get_tablet_and_object_id(partition_id_.tablet_id_,
|
||||
partition_id_.partition_id_))) {
|
||||
LOG_WARN("fail to get tablet and object", KR(ret));
|
||||
}
|
||||
} else { // 分区表
|
||||
exec_ctx_.set_sql_ctx(&sql_ctx_);
|
||||
// 初始化table_location_
|
||||
if (OB_FAIL(
|
||||
table_location_.init_partition_ids_by_rowkey2(exec_ctx_, session(), schema_guard_, table_id))) {
|
||||
LOG_WARN("fail to init table location", KR(ret));
|
||||
}
|
||||
// 获取part_key_obj_index_
|
||||
else if (OB_FAIL(init_part_key_index(table_schema, allocator_))) {
|
||||
LOG_WARN("fail to get rowkey index", KR(ret));
|
||||
}
|
||||
const bool is_partitioned = table_schema->is_partitioned_table();
|
||||
if (!is_partitioned) { // 非分区表
|
||||
if (OB_FAIL(table_schema->get_tablet_and_object_id(partition_id_.tablet_id_,
|
||||
partition_id_.partition_id_))) {
|
||||
LOG_WARN("fail to get tablet and object", KR(ret));
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
tenant_id_ = tenant_id;
|
||||
table_id_ = table_id;
|
||||
is_partitioned_ = is_partitioned;
|
||||
is_inited_ = true;
|
||||
} else { // 分区表
|
||||
exec_ctx_.set_sql_ctx(&sql_ctx_);
|
||||
// 初始化table_location_
|
||||
if (OB_FAIL(
|
||||
table_location_.init_partition_ids_by_rowkey2(exec_ctx_, *session_info, schema_guard_, table_id))) {
|
||||
LOG_WARN("fail to init table location", KR(ret));
|
||||
}
|
||||
// 获取part_key_obj_index_
|
||||
else if (OB_FAIL(init_part_key_index(table_schema, allocator_))) {
|
||||
LOG_WARN("fail to get rowkey index", KR(ret));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
tenant_id_ = tenant_id;
|
||||
table_id_ = table_id;
|
||||
session_info_ = session_info;
|
||||
is_partitioned_ = is_partitioned;
|
||||
is_inited_ = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -224,7 +183,7 @@ int ObTableLoadPartitionCalc::get_row(ObTableLoadPartitionCalcContext &ctx, cons
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t rowkey_obj_count = part_key_obj_index_.count();
|
||||
ObObj *rowkey_objs = static_cast<ObObj *>(allocator.alloc(sizeof(ObObj) * rowkey_obj_count));
|
||||
ObDataTypeCastParams cast_params(&tz_info_);
|
||||
ObDataTypeCastParams cast_params(session_info_->get_timezone_info());
|
||||
ObCastCtx cast_ctx(&allocator, &cast_params, CM_NONE, ObCharset::get_system_collation());
|
||||
ObTableLoadCastObjCtx cast_obj_ctx(ctx.param_, &time_cvrt_, &cast_ctx, true);
|
||||
if (OB_ISNULL(rowkey_objs)) {
|
||||
@ -256,7 +215,7 @@ int ObTableLoadPartitionCalc::get_partition_by_row(
|
||||
ObArray<ObTabletID> tablet_ids;
|
||||
ObArray<ObObjectID> part_ids;
|
||||
if (OB_FAIL(table_location_.calculate_partition_ids_by_rows2(
|
||||
session(), schema_guard_, table_id_, part_rows, tablet_ids, part_ids))) {
|
||||
*session_info_, schema_guard_, table_id_, part_rows, tablet_ids, part_ids))) {
|
||||
LOG_WARN("fail to calc partition id", KR(ret));
|
||||
} else if (OB_UNLIKELY(part_rows.count() != part_ids.count() ||
|
||||
part_rows.count() != tablet_ids.count())) {
|
||||
|
||||
@ -13,6 +13,7 @@
|
||||
#include "sql/engine/ob_exec_context.h"
|
||||
#include "observer/table_load/ob_table_load_time_convert.h"
|
||||
#include "observer/table_load/ob_table_load_struct.h"
|
||||
#include "sql/session/ob_sql_session_info.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -36,12 +37,9 @@ struct ObTableLoadPartitionCalcContext
|
||||
|
||||
class ObTableLoadPartitionCalc
|
||||
{
|
||||
public:
|
||||
static int init_session();
|
||||
static oceanbase::sql::ObSQLSessionInfo &get_session();
|
||||
public:
|
||||
ObTableLoadPartitionCalc();
|
||||
int init(uint64_t tenant_id, uint64_t table_id);
|
||||
int init(uint64_t tenant_id, uint64_t table_id, sql::ObSQLSessionInfo *session_info);
|
||||
int calc(ObTableLoadPartitionCalcContext &ctx);
|
||||
private:
|
||||
int init_part_key_index(const share::schema::ObTableSchema *table_schema,
|
||||
@ -60,7 +58,7 @@ public:
|
||||
};
|
||||
public:
|
||||
table::ObTableLoadArray<IndexAndType> part_key_obj_index_;
|
||||
common::ObTimeZoneInfo tz_info_;
|
||||
sql::ObSQLSessionInfo *session_info_;
|
||||
ObTableLoadTimeConverter time_cvrt_;
|
||||
bool is_partition_with_autoinc_;
|
||||
int64_t partition_with_autoinc_idx_;
|
||||
|
||||
@ -204,7 +204,7 @@ int ObTableLoadTransBucketWriter::handle_partition_with_autoinc_identity(
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t row_count = obj_rows.count();
|
||||
ObArenaAllocator autoinc_allocator("TLD_Autoinc", OB_MALLOC_NORMAL_BLOCK_SIZE, param_.tenant_id_);
|
||||
ObDataTypeCastParams cast_params(&(coordinator_ctx_->partition_calc_.tz_info_));
|
||||
ObDataTypeCastParams cast_params(coordinator_ctx_->partition_calc_.session_info_->get_timezone_info());
|
||||
ObCastCtx cast_ctx(&autoinc_allocator, &cast_params, CM_NONE,
|
||||
ObCharset::get_system_collation());
|
||||
ObTableLoadCastObjCtx cast_obj_ctx(param_, &(coordinator_ctx_->partition_calc_.time_cvrt_), &cast_ctx,
|
||||
|
||||
Reference in New Issue
Block a user