remove database id from direct_load
This commit is contained in:
@ -66,7 +66,6 @@ int ObTableLoadBeginP::process()
|
|||||||
bool is_new = false;
|
bool is_new = false;
|
||||||
ObTableLoadParam param;
|
ObTableLoadParam param;
|
||||||
param.tenant_id_ = credential_.tenant_id_;
|
param.tenant_id_ = credential_.tenant_id_;
|
||||||
param.database_id_ = credential_.database_id_;
|
|
||||||
param.table_id_ = table_id;
|
param.table_id_ = table_id;
|
||||||
param.batch_size_ = arg_.config_.batch_size_;
|
param.batch_size_ = arg_.config_.batch_size_;
|
||||||
param.session_count_ = session_count;
|
param.session_count_ = session_count;
|
||||||
@ -181,7 +180,6 @@ int ObTableLoadPreBeginPeerP::process()
|
|||||||
bool is_new = false;
|
bool is_new = false;
|
||||||
ObTableLoadParam param;
|
ObTableLoadParam param;
|
||||||
param.tenant_id_ = credential_.tenant_id_;
|
param.tenant_id_ = credential_.tenant_id_;
|
||||||
param.database_id_ = credential_.database_id_;
|
|
||||||
param.table_id_ = arg_.table_id_;
|
param.table_id_ = arg_.table_id_;
|
||||||
param.target_table_id_ = arg_.target_table_id_;
|
param.target_table_id_ = arg_.target_table_id_;
|
||||||
param.batch_size_ = arg_.config_.batch_size_;
|
param.batch_size_ = arg_.config_.batch_size_;
|
||||||
|
|||||||
@ -72,10 +72,10 @@ int ObTableLoadCoordinator::init_ctx(ObTableLoadTableCtx *ctx, const ObIArray<in
|
|||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid agrs", KR(ret));
|
LOG_WARN("invalid agrs", KR(ret));
|
||||||
} else if (OB_FAIL(ObTableLoadSchema::get_table_schema(ctx->param_.tenant_id_,
|
} else if (OB_FAIL(ObTableLoadSchema::get_table_schema(ctx->param_.tenant_id_,
|
||||||
ctx->param_.database_id_, ctx->schema_.table_name_,
|
ctx->param_.table_id_,
|
||||||
schema_guard, table_schema))) {
|
schema_guard, table_schema))) {
|
||||||
LOG_WARN("fail to get table schema", KR(ret), K(ctx->param_.tenant_id_),
|
LOG_WARN("fail to get table schema", KR(ret), K(ctx->param_.tenant_id_),
|
||||||
K(ctx->param_.database_id_), K(ctx->schema_.table_name_));
|
K(ctx->schema_.table_name_));
|
||||||
} else if (OB_FAIL(ObTableLoadSchema::check_constraints(ctx->param_.tenant_id_,
|
} else if (OB_FAIL(ObTableLoadSchema::check_constraints(ctx->param_.tenant_id_,
|
||||||
schema_guard, table_schema))) {
|
schema_guard, table_schema))) {
|
||||||
LOG_WARN("fail to check schema constraints", KR(ret), K(ctx->param_.tenant_id_));
|
LOG_WARN("fail to check schema constraints", KR(ret), K(ctx->param_.tenant_id_));
|
||||||
|
|||||||
@ -54,7 +54,7 @@ int ObTableLoadCoordinatorCtx::init(ObSQLSessionInfo *session_info,
|
|||||||
LOG_WARN("failed to init ddl processor", KR(ret));
|
LOG_WARN("failed to init ddl processor", KR(ret));
|
||||||
} else if (OB_FAIL(redef_table_.start())) {
|
} else if (OB_FAIL(redef_table_.start())) {
|
||||||
LOG_WARN("failed to create hidden table", KR(ret));
|
LOG_WARN("failed to create hidden table", KR(ret));
|
||||||
} else if (OB_FAIL(target_schema_.init(ctx_->param_.tenant_id_, ctx_->param_.database_id_,
|
} else if (OB_FAIL(target_schema_.init(ctx_->param_.tenant_id_,
|
||||||
ctx_->param_.target_table_id_))) {
|
ctx_->param_.target_table_id_))) {
|
||||||
LOG_WARN("fail to init table load schema", KR(ret), K(ctx_->param_.tenant_id_),
|
LOG_WARN("fail to init table load schema", KR(ret), K(ctx_->param_.tenant_id_),
|
||||||
K(ctx_->param_.target_table_id_));
|
K(ctx_->param_.target_table_id_));
|
||||||
@ -171,7 +171,7 @@ int ObTableLoadCoordinatorCtx::generate_credential(uint64_t user_id)
|
|||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("user info is null", K(ret), K(ctx_->param_.tenant_id_), K(user_id));
|
LOG_WARN("user info is null", K(ret), K(ctx_->param_.tenant_id_), K(user_id));
|
||||||
} else if (OB_FAIL(ObTableLoadUtils::generate_credential(ctx_->param_.tenant_id_, user_id,
|
} else if (OB_FAIL(ObTableLoadUtils::generate_credential(ctx_->param_.tenant_id_, user_id,
|
||||||
ctx_->param_.database_id_, expire_ts,
|
0, expire_ts,
|
||||||
user_info->get_passwd_str().hash(), allocator_, credential_))) {
|
user_info->get_passwd_str().hash(), allocator_, credential_))) {
|
||||||
LOG_WARN("fail to generate credential", KR(ret));
|
LOG_WARN("fail to generate credential", KR(ret));
|
||||||
}
|
}
|
||||||
|
|||||||
@ -63,29 +63,6 @@ int ObTableLoadSchema::get_table_schema(uint64_t tenant_id, uint64_t table_id,
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObTableLoadSchema::get_database_and_table_schema(uint64_t tenant_id, uint64_t database_id,
|
|
||||||
uint64_t table_id,
|
|
||||||
ObSchemaGetterGuard &schema_guard,
|
|
||||||
const ObDatabaseSchema *&database_schema,
|
|
||||||
const ObTableSchema *&table_schema)
|
|
||||||
{
|
|
||||||
int ret = OB_SUCCESS;
|
|
||||||
if (OB_FAIL(get_schema_guard(tenant_id, schema_guard))) {
|
|
||||||
LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id));
|
|
||||||
} else if (OB_FAIL(schema_guard.get_database_schema(tenant_id, database_id, database_schema))) {
|
|
||||||
LOG_WARN("fail to get database schema", KR(ret), K(tenant_id), K(database_id));
|
|
||||||
} else if (OB_ISNULL(database_schema)) {
|
|
||||||
ret = OB_ERR_BAD_DATABASE;
|
|
||||||
LOG_WARN("table not exist", KR(ret), K(tenant_id), K(database_id));
|
|
||||||
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) {
|
|
||||||
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(database_id), K(table_id));
|
|
||||||
} else if (OB_ISNULL(table_schema)) {
|
|
||||||
ret = OB_TABLE_NOT_EXIST;
|
|
||||||
LOG_WARN("table not exist", KR(ret), K(tenant_id), K(database_id), K(table_id));
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
int ObTableLoadSchema::get_column_names(const ObTableSchema *table_schema, ObIAllocator &allocator,
|
int ObTableLoadSchema::get_column_names(const ObTableSchema *table_schema, ObIAllocator &allocator,
|
||||||
ObTableLoadArray<ObString> &column_names)
|
ObTableLoadArray<ObString> &column_names)
|
||||||
{
|
{
|
||||||
@ -186,7 +163,6 @@ ObTableLoadSchema::~ObTableLoadSchema()
|
|||||||
|
|
||||||
void ObTableLoadSchema::reset()
|
void ObTableLoadSchema::reset()
|
||||||
{
|
{
|
||||||
database_name_.reset();
|
|
||||||
table_name_.reset();
|
table_name_.reset();
|
||||||
is_partitioned_table_ = false;
|
is_partitioned_table_ = false;
|
||||||
is_heap_table_ = false;
|
is_heap_table_ = false;
|
||||||
@ -204,7 +180,7 @@ void ObTableLoadSchema::reset()
|
|||||||
is_inited_ = false;
|
is_inited_ = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObTableLoadSchema::init(uint64_t tenant_id, uint64_t database_id, uint64_t table_id)
|
int ObTableLoadSchema::init(uint64_t tenant_id, uint64_t table_id)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (IS_INIT) {
|
if (IS_INIT) {
|
||||||
@ -213,13 +189,9 @@ int ObTableLoadSchema::init(uint64_t tenant_id, uint64_t database_id, uint64_t t
|
|||||||
} else {
|
} else {
|
||||||
allocator_.set_tenant_id(tenant_id);
|
allocator_.set_tenant_id(tenant_id);
|
||||||
ObSchemaGetterGuard schema_guard;
|
ObSchemaGetterGuard schema_guard;
|
||||||
const ObDatabaseSchema *database_schema = nullptr;
|
|
||||||
const ObTableSchema *table_schema = nullptr;
|
const ObTableSchema *table_schema = nullptr;
|
||||||
if (OB_FAIL(get_database_and_table_schema(tenant_id, database_id, table_id, schema_guard,
|
if (OB_FAIL(get_table_schema(tenant_id, table_id, schema_guard, table_schema))) {
|
||||||
database_schema, table_schema))) {
|
|
||||||
LOG_WARN("fail to get database and table schema", KR(ret), K(tenant_id));
|
LOG_WARN("fail to get database and table schema", KR(ret), K(tenant_id));
|
||||||
} else if (OB_FAIL(init_database_schema(database_schema))) {
|
|
||||||
LOG_WARN("fail to init database schema", KR(ret));
|
|
||||||
} else if (OB_FAIL(init_table_schema(table_schema))) {
|
} else if (OB_FAIL(init_table_schema(table_schema))) {
|
||||||
LOG_WARN("fail to init table schema", KR(ret));
|
LOG_WARN("fail to init table schema", KR(ret));
|
||||||
} else {
|
} else {
|
||||||
@ -229,21 +201,6 @@ int ObTableLoadSchema::init(uint64_t tenant_id, uint64_t database_id, uint64_t t
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObTableLoadSchema::init_database_schema(const ObDatabaseSchema *database_schema)
|
|
||||||
{
|
|
||||||
int ret = OB_SUCCESS;
|
|
||||||
if (OB_ISNULL(database_schema)) {
|
|
||||||
ret = OB_INVALID_ARGUMENT;
|
|
||||||
LOG_WARN("invalid args", KR(ret), KP(database_schema));
|
|
||||||
} else {
|
|
||||||
if (OB_FAIL(ObTableLoadUtils::deep_copy(database_schema->get_database_name_str(),
|
|
||||||
database_name_, allocator_))) {
|
|
||||||
LOG_WARN("fail to deep copy database name", KR(ret));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
int ObTableLoadSchema::init_table_schema(const ObTableSchema *table_schema)
|
int ObTableLoadSchema::init_table_schema(const ObTableSchema *table_schema)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
|||||||
@ -28,11 +28,6 @@ public:
|
|||||||
static int get_table_schema(uint64_t tenant_id, uint64_t table_id,
|
static int get_table_schema(uint64_t tenant_id, uint64_t table_id,
|
||||||
share::schema::ObSchemaGetterGuard &schema_guard,
|
share::schema::ObSchemaGetterGuard &schema_guard,
|
||||||
const share::schema::ObTableSchema *&table_schema);
|
const share::schema::ObTableSchema *&table_schema);
|
||||||
static int get_database_and_table_schema(uint64_t tenant_id, uint64_t database_id,
|
|
||||||
uint64_t table_id,
|
|
||||||
share::schema::ObSchemaGetterGuard &schema_guard,
|
|
||||||
const share::schema::ObDatabaseSchema *&database_schema,
|
|
||||||
const share::schema::ObTableSchema *&table_schema);
|
|
||||||
static int get_column_names(const share::schema::ObTableSchema *table_schema,
|
static int get_column_names(const share::schema::ObTableSchema *table_schema,
|
||||||
common::ObIAllocator &allocator,
|
common::ObIAllocator &allocator,
|
||||||
table::ObTableLoadArray<common::ObString> &column_names);
|
table::ObTableLoadArray<common::ObString> &column_names);
|
||||||
@ -44,17 +39,15 @@ public:
|
|||||||
ObTableLoadSchema();
|
ObTableLoadSchema();
|
||||||
~ObTableLoadSchema();
|
~ObTableLoadSchema();
|
||||||
void reset();
|
void reset();
|
||||||
int init(uint64_t tenant_id, uint64_t database_id, uint64_t table_id);
|
int init(uint64_t tenant_id, uint64_t table_id);
|
||||||
bool is_valid() const { return is_inited_; }
|
bool is_valid() const { return is_inited_; }
|
||||||
TO_STRING_KV(K_(database_name), K_(table_name), K_(is_partitioned_table), K_(is_heap_table),
|
TO_STRING_KV(K_(table_name), K_(is_partitioned_table), K_(is_heap_table),
|
||||||
K_(has_autoinc_column), K_(has_identity_column), K_(rowkey_column_count), K_(column_count),
|
K_(has_autoinc_column), K_(has_identity_column), K_(rowkey_column_count), K_(column_count),
|
||||||
K_(collation_type), K_(column_descs), K_(is_inited));
|
K_(collation_type), K_(column_descs), K_(is_inited));
|
||||||
private:
|
private:
|
||||||
int init_database_schema(const share::schema::ObDatabaseSchema *database_schema);
|
|
||||||
int init_table_schema(const share::schema::ObTableSchema *table_schema);
|
int init_table_schema(const share::schema::ObTableSchema *table_schema);
|
||||||
public:
|
public:
|
||||||
common::ObArenaAllocator allocator_;
|
common::ObArenaAllocator allocator_;
|
||||||
common::ObString database_name_;
|
|
||||||
common::ObString table_name_;
|
common::ObString table_name_;
|
||||||
bool is_partitioned_table_;
|
bool is_partitioned_table_;
|
||||||
bool is_heap_table_;
|
bool is_heap_table_;
|
||||||
|
|||||||
@ -66,7 +66,6 @@ struct ObTableLoadParam
|
|||||||
{
|
{
|
||||||
ObTableLoadParam()
|
ObTableLoadParam()
|
||||||
: tenant_id_(common::OB_INVALID_ID),
|
: tenant_id_(common::OB_INVALID_ID),
|
||||||
database_id_(common::OB_INVALID_ID),
|
|
||||||
table_id_(common::OB_INVALID_ID),
|
table_id_(common::OB_INVALID_ID),
|
||||||
target_table_id_(common::OB_INVALID_ID),
|
target_table_id_(common::OB_INVALID_ID),
|
||||||
session_count_(0),
|
session_count_(0),
|
||||||
@ -82,7 +81,6 @@ struct ObTableLoadParam
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
uint64_t tenant_id_;
|
uint64_t tenant_id_;
|
||||||
uint64_t database_id_;
|
|
||||||
uint64_t table_id_;
|
uint64_t table_id_;
|
||||||
uint64_t target_table_id_;
|
uint64_t target_table_id_;
|
||||||
int32_t session_count_;
|
int32_t session_count_;
|
||||||
@ -110,7 +108,6 @@ struct ObTableLoadParam
|
|||||||
bool is_valid() const
|
bool is_valid() const
|
||||||
{
|
{
|
||||||
return common::OB_INVALID_ID != tenant_id_ &&
|
return common::OB_INVALID_ID != tenant_id_ &&
|
||||||
common::OB_INVALID_ID != database_id_ &&
|
|
||||||
common::OB_INVALID_ID != table_id_ &&
|
common::OB_INVALID_ID != table_id_ &&
|
||||||
//common::OB_INVALID_ID != target_table_id_ &&
|
//common::OB_INVALID_ID != target_table_id_ &&
|
||||||
session_count_ > 0 && session_count_ <= MAX_TABLE_LOAD_SESSION_COUNT &&
|
session_count_ > 0 && session_count_ <= MAX_TABLE_LOAD_SESSION_COUNT &&
|
||||||
@ -118,7 +115,7 @@ struct ObTableLoadParam
|
|||||||
column_count_ > 0;
|
column_count_ > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
TO_STRING_KV(K_(tenant_id), K_(database_id), K_(table_id), K_(target_table_id), K_(session_count),
|
TO_STRING_KV(K_(tenant_id), K_(table_id), K_(target_table_id), K_(session_count),
|
||||||
K_(batch_size), K_(max_error_row_count), K_(column_count),
|
K_(batch_size), K_(max_error_row_count), K_(column_count),
|
||||||
K_(need_sort), K_(px_mode), K_(data_type), K_(dup_action));
|
K_(need_sort), K_(px_mode), K_(data_type), K_(dup_action));
|
||||||
};
|
};
|
||||||
|
|||||||
@ -47,7 +47,7 @@ int ObTableLoadTableCtx::init()
|
|||||||
ret = OB_INIT_TWICE;
|
ret = OB_INIT_TWICE;
|
||||||
LOG_WARN("ObTableLoadTableCtx init twice", KR(ret));
|
LOG_WARN("ObTableLoadTableCtx init twice", KR(ret));
|
||||||
} else {
|
} else {
|
||||||
if (OB_FAIL(schema_.init(param_.tenant_id_, param_.database_id_, param_.table_id_))) {
|
if (OB_FAIL(schema_.init(param_.tenant_id_, param_.table_id_))) {
|
||||||
LOG_WARN("fail to init table load schema", KR(ret), K(param_.tenant_id_),
|
LOG_WARN("fail to init table load schema", KR(ret), K(param_.tenant_id_),
|
||||||
K(param_.table_id_));
|
K(param_.table_id_));
|
||||||
} else if (OB_FAIL(task_allocator_.init("TLD_TaskPool", param_.tenant_id_))) {
|
} else if (OB_FAIL(task_allocator_.init("TLD_TaskPool", param_.tenant_id_))) {
|
||||||
|
|||||||
@ -2037,7 +2037,6 @@ int ObLoadDataDirectImpl::init_execute_context()
|
|||||||
execute_ctx_.allocator_ = &ctx_->get_allocator();
|
execute_ctx_.allocator_ = &ctx_->get_allocator();
|
||||||
ObTableLoadParam load_param;
|
ObTableLoadParam load_param;
|
||||||
load_param.tenant_id_ = execute_param_.tenant_id_;
|
load_param.tenant_id_ = execute_param_.tenant_id_;
|
||||||
load_param.database_id_ = execute_param_.database_id_;
|
|
||||||
load_param.table_id_ = execute_param_.table_id_;
|
load_param.table_id_ = execute_param_.table_id_;
|
||||||
load_param.session_count_ = execute_param_.parallel_;
|
load_param.session_count_ = execute_param_.parallel_;
|
||||||
load_param.batch_size_ = execute_param_.batch_row_count_;
|
load_param.batch_size_ = execute_param_.batch_row_count_;
|
||||||
|
|||||||
@ -56,7 +56,6 @@ int ObTableDirectInsertCtx::init(ObExecContext *exec_ctx,
|
|||||||
ObTableLoadParam param;
|
ObTableLoadParam param;
|
||||||
param.column_count_ = store_column_idxs.count();
|
param.column_count_ = store_column_idxs.count();
|
||||||
param.tenant_id_ = MTL_ID();
|
param.tenant_id_ = MTL_ID();
|
||||||
param.database_id_ = exec_ctx->get_my_session()->get_database_id();
|
|
||||||
param.table_id_ = table_id;
|
param.table_id_ = table_id;
|
||||||
param.batch_size_ = 100;
|
param.batch_size_ = 100;
|
||||||
param.session_count_ = parallel;
|
param.session_count_ = parallel;
|
||||||
@ -155,4 +154,4 @@ int ObTableDirectInsertCtx::init_store_column_idxs(const uint64_t tenant_id,
|
|||||||
}
|
}
|
||||||
|
|
||||||
} // namespace sql
|
} // namespace sql
|
||||||
} // namespace oceanbase
|
} // namespace oceanbase
|
||||||
|
|||||||
Reference in New Issue
Block a user