[FEAT MERGE] information_schema enhence
This commit is contained in:
@ -26,7 +26,9 @@ namespace sql
|
||||
ObMaintainObjDepInfoTask::ObMaintainObjDepInfoTask (const uint64_t tenant_id)
|
||||
: tenant_id_(tenant_id),
|
||||
gctx_(GCTX),
|
||||
rs_rpc_proxy_(*GCTX.rs_rpc_proxy_)
|
||||
rs_rpc_proxy_(*GCTX.rs_rpc_proxy_),
|
||||
view_schema_(&alloc_),
|
||||
reset_view_column_infos_(false)
|
||||
{
|
||||
set_retry_times(0);
|
||||
}
|
||||
@ -39,7 +41,9 @@ ObMaintainObjDepInfoTask::ObMaintainObjDepInfoTask (
|
||||
const DepObjKeyItemList &delete_dep_objs)
|
||||
: tenant_id_(tenant_id),
|
||||
gctx_(GCTX),
|
||||
rs_rpc_proxy_(rs_rpc_proxy)
|
||||
rs_rpc_proxy_(rs_rpc_proxy),
|
||||
view_schema_(&alloc_),
|
||||
reset_view_column_infos_(false)
|
||||
{
|
||||
set_retry_times(0);
|
||||
insert_dep_objs_.assign(insert_dep_objs);
|
||||
@ -88,6 +92,14 @@ int ObMaintainObjDepInfoTask::check_and_build_dep_info_arg(
|
||||
int ret = OB_SUCCESS;
|
||||
bool is_valid = false;
|
||||
for (int64_t i = 0 ; OB_SUCC(ret) && i < dep_objs.count(); ++i) {
|
||||
// multi views in one stmt share ref_obj_array, just update own dep
|
||||
if (view_schema_.get_table_id() != dep_objs.at(i).dep_obj_key_.dep_obj_id_) {
|
||||
/*
|
||||
* create view will maintain -1 as dep obj id, but will not call this function
|
||||
* if current view level is less than 1 will maintain -1 as dep obj id, try to remove task like this earlier
|
||||
*/
|
||||
continue;
|
||||
}
|
||||
if (OB_FAIL(check_cur_maintain_task_is_valid(
|
||||
dep_objs.at(i).dep_obj_key_,
|
||||
dep_objs.at(i).dep_obj_item_.dep_obj_schema_version_,
|
||||
@ -132,6 +144,8 @@ share::ObAsyncTask *ObMaintainObjDepInfoTask::deep_copy(char *buf, const int64_t
|
||||
insert_dep_objs_,
|
||||
update_dep_objs_,
|
||||
delete_dep_objs_);
|
||||
OZ ((static_cast<ObMaintainObjDepInfoTask *> (task))->assign_view_schema(view_schema_));
|
||||
OX ((static_cast<ObMaintainObjDepInfoTask *> (task))->reset_view_column_infos_ = reset_view_column_infos_);
|
||||
}
|
||||
return task;
|
||||
}
|
||||
@ -144,6 +158,7 @@ int ObMaintainObjDepInfoTask::process()
|
||||
SMART_VAR(obrpc::ObDependencyObjDDLArg, dep_obj_info_arg) {
|
||||
dep_obj_info_arg.tenant_id_ = tenant_id_;
|
||||
dep_obj_info_arg.exec_tenant_id_ = tenant_id_;
|
||||
dep_obj_info_arg.reset_view_column_infos_ = reset_view_column_infos_;
|
||||
OZ (gctx_.schema_service_->async_refresh_schema(tenant_id_, last_version));
|
||||
OZ (gctx_.schema_service_->get_tenant_schema_guard(tenant_id_, schema_guard));
|
||||
OZ (check_and_build_dep_info_arg(schema_guard, dep_obj_info_arg,
|
||||
@ -153,9 +168,12 @@ int ObMaintainObjDepInfoTask::process()
|
||||
OZ (check_and_build_dep_info_arg(schema_guard, dep_obj_info_arg,
|
||||
delete_dep_objs_, share::schema::ObReferenceObjTable::DELETE_OP));
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (view_schema_.is_valid() && OB_FAIL(dep_obj_info_arg.schema_.assign(view_schema_))) {
|
||||
LOG_WARN("failed to assign view schema", K(ret));
|
||||
} else if (dep_obj_info_arg.insert_dep_objs_.empty()
|
||||
&& dep_obj_info_arg.update_dep_objs_.empty()
|
||||
&& dep_obj_info_arg.delete_dep_objs_.empty()) {
|
||||
&& dep_obj_info_arg.delete_dep_objs_.empty()
|
||||
&& !dep_obj_info_arg.schema_.is_valid()) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(rs_rpc_proxy_.maintain_obj_dependency_info(dep_obj_info_arg))) {
|
||||
LOG_WARN("failed to maintain_obj_dependency_info", K(ret), K(dep_obj_info_arg));
|
||||
@ -164,6 +182,26 @@ int ObMaintainObjDepInfoTask::process()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMaintainObjDepInfoTask::assign_view_schema(const ObTableSchema &view_schema)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(view_schema_.assign(view_schema))) {
|
||||
LOG_WARN("failed to copy view schema", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMaintainDepInfoTaskQueue::init(const int64_t thread_cnt, const int64_t queue_size)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(ObAsyncTaskQueue::init(thread_cnt, queue_size))) {
|
||||
LOG_WARN("failed to init base queue", K(ret));
|
||||
} else if (OB_FAIL(view_info_set_.create(INIT_BKT_SIZE))) {
|
||||
LOG_WARN("failed to init view set", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObMaintainDepInfoTaskQueue::run2()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -174,7 +212,11 @@ void ObMaintainDepInfoTaskQueue::run2()
|
||||
} else {
|
||||
ObAddr zero_addr;
|
||||
while (!stop_) {
|
||||
if (REACH_TIME_INTERVAL(600 * 1000 * 1000)) {
|
||||
if (REACH_TIME_INTERVAL(6 * 1000 * 1000)) {
|
||||
if (0 == queue_.size() && 0 != view_info_set_.size()) {
|
||||
LOG_WARN("queue size not match", K(queue_.size()), K(view_info_set_.size()));
|
||||
view_info_set_.clear();
|
||||
}
|
||||
LOG_INFO("[ASYNC TASK QUEUE]", "queue_size", queue_.size());
|
||||
}
|
||||
if (last_execute_time_ > 0
|
||||
@ -229,6 +271,15 @@ void ObMaintainDepInfoTaskQueue::run2()
|
||||
rescheduled = true;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
const ObTableSchema &view_schema = (static_cast<ObMaintainObjDepInfoTask *> (task))->get_view_schema();
|
||||
if (view_schema.is_valid()) {
|
||||
// ignore ret
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_SUCCESS != (tmp_ret = view_info_set_.erase_refactored(view_schema.get_table_id()))) {
|
||||
LOG_WARN("failed to erase obj id", K(tmp_ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
last_execute_time_ = ObTimeUtility::current_time();
|
||||
if (!rescheduled) {
|
||||
|
||||
Reference in New Issue
Block a user