[CP] fix direct load column idxs in client task
This commit is contained in:
parent
ed28f133d6
commit
64f5736d2f
@ -30,6 +30,7 @@ namespace observer
|
||||
using namespace common;
|
||||
using namespace sql;
|
||||
using namespace storage;
|
||||
using namespace share::schema;
|
||||
using namespace table;
|
||||
|
||||
/**
|
||||
@ -334,6 +335,57 @@ int ObTableLoadClientTask::create_session_info(uint64_t tenant_id, uint64_t user
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadClientTask::get_column_idxs(ObIArray<int64_t> &column_idxs) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const uint64_t tenant_id = param_.get_tenant_id();
|
||||
const uint64_t table_id = param_.get_table_id();
|
||||
ObSchemaGetterGuard schema_guard;
|
||||
const ObTableSchema *table_schema = nullptr;
|
||||
ObArray<int64_t> column_ids; // in user define order
|
||||
ObArray<ObColDesc> column_descs; // in storage order
|
||||
bool found_column = true;
|
||||
column_idxs.reset();
|
||||
if (OB_FAIL(
|
||||
ObTableLoadSchema::get_table_schema(tenant_id, table_id, schema_guard, table_schema))) {
|
||||
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id));
|
||||
} else if (OB_FAIL(ObTableLoadSchema::get_user_column_ids(table_schema, column_ids))) {
|
||||
LOG_WARN("failed to get all column idx", K(ret));
|
||||
} else if (OB_UNLIKELY(column_ids.empty())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected empty column idxs", KR(ret));
|
||||
} else if (OB_FAIL(table_schema->get_column_ids(column_descs))) {
|
||||
LOG_WARN("fail to get column descs", KR(ret));
|
||||
}
|
||||
for (int64_t i = 0; OB_SUCC(ret) && OB_LIKELY(found_column) && i < column_descs.count(); ++i) {
|
||||
const ObColDesc &col_desc = column_descs.at(i);
|
||||
const ObColumnSchemaV2 *col_schema = table_schema->get_column_schema(col_desc.col_id_);
|
||||
if (OB_ISNULL(col_schema)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected null column schema", KR(ret), K(col_desc));
|
||||
} else {
|
||||
found_column = col_schema->is_hidden();
|
||||
}
|
||||
// 在用户定义的列数组中找到对应的列
|
||||
for (int64_t j = 0; OB_SUCC(ret) && OB_LIKELY(!found_column) && j < column_ids.count(); ++j) {
|
||||
const int64_t column_id = column_ids.at(j);
|
||||
if (col_desc.col_id_ == column_id) {
|
||||
found_column = true;
|
||||
if (OB_FAIL(column_idxs.push_back(j))) {
|
||||
LOG_WARN("fail to push back column desc", KR(ret), K(column_idxs), K(i), K(col_desc),
|
||||
K(j), K(column_ids));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret) && OB_UNLIKELY(!found_column)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected column not found", KR(ret), K(column_ids), K(column_descs),
|
||||
K(column_idxs));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadClientTask::init_exec_ctx(int64_t timeout_us, int64_t heartbeat_timeout_us)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -570,9 +622,8 @@ int ObTableLoadClientTask::init_instance()
|
||||
ObArray<int64_t> column_idxs;
|
||||
if (OB_FAIL(GCTX.omt_->get_tenant(param_.get_tenant_id(), tenant))) {
|
||||
LOG_WARN("fail to get tenant handle", KR(ret), K(param_.get_tenant_id()));
|
||||
} else if (OB_FAIL(ObTableLoadSchema::get_column_idxs(param_.get_tenant_id(),
|
||||
param_.get_table_id(), column_idxs))) {
|
||||
LOG_WARN("failed to get column idx", K(ret));
|
||||
} else if (OB_FAIL(get_column_idxs(column_idxs))) {
|
||||
LOG_WARN("failed to get column idxs", K(ret));
|
||||
} else if (OB_UNLIKELY(column_idxs.empty())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected empty column idxs", KR(ret));
|
||||
|
@ -107,6 +107,7 @@ private:
|
||||
sql::ObFreeSessionCtx &free_session_ctx);
|
||||
int init_exec_ctx(int64_t timeout_us, int64_t heartbeat_timeout_us);
|
||||
|
||||
int get_column_idxs(ObIArray<int64_t> &column_idxs) const;
|
||||
int init_instance();
|
||||
int commit_instance();
|
||||
void destroy_instance();
|
||||
|
@ -110,8 +110,9 @@ int ObTableLoadSchema::get_table_schema(uint64_t tenant_id, uint64_t table_id,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadSchema::get_column_names(const ObTableSchema *table_schema, ObIAllocator &allocator,
|
||||
ObIArray<ObString> &column_names)
|
||||
int ObTableLoadSchema::get_user_column_names(const ObTableSchema *table_schema,
|
||||
ObIAllocator &allocator,
|
||||
ObIArray<ObString> &column_names)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(table_schema)) {
|
||||
@ -131,7 +132,9 @@ int ObTableLoadSchema::get_column_names(const ObTableSchema *table_schema, ObIAl
|
||||
} else if (OB_ISNULL(column_schema)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("The column is null", KR(ret));
|
||||
} else if (!column_schema->is_hidden() && !column_schema->is_invisible_column()) {
|
||||
} else if (column_schema->is_hidden()) {
|
||||
// 不显示隐藏pk
|
||||
} else {
|
||||
ObString column_name;
|
||||
if (OB_FAIL(
|
||||
ob_write_string(allocator, column_schema->get_column_name_str(), column_name))) {
|
||||
@ -145,46 +148,32 @@ int ObTableLoadSchema::get_column_names(const ObTableSchema *table_schema, ObIAl
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadSchema::get_column_idxs(uint64_t tenant_id, uint64_t table_id,
|
||||
ObIArray<int64_t> &column_idxs)
|
||||
int ObTableLoadSchema::get_user_column_ids(const ObTableSchema *table_schema,
|
||||
ObIArray<int64_t> &column_ids)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
column_idxs.reset();
|
||||
ObSchemaGetterGuard schema_guard;
|
||||
const ObTableSchema *table_schema = nullptr;
|
||||
if (OB_FAIL(get_table_schema(tenant_id, table_id, schema_guard, table_schema))) {
|
||||
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id));
|
||||
} else {
|
||||
ret = get_column_idxs(table_schema, column_idxs);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadSchema::get_column_idxs(const ObTableSchema *table_schema,
|
||||
ObIArray<int64_t> &column_idxs)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
column_idxs.reset();
|
||||
column_ids.reset();
|
||||
if (OB_ISNULL(table_schema)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", KR(ret), KP(table_schema));
|
||||
} else {
|
||||
ObArray<ObColDesc> column_descs;
|
||||
column_descs.set_tenant_id(MTL_ID());
|
||||
if (OB_FAIL(table_schema->get_column_ids(column_descs, false))) {
|
||||
LOG_WARN("fail to get column ids", KR(ret));
|
||||
}
|
||||
for (int64_t i = 0; OB_SUCC(ret) && (i < column_descs.count()); ++i) {
|
||||
ObColDesc &col_desc = column_descs.at(i);
|
||||
const ObColumnSchemaV2 *column_schema = table_schema->get_column_schema(col_desc.col_id_);
|
||||
if (OB_ISNULL(column_schema)) {
|
||||
ObColumnIterByPrevNextID iter(*table_schema);
|
||||
const ObColumnSchemaV2 *column_schema = NULL;
|
||||
while (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(iter.next(column_schema))) {
|
||||
if (OB_UNLIKELY(OB_ITER_END != ret)) {
|
||||
LOG_WARN("fail to iterate all table columns", KR(ret));
|
||||
} else {
|
||||
ret = OB_SUCCESS;
|
||||
break;
|
||||
}
|
||||
} else if (OB_ISNULL(column_schema)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("The column is null", KR(ret));
|
||||
} else if (!column_schema->is_hidden() && !column_schema->is_invisible_column()) {
|
||||
const int64_t idx = col_desc.col_id_ - OB_APP_MIN_COLUMN_ID;
|
||||
if (OB_FAIL(column_idxs.push_back(idx))) {
|
||||
LOG_WARN("fail to push back idx", KR(ret), K(idx));
|
||||
}
|
||||
} else if (column_schema->is_hidden()) {
|
||||
// 不显示隐藏pk
|
||||
} else if (OB_FAIL(column_ids.push_back(column_schema->get_column_id()))) {
|
||||
LOG_WARN("fail to push back column id", KR(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -276,6 +265,42 @@ int ObTableLoadSchema::get_lob_meta_tid(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadSchema::check_has_invisible_column(const ObTableSchema *table_schema, bool &bret)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bret = false;
|
||||
for (ObTableSchema::const_column_iterator iter = table_schema->column_begin();
|
||||
OB_SUCC(ret) && iter != table_schema->column_end(); ++iter) {
|
||||
ObColumnSchemaV2 *column_schema = *iter;
|
||||
if (OB_ISNULL(column_schema)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("invalid column schema", K(column_schema));
|
||||
} else if (column_schema->is_invisible_column()) {
|
||||
bret = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadSchema::check_has_unused_column(const ObTableSchema *table_schema, bool &bret)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bret = false;
|
||||
for (ObTableSchema::const_column_iterator iter = table_schema->column_begin();
|
||||
OB_SUCC(ret) && iter != table_schema->column_end(); ++iter) {
|
||||
ObColumnSchemaV2 *column_schema = *iter;
|
||||
if (OB_ISNULL(column_schema)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("invalid column schema", K(column_schema));
|
||||
} else if (column_schema->is_unused()) {
|
||||
bret = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
ObTableLoadSchema::ObTableLoadSchema()
|
||||
: allocator_("TLD_Schema"),
|
||||
is_partitioned_table_(false),
|
||||
|
@ -38,18 +38,18 @@ public:
|
||||
static int get_table_schema(uint64_t tenant_id, uint64_t table_id,
|
||||
share::schema::ObSchemaGetterGuard &schema_guard,
|
||||
const share::schema::ObTableSchema *&table_schema);
|
||||
static int get_column_names(const share::schema::ObTableSchema *table_schema,
|
||||
common::ObIAllocator &allocator,
|
||||
common::ObIArray<common::ObString> &column_names);
|
||||
static int get_column_idxs(uint64_t tenant_id, uint64_t table_id,
|
||||
common::ObIArray<int64_t> &column_idxs);
|
||||
static int get_column_idxs(const share::schema::ObTableSchema *table_schema,
|
||||
common::ObIArray<int64_t> &column_idxs);
|
||||
static int get_user_column_names(const share::schema::ObTableSchema *table_schema,
|
||||
common::ObIAllocator &allocator,
|
||||
common::ObIArray<common::ObString> &column_names);
|
||||
static int get_user_column_ids(const share::schema::ObTableSchema *table_schema,
|
||||
common::ObIArray<int64_t> &column_ids);
|
||||
static int check_has_udt_column(const share::schema::ObTableSchema *table_schema, bool &bret);
|
||||
static int get_tenant_optimizer_gather_stats_on_load(const uint64_t tenant_id, bool &value);
|
||||
static int get_lob_meta_tid(const uint64_t tenant_id,
|
||||
const uint64_t data_table_id,
|
||||
uint64_t &lob_meta_table_id);
|
||||
static int check_has_invisible_column(const share::schema::ObTableSchema *table_schema, bool &bret);
|
||||
static int check_has_unused_column(const share::schema::ObTableSchema *table_schema, bool &bret);
|
||||
public:
|
||||
ObTableLoadSchema();
|
||||
~ObTableLoadSchema();
|
||||
|
@ -448,6 +448,8 @@ int ObTableLoadService::check_support_direct_load(
|
||||
bool has_udt_column = false;
|
||||
bool has_fts_index = false;
|
||||
bool has_multivalue_index = false;
|
||||
bool has_invisible_column = false;
|
||||
bool has_unused_column = false;
|
||||
if (OB_FAIL(
|
||||
ObTableLoadSchema::get_table_schema(tenant_id, table_id, schema_guard, table_schema))) {
|
||||
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id));
|
||||
@ -507,6 +509,22 @@ int ObTableLoadService::check_support_direct_load(
|
||||
LOG_WARN("direct-load does not support table has udt column", KR(ret));
|
||||
FORWARD_USER_ERROR_MSG(ret, "direct-load does not support table has udt column");
|
||||
}
|
||||
// check has invisible column
|
||||
else if (OB_FAIL(ObTableLoadSchema::check_has_invisible_column(table_schema, has_invisible_column))) {
|
||||
LOG_WARN("fail to check has invisible column", KR(ret));
|
||||
} else if (has_invisible_column) {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
LOG_WARN("direct-load does not support table has invisible column", KR(ret));
|
||||
FORWARD_USER_ERROR_MSG(ret, "direct-load does not support table has invisible column");
|
||||
}
|
||||
// check has unused column
|
||||
else if (OB_FAIL(ObTableLoadSchema::check_has_unused_column(table_schema, has_unused_column))) {
|
||||
LOG_WARN("fail to check has unused column", KR(ret));
|
||||
} else if (has_unused_column) {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
LOG_WARN("direct-load does not support table has unused column", KR(ret));
|
||||
FORWARD_USER_ERROR_MSG(ret, "direct-load does not support table has unused column");
|
||||
}
|
||||
// check if table has mlog
|
||||
else if (table_schema->has_mlog_table()) {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
|
@ -152,11 +152,20 @@ template<class T>
|
||||
int ObTableLoadRow<T>::project(const ObIArray<int64_t> &idx_projector, ObTableLoadRow<T> &projected_row) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(projected_row.init(count_, allocator_handle_))) {
|
||||
if (OB_UNLIKELY(idx_projector.count() != count_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
OB_LOG(WARN, "unexpected count", KR(ret), K(idx_projector), K(count_));
|
||||
} else if (OB_FAIL(projected_row.init(count_, allocator_handle_))) {
|
||||
OB_LOG(WARN, "failed to alloate cells", KR(ret), K(projected_row.count_));
|
||||
} else {
|
||||
for (int64_t j = 0; j < count_; ++j) {
|
||||
projected_row.cells_[j] = cells_[idx_projector.at(j)];
|
||||
const int64_t idx = idx_projector.at(j);
|
||||
if (OB_UNLIKELY(idx < 0 || idx >= count_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
OB_LOG(WARN, "unexpected idx", KR(ret), K(j), K(idx), K(idx_projector));
|
||||
} else {
|
||||
projected_row.cells_[j] = cells_[idx];
|
||||
}
|
||||
}
|
||||
projected_row.seq_no_ = seq_no_;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user