[CP] Fix truncate partition blocks in ddl scheduler
This commit is contained in:
@ -335,14 +335,11 @@ int ObDDLRetryTask::drop_schema(const ObDDLTaskStatus next_task_status)
|
|||||||
case ObDDLType::DDL_DROP_SUB_PARTITION:
|
case ObDDLType::DDL_DROP_SUB_PARTITION:
|
||||||
case ObDDLType::DDL_TRUNCATE_PARTITION:
|
case ObDDLType::DDL_TRUNCATE_PARTITION:
|
||||||
case ObDDLType::DDL_TRUNCATE_SUB_PARTITION: {
|
case ObDDLType::DDL_TRUNCATE_SUB_PARTITION: {
|
||||||
obrpc::ObAlterTableRes alter_table_res;
|
|
||||||
obrpc::ObAlterTableArg *arg = static_cast<obrpc::ObAlterTableArg *>(ddl_arg_);
|
obrpc::ObAlterTableArg *arg = static_cast<obrpc::ObAlterTableArg *>(ddl_arg_);
|
||||||
arg->is_add_to_scheduler_ = false;
|
arg->is_add_to_scheduler_ = false;
|
||||||
arg->task_id_ = task_id_;
|
arg->task_id_ = task_id_;
|
||||||
if (OB_FAIL(common_rpc_proxy.alter_table(*arg, alter_table_res))) {
|
if (OB_FAIL(common_rpc_proxy.alter_table(*arg, alter_table_res_))) {
|
||||||
LOG_WARN("fail to alter table", K(ret));
|
LOG_WARN("fail to alter table", K(ret));
|
||||||
} else if (OB_FAIL(wait_alter_table(*arg, alter_table_res))) {
|
|
||||||
LOG_WARN("failed to wait alter table", K(ret));
|
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -373,29 +370,63 @@ int ObDDLRetryTask::drop_schema(const ObDDLTaskStatus next_task_status)
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObDDLRetryTask::wait_alter_table(const ObAlterTableArg &alter_table_arg, const ObAlterTableRes &res)
|
int ObDDLRetryTask::wait_alter_table(const ObDDLTaskStatus new_status)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
const static int CHECK_INTERVAL = 100 * 1000; // 100ms
|
bool finish = false;
|
||||||
const uint64_t tenant_id = alter_table_arg.exec_tenant_id_;
|
if (OB_UNLIKELY(!is_inited_)) {
|
||||||
if (alter_table_arg.is_update_global_indexes_
|
ret = OB_NOT_INIT;
|
||||||
&& (ObAlterTableArg::DROP_PARTITION == alter_table_arg.alter_part_type_
|
LOG_WARN("ObDDLRetryTask has not been inited", K(ret));
|
||||||
|| ObAlterTableArg::DROP_SUB_PARTITION == alter_table_arg.alter_part_type_
|
} else if (OB_ISNULL(root_service_)) {
|
||||||
|| ObAlterTableArg::TRUNCATE_PARTITION == alter_table_arg.alter_part_type_
|
ret = OB_ERR_SYS;
|
||||||
|| ObAlterTableArg::TRUNCATE_SUB_PARTITION == alter_table_arg.alter_part_type_)) {
|
LOG_WARN("error sys, root service must not be nullptr", K(ret));
|
||||||
const common::ObSArray<ObAlterTableResArg> &res_array = res.res_arg_array_;
|
} else if (OB_ISNULL(ddl_arg_) || lib::Worker::CompatMode::INVALID == compat_mode_) {
|
||||||
for (int64_t i = 0; OB_SUCC(ret) && i < res_array.size(); ++i) {
|
ret = OB_ERR_UNEXPECTED;
|
||||||
bool is_finish = false;
|
LOG_WARN("unexpected error", K(ret), KP(ddl_arg_), K(compat_mode_));
|
||||||
while (OB_SUCC(ret) && !is_finish) {
|
} else {
|
||||||
if (OB_FAIL(sql::ObDDLExecutorUtil::wait_build_index_finish(tenant_id, res.task_id_, is_finish))) {
|
switch (task_type_) {
|
||||||
LOG_WARN("wait build index finish failed", K(ret), K(tenant_id), K(res.task_id_));
|
case ObDDLType::DDL_DROP_DATABASE:
|
||||||
} else if (!is_finish) {
|
case ObDDLType::DDL_DROP_TABLE:
|
||||||
ob_usleep(CHECK_INTERVAL);
|
case ObDDLType::DDL_TRUNCATE_TABLE: {
|
||||||
LOG_INFO("index status is not final", K(res));
|
finish = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case ObDDLType::DDL_DROP_PARTITION:
|
||||||
|
case ObDDLType::DDL_DROP_SUB_PARTITION:
|
||||||
|
case ObDDLType::DDL_TRUNCATE_PARTITION:
|
||||||
|
case ObDDLType::DDL_TRUNCATE_SUB_PARTITION: {
|
||||||
|
obrpc::ObAlterTableArg *arg = static_cast<obrpc::ObAlterTableArg *>(ddl_arg_);
|
||||||
|
const uint64_t tenant_id = arg->exec_tenant_id_;
|
||||||
|
common::ObSArray<ObDDLRes> &res_array = alter_table_res_.ddl_res_array_;
|
||||||
|
while (OB_SUCC(ret) && res_array.count() > 0) {
|
||||||
|
const int64_t task_id = res_array.at(res_array.count() - 1).task_id_;
|
||||||
|
bool is_finish = false;
|
||||||
|
if (OB_FAIL(sql::ObDDLExecutorUtil::wait_build_index_finish(tenant_id, task_id, is_finish))) {
|
||||||
|
LOG_WARN("wait build index finish failed", K(ret), K(tenant_id), K(task_id));
|
||||||
|
} else if (is_finish) {
|
||||||
|
res_array.pop_back();
|
||||||
|
LOG_INFO("index status is final", K(ret), K(task_id));
|
||||||
} else {
|
} else {
|
||||||
LOG_INFO("index status is final", K(ret), K(res));
|
LOG_INFO("index status is not final", K(task_id));
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (OB_SUCC(ret) && res_array.count() == 0) {
|
||||||
|
finish = true;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default: {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("unexpected ddl type", K(ret), K(task_type_));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (OB_FAIL(ret) || finish) {
|
||||||
|
if (OB_FAIL(switch_status(new_status, ret))) {
|
||||||
|
LOG_WARN("fail to switch task status", K(ret));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
@ -476,11 +507,17 @@ int ObDDLRetryTask::process()
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case ObDDLTaskStatus::DROP_SCHEMA: {
|
case ObDDLTaskStatus::DROP_SCHEMA: {
|
||||||
if (OB_FAIL(drop_schema(ObDDLTaskStatus::SUCCESS))) {
|
if (OB_FAIL(drop_schema(ObDDLTaskStatus::WAIT_CHILD_TASK_FINISH))) {
|
||||||
LOG_WARN("fail to write barrier log", K(ret));
|
LOG_WARN("fail to write barrier log", K(ret));
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case ObDDLTaskStatus::WAIT_CHILD_TASK_FINISH: {
|
||||||
|
if (OB_FAIL(wait_alter_table(ObDDLTaskStatus::SUCCESS))) {
|
||||||
|
LOG_WARN("failed to wait child task", K(ret));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
case ObDDLTaskStatus::FAIL: {
|
case ObDDLTaskStatus::FAIL: {
|
||||||
if (OB_FAIL(fail())) {
|
if (OB_FAIL(fail())) {
|
||||||
LOG_WARN("fail to do clean up", K(ret));
|
LOG_WARN("fail to do clean up", K(ret));
|
||||||
@ -574,7 +611,7 @@ int64_t ObDDLRetryTask::get_serialize_param_size() const
|
|||||||
return serialize_param_size;
|
return serialize_param_size;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObDDLRetryTask::update_task_status_succ(
|
int ObDDLRetryTask::update_task_status_wait_child_task_finish(
|
||||||
common::ObMySQLTransaction &trans,
|
common::ObMySQLTransaction &trans,
|
||||||
const uint64_t tenant_id,
|
const uint64_t tenant_id,
|
||||||
const int64_t task_id)
|
const int64_t task_id)
|
||||||
@ -584,7 +621,7 @@ int ObDDLRetryTask::update_task_status_succ(
|
|||||||
ObSqlString sql_string;
|
ObSqlString sql_string;
|
||||||
int64_t curr_task_status = 0;
|
int64_t curr_task_status = 0;
|
||||||
int64_t execution_id = 0; /*unused*/
|
int64_t execution_id = 0; /*unused*/
|
||||||
const int64_t new_task_status = ObDDLTaskStatus::SUCCESS;
|
const int64_t new_task_status = ObDDLTaskStatus::WAIT_CHILD_TASK_FINISH;
|
||||||
if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || task_id <= 0)) {
|
if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || task_id <= 0)) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid argument", K(ret), K(tenant_id), K(task_id));
|
LOG_WARN("invalid argument", K(ret), K(tenant_id), K(task_id));
|
||||||
|
|||||||
@ -41,7 +41,7 @@ public:
|
|||||||
virtual int serialize_params_to_message(char *buf, const int64_t buf_size, int64_t &pos) const override;
|
virtual int serialize_params_to_message(char *buf, const int64_t buf_size, int64_t &pos) const override;
|
||||||
virtual int deserlize_params_from_message(const char *buf, const int64_t buf_size, int64_t &pos) override;
|
virtual int deserlize_params_from_message(const char *buf, const int64_t buf_size, int64_t &pos) override;
|
||||||
virtual int64_t get_serialize_param_size() const override;
|
virtual int64_t get_serialize_param_size() const override;
|
||||||
static int update_task_status_succ(
|
static int update_task_status_wait_child_task_finish(
|
||||||
common::ObMySQLTransaction &trans,
|
common::ObMySQLTransaction &trans,
|
||||||
const uint64_t tenant_id,
|
const uint64_t tenant_id,
|
||||||
const int64_t task_id);
|
const int64_t task_id);
|
||||||
@ -49,7 +49,7 @@ private:
|
|||||||
int check_health();
|
int check_health();
|
||||||
int prepare(const share::ObDDLTaskStatus next_task_status);
|
int prepare(const share::ObDDLTaskStatus next_task_status);
|
||||||
int drop_schema(const share::ObDDLTaskStatus next_task_status);
|
int drop_schema(const share::ObDDLTaskStatus next_task_status);
|
||||||
int wait_alter_table(const obrpc::ObAlterTableArg &alter_table_arg, const obrpc::ObAlterTableRes &res);
|
int wait_alter_table(const share::ObDDLTaskStatus next_task_status);
|
||||||
int succ();
|
int succ();
|
||||||
int fail();
|
int fail();
|
||||||
int cleanup();
|
int cleanup();
|
||||||
@ -67,6 +67,7 @@ private:
|
|||||||
int64_t affected_rows_;
|
int64_t affected_rows_;
|
||||||
common::ObString forward_user_message_;
|
common::ObString forward_user_message_;
|
||||||
common::ObArenaAllocator allocator_;
|
common::ObArenaAllocator allocator_;
|
||||||
|
obrpc::ObAlterTableRes alter_table_res_; // in memory
|
||||||
};
|
};
|
||||||
|
|
||||||
} // end namespace rootserver
|
} // end namespace rootserver
|
||||||
|
|||||||
@ -10001,7 +10001,7 @@ int ObDDLService::alter_table_in_trans(obrpc::ObAlterTableArg &alter_table_arg,
|
|||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("array count is unexpected" , K(orig_table_schemas), K(new_table_schemas),
|
LOG_WARN("array count is unexpected" , K(orig_table_schemas), K(new_table_schemas),
|
||||||
K(inc_table_schemas), K(del_table_schemas), KR(ret));
|
K(inc_table_schemas), K(del_table_schemas), KR(ret));
|
||||||
} else if (alter_table_arg.task_id_ > 0 && OB_FAIL(ObDDLRetryTask::update_task_status_succ(trans, tenant_id, alter_table_arg.task_id_))) {
|
} else if (alter_table_arg.task_id_ > 0 && OB_FAIL(ObDDLRetryTask::update_task_status_wait_child_task_finish(trans, tenant_id, alter_table_arg.task_id_))) {
|
||||||
LOG_WARN("update ddl task status failed", K(ret));
|
LOG_WARN("update ddl task status failed", K(ret));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -10252,11 +10252,17 @@ int ObDDLService::alter_table_in_trans(obrpc::ObAlterTableArg &alter_table_arg,
|
|||||||
LOG_WARN("fail to push ddl task", K(ret), K(task_record));
|
LOG_WARN("fail to push ddl task", K(ret), K(task_record));
|
||||||
} else {
|
} else {
|
||||||
res.task_id_ = task_record.task_id_;
|
res.task_id_ = task_record.task_id_;
|
||||||
|
ObDDLRes ddl_res;
|
||||||
|
ddl_res.tenant_id_ = tenant_id;
|
||||||
|
ddl_res.schema_id_ = create_index_arg->index_schema_.get_schema_version();
|
||||||
|
ddl_res.task_id_ = task_record.task_id_;
|
||||||
obrpc::ObAlterTableResArg arg(TABLE_SCHEMA,
|
obrpc::ObAlterTableResArg arg(TABLE_SCHEMA,
|
||||||
create_index_arg->index_schema_.get_table_id(),
|
create_index_arg->index_schema_.get_table_id(),
|
||||||
create_index_arg->index_schema_.get_schema_version());
|
create_index_arg->index_schema_.get_schema_version());
|
||||||
if (OB_FAIL(res.res_arg_array_.push_back(arg))) {
|
if (OB_FAIL(res.res_arg_array_.push_back(arg))) {
|
||||||
LOG_WARN("push back to res_arg_array failed", K(ret), K(arg));
|
LOG_WARN("push back to res_arg_array failed", K(ret), K(arg));
|
||||||
|
} else if (OB_FAIL(res.ddl_res_array_.push_back(ddl_res))) {
|
||||||
|
LOG_WARN("failed to push back ddl res array", K(ret));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -12789,7 +12795,7 @@ int ObDDLService::truncate_table_in_trans(const obrpc::ObTruncateTableArg &arg,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (OB_FAIL(ret)) {
|
if (OB_FAIL(ret)) {
|
||||||
} else if (arg.task_id_ > 0 && OB_FAIL(ObDDLRetryTask::update_task_status_succ(trans, tenant_id, arg.task_id_))) {
|
} else if (arg.task_id_ > 0 && OB_FAIL(ObDDLRetryTask::update_task_status_wait_child_task_finish(trans, tenant_id, arg.task_id_))) {
|
||||||
LOG_WARN("update ddl task status failed", K(ret));
|
LOG_WARN("update ddl task status failed", K(ret));
|
||||||
}
|
}
|
||||||
if (trans.is_started()) {
|
if (trans.is_started()) {
|
||||||
@ -18353,7 +18359,7 @@ int ObDDLService::drop_table(const ObDropTableArg &drop_table_arg, const obrpc::
|
|||||||
// drop table and update ddl task status should be done in single trans.
|
// drop table and update ddl task status should be done in single trans.
|
||||||
if (OB_FAIL(ret)) {
|
if (OB_FAIL(ret)) {
|
||||||
} else if (drop_table_arg.task_id_ > 0
|
} else if (drop_table_arg.task_id_ > 0
|
||||||
&& OB_FAIL(ObDDLRetryTask::update_task_status_succ(trans, tenant_id, drop_table_arg.task_id_))) {
|
&& OB_FAIL(ObDDLRetryTask::update_task_status_wait_child_task_finish(trans, tenant_id, drop_table_arg.task_id_))) {
|
||||||
LOG_WARN("update task status of drop table failed", K(ret));
|
LOG_WARN("update task status of drop table failed", K(ret));
|
||||||
}
|
}
|
||||||
//no matter success or not, we should publish schema
|
//no matter success or not, we should publish schema
|
||||||
@ -22538,7 +22544,7 @@ int ObDDLService::drop_database(const ObDropDatabaseArg &arg,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (OB_FAIL(ret)) {
|
if (OB_FAIL(ret)) {
|
||||||
} else if (arg.task_id_ > 0 && OB_FAIL(ObDDLRetryTask::update_task_status_succ(
|
} else if (arg.task_id_ > 0 && OB_FAIL(ObDDLRetryTask::update_task_status_wait_child_task_finish(
|
||||||
OB_ISNULL(ora_user_trans) ? trans : *ora_user_trans, tenant_id, arg.task_id_))) {
|
OB_ISNULL(ora_user_trans) ? trans : *ora_user_trans, tenant_id, arg.task_id_))) {
|
||||||
LOG_WARN("update ddl task status to success failed", K(ret));
|
LOG_WARN("update ddl task status to success failed", K(ret));
|
||||||
}
|
}
|
||||||
|
|||||||
@ -117,6 +117,7 @@ enum ObDDLTaskStatus {
|
|||||||
WAIT_TRANS_END_FOR_UNUSABLE = 13,
|
WAIT_TRANS_END_FOR_UNUSABLE = 13,
|
||||||
DROP_SCHEMA = 14,
|
DROP_SCHEMA = 14,
|
||||||
CHECK_TABLE_EMPTY = 15,
|
CHECK_TABLE_EMPTY = 15,
|
||||||
|
WAIT_CHILD_TASK_FINISH = 16,
|
||||||
FAIL = 99,
|
FAIL = 99,
|
||||||
SUCCESS = 100
|
SUCCESS = 100
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user