[FEAT MERGE]support ddl real time monitoring
This commit is contained in:
@ -53,7 +53,6 @@ using namespace sql;
|
||||
|
||||
namespace rootserver
|
||||
{
|
||||
|
||||
ObDDLTaskKey::ObDDLTaskKey()
|
||||
: object_id_(OB_INVALID_ID), schema_version_(0)
|
||||
{
|
||||
@ -106,6 +105,26 @@ ObCreateDDLTaskParam::ObCreateDDLTaskParam(const uint64_t tenant_id,
|
||||
{
|
||||
}
|
||||
|
||||
ObDDLTaskStatInfo::ObDDLTaskStatInfo()
|
||||
: start_time_(0), finish_time_(0), time_remaining_(0), percentage_(0), op_name_(), target_(), message_()
|
||||
{
|
||||
}
|
||||
|
||||
int ObDDLTaskStatInfo::init(const char *&ddl_type_str, const uint64_t table_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
MEMSET(op_name_, 0, common::MAX_LONG_OPS_NAME_LENGTH);
|
||||
MEMSET(target_, 0, common::MAX_LONG_OPS_TARGET_LENGTH);
|
||||
if (OB_FAIL(databuff_printf(op_name_, common::MAX_LONG_OPS_NAME_LENGTH, "%s", ddl_type_str))) {
|
||||
LOG_WARN("failed to print ddl type str", K(ret));
|
||||
} else if (OB_FAIL(databuff_printf(target_, common::MAX_LONG_OPS_TARGET_LENGTH, "%lu", table_id))) {
|
||||
LOG_WARN("failed to print ddl table name", K(ret), K(table_id));
|
||||
} else {
|
||||
start_time_ = ObTimeUtility::current_time();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDDLTask::get_ddl_type_str(const int64_t ddl_type, const char *&ddl_type_str)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -333,12 +352,15 @@ int ObDDLTask::switch_status(ObDDLTaskStatus new_status, const int ret_code)
|
||||
} else if (table_task_status == SUCCESS && old_status != table_task_status) {
|
||||
real_new_status = SUCCESS;
|
||||
} else if (old_status == new_status) {
|
||||
// do nothing.
|
||||
// do nothing
|
||||
} else if (OB_FAIL(ObDDLTaskRecordOperator::update_task_status(
|
||||
trans, tenant_id_, task_id_, static_cast<int64_t>(real_new_status)))) {
|
||||
LOG_WARN("update task status failed", K(ret), K(task_id_), K(new_status));
|
||||
} else if (OB_FAIL(ObDDLTaskRecordOperator::update_ret_code(trans, tenant_id_, task_id_, ret_code_))) {
|
||||
LOG_WARN("failed to update ret code", K(ret));
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(ObDDLTaskRecordOperator::update_ret_code(trans, tenant_id_, task_id_, ret_code_))) {
|
||||
LOG_WARN("failed to update ret code", K(ret));
|
||||
}
|
||||
}
|
||||
|
||||
bool commit = (OB_SUCCESS == ret);
|
||||
@ -590,6 +612,32 @@ int ObDDLTask::batch_release_snapshot(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDDLTask::collect_longops_stat(ObLongopsValue &value)
|
||||
{
|
||||
// do nothing
|
||||
int ret = OB_SUCCESS;
|
||||
return ret;
|
||||
};
|
||||
|
||||
int ObDDLTask::copy_longops_stat(ObLongopsValue &value)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
value.trace_id_ = trace_id_;
|
||||
value.tenant_id_ = tenant_id_;
|
||||
value.start_time_ = stat_info_.start_time_;
|
||||
value.finish_time_ = stat_info_.finish_time_;
|
||||
value.elapsed_seconds_ = (ObTimeUtility::current_time() - stat_info_.start_time_);
|
||||
value.time_remaining_ = stat_info_.time_remaining_;
|
||||
value.last_update_time_ = ObTimeUtility::current_time();
|
||||
MEMCPY(value.op_name_, stat_info_.op_name_, common::MAX_LONG_OPS_NAME_LENGTH);
|
||||
value.op_name_[common::MAX_LONG_OPS_NAME_LENGTH - 1] = '\0';
|
||||
MEMCPY(value.target_, stat_info_.target_, common::MAX_LONG_OPS_TARGET_LENGTH);
|
||||
value.target_[common::MAX_LONG_OPS_TARGET_LENGTH - 1] = '\0';
|
||||
MEMCPY(value.message_, stat_info_.message_, common::MAX_LONG_OPS_MESSAGE_LENGTH);
|
||||
value.message_[common::MAX_LONG_OPS_MESSAGE_LENGTH - 1] = '\0';
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDDLTask::push_execution_id()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -706,6 +754,172 @@ int ObDDLTask::check_errsim_error()
|
||||
}
|
||||
#endif
|
||||
|
||||
int ObDDLTask::gather_scanned_rows(
|
||||
const uint64_t tenant_id,
|
||||
const int64_t task_id,
|
||||
ObMySQLProxy &sql_proxy,
|
||||
int64_t &row_scanned)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
row_scanned = 0;
|
||||
char trace_id_str[OB_MAX_TRACE_ID_BUFFER_SIZE] = "";
|
||||
trace_id_.to_string(trace_id_str, OB_MAX_TRACE_ID_BUFFER_SIZE);
|
||||
if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || task_id <= 0)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(tenant_id), K(task_id));
|
||||
} else {
|
||||
ObSqlString scan_sql;
|
||||
sqlclient::ObMySQLResult *scan_result = NULL;
|
||||
SMART_VAR(ObMySQLProxy::MySQLResult, scan_res) {
|
||||
if (OB_FAIL(scan_sql.assign_fmt(
|
||||
"SELECT OUTPUT_ROWS FROM %s WHERE TENANT_ID=%lu "
|
||||
"AND TRACE_ID='%s' AND PLAN_OPERATION='PHY_SUBPLAN_SCAN' AND OTHERSTAT_5_VALUE='%ld'",
|
||||
OB_ALL_VIRTUAL_SQL_PLAN_MONITOR_TNAME, tenant_id, trace_id_str, task_id))) {
|
||||
LOG_WARN("failed to assign sql", K(ret));
|
||||
} else if (OB_FAIL(sql_proxy.read(scan_res, tenant_id, scan_sql.ptr()))) {
|
||||
LOG_WARN("fail to execute sql", K(ret));
|
||||
} else if (OB_ISNULL(scan_result = scan_res.get_result())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("error unexpected, query result must not be NULL", K(ret));
|
||||
} else {
|
||||
while (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(scan_result->next())) {
|
||||
if (OB_ITER_END == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
break;
|
||||
} else {
|
||||
LOG_WARN("failed to get next row", K(ret));
|
||||
}
|
||||
} else {
|
||||
int64_t row_scanned_tmp = 0;
|
||||
EXTRACT_INT_FIELD_MYSQL(*scan_result, "OUTPUT_ROWS", row_scanned_tmp, int64_t);
|
||||
row_scanned += row_scanned_tmp;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDDLTask::gather_sorted_rows(
|
||||
const uint64_t tenant_id,
|
||||
const int64_t task_id,
|
||||
ObMySQLProxy &sql_proxy,
|
||||
int64_t &row_sorted)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
row_sorted = 0;
|
||||
char trace_id_str[OB_MAX_TRACE_ID_BUFFER_SIZE] = "";
|
||||
trace_id_.to_string(trace_id_str, OB_MAX_TRACE_ID_BUFFER_SIZE);
|
||||
if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || task_id <= 0)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(tenant_id), K(task_id));
|
||||
} else {
|
||||
ObSqlString sort_sql;
|
||||
sqlclient::ObMySQLResult *sort_result = NULL;
|
||||
SMART_VAR(ObMySQLProxy::MySQLResult, sort_res) {
|
||||
if (OB_FAIL(sort_sql.assign_fmt(
|
||||
"SELECT OTHERSTAT_1_VALUE AS ROW_SORTED FROM %s WHERE TENANT_ID=%lu "
|
||||
"AND TRACE_ID='%s' AND PLAN_OPERATION='PHY_SORT' AND OTHERSTAT_5_VALUE='%ld'",
|
||||
OB_ALL_VIRTUAL_SQL_PLAN_MONITOR_TNAME, tenant_id, trace_id_str, task_id))) {
|
||||
LOG_WARN("failed to assign sql", K(ret));
|
||||
} else if (OB_FAIL(sql_proxy.read(sort_res, tenant_id, sort_sql.ptr()))) {
|
||||
LOG_WARN("fail to execute sql", K(ret));
|
||||
} else if (OB_ISNULL(sort_result = sort_res.get_result())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("error unexpected, query result must not be NULL", K(ret));
|
||||
}
|
||||
while (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(sort_result->next())) {
|
||||
if (OB_ITER_END == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
break;
|
||||
} else {
|
||||
LOG_WARN("failed to get next row", K(ret));
|
||||
}
|
||||
} else {
|
||||
int64_t row_sorted_tmp = 0;
|
||||
EXTRACT_INT_FIELD_MYSQL(*sort_result, "ROW_SORTED", row_sorted_tmp, int64_t);
|
||||
row_sorted += row_sorted_tmp;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDDLTask::gather_inserted_rows(
|
||||
const uint64_t tenant_id,
|
||||
const int64_t task_id,
|
||||
ObMySQLProxy &sql_proxy,
|
||||
int64_t &row_inserted)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
row_inserted = 0;
|
||||
char trace_id_str[OB_MAX_TRACE_ID_BUFFER_SIZE] = "";
|
||||
trace_id_.to_string(trace_id_str, OB_MAX_TRACE_ID_BUFFER_SIZE);
|
||||
if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || task_id <= 0)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(tenant_id), K(task_id));
|
||||
} else {
|
||||
ObSqlString insert_sql;
|
||||
sqlclient::ObMySQLResult *insert_result = NULL;
|
||||
SMART_VAR(ObMySQLProxy::MySQLResult, insert_res) {
|
||||
if (OB_FAIL(insert_sql.assign_fmt(
|
||||
"SELECT OTHERSTAT_1_VALUE AS ROW_INSERTED FROM %s WHERE TENANT_ID=%lu "
|
||||
"AND TRACE_ID='%s' AND PLAN_OPERATION='PHY_PX_MULTI_PART_SSTABLE_INSERT' AND OTHERSTAT_5_VALUE='%ld'",
|
||||
OB_ALL_VIRTUAL_SQL_PLAN_MONITOR_TNAME, tenant_id, trace_id_str, task_id))) {
|
||||
LOG_WARN("failed to assign sql", K(ret));
|
||||
} else if (OB_FAIL(sql_proxy.read(insert_res, tenant_id, insert_sql.ptr()))) {
|
||||
LOG_WARN("fail to execute sql", K(ret), K(insert_sql));
|
||||
} else if (OB_ISNULL(insert_result = insert_res.get_result())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("error unexpected, query result must not be NULL", K(ret));
|
||||
}
|
||||
while (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(insert_result->next())) {
|
||||
if (OB_ITER_END == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
break;
|
||||
} else {
|
||||
LOG_WARN("failed to get next row", K(ret));
|
||||
}
|
||||
} else {
|
||||
int64_t row_inserted_tmp = 0;
|
||||
EXTRACT_INT_FIELD_MYSQL(*insert_result, "ROW_INSERTED", row_inserted_tmp, int64_t);
|
||||
row_inserted += row_inserted_tmp;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDDLTask::gather_redefinition_stats(const uint64_t tenant_id,
|
||||
const int64_t task_id,
|
||||
ObMySQLProxy &sql_proxy,
|
||||
int64_t &row_scanned,
|
||||
int64_t &row_sorted,
|
||||
int64_t &row_inserted)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
row_scanned = 0;
|
||||
row_sorted = 0;
|
||||
row_inserted = 0;
|
||||
if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || task_id <= 0)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(tenant_id), K(task_id));
|
||||
} else if (OB_FAIL(gather_scanned_rows(tenant_id, task_id, sql_proxy, row_scanned))) {
|
||||
LOG_WARN("gather scanned rows failed", K(ret));
|
||||
} else if (OB_FAIL(gather_sorted_rows(tenant_id, task_id, sql_proxy, row_sorted))) {
|
||||
LOG_WARN("gather sorted rows failed", K(ret));
|
||||
} else if (OB_FAIL(gather_inserted_rows(tenant_id, task_id, sql_proxy, row_inserted))) {
|
||||
LOG_WARN("gather inserted rows failed", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
/****************** ObDDLWaitTransEndCtx *************/
|
||||
|
||||
ObDDLWaitTransEndCtx::ObDDLWaitTransEndCtx()
|
||||
@ -818,13 +1032,15 @@ int check_trans_end(const ObArray<SendItem> &send_array,
|
||||
Arg &arg,
|
||||
Res *res,
|
||||
ObIArray<int> &ret_array,
|
||||
ObIArray<int64_t> &snapshot_array)
|
||||
ObIArray<int64_t> &snapshot_array,
|
||||
transaction::ObTransID &pending_tx_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ret_array.reuse();
|
||||
snapshot_array.reuse();
|
||||
hash::ObHashMap<obrpc::ObLSTabletPair, obrpc::ObCheckTransElapsedResult> result_map;
|
||||
ObArray<SendItem> tmp_send_array;
|
||||
pending_tx_id.reset();
|
||||
if (OB_UNLIKELY(send_array.empty())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret));
|
||||
@ -915,6 +1131,8 @@ int check_trans_end(const ObArray<SendItem> &send_array,
|
||||
LOG_WARN("push back return code failed", K(ret), K(send_item), K(result_item));
|
||||
} else if (OB_FAIL(snapshot_array.push_back(result_item.snapshot_))) {
|
||||
LOG_WARN("push back snapshot failed", K(ret), K(send_item), K(result_item));
|
||||
} else if (result_item.pending_tx_id_.is_valid() && !pending_tx_id.is_valid()) {
|
||||
pending_tx_id = result_item.pending_tx_id_;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -950,7 +1168,7 @@ int ObDDLWaitTransEndCtx::check_schema_trans_end(
|
||||
arg.tenant_id_ = tenant_id;
|
||||
arg.schema_version_ = schema_version;
|
||||
arg.need_wait_trans_end_ = need_wait_trans_end;
|
||||
if (OB_FAIL(check_trans_end(send_array, proxy, arg, res, ret_array, snapshot_array))) {
|
||||
if (OB_FAIL(check_trans_end(send_array, proxy, arg, res, ret_array, snapshot_array, pending_tx_id_))) {
|
||||
LOG_WARN("check trans end failed", K(ret));
|
||||
}
|
||||
}
|
||||
@ -983,7 +1201,7 @@ int ObDDLWaitTransEndCtx::check_sstable_trans_end(
|
||||
obrpc::ObCheckModifyTimeElapsedResult *res = nullptr;
|
||||
arg.tenant_id_ = tenant_id;
|
||||
arg.sstable_exist_ts_ = sstable_exist_ts;
|
||||
if (OB_FAIL(check_trans_end(send_array, proxy, arg, res, ret_array, snapshot_array))) {
|
||||
if (OB_FAIL(check_trans_end(send_array, proxy, arg, res, ret_array, snapshot_array, pending_tx_id_))) {
|
||||
LOG_WARN("check trans end failed", K(ret));
|
||||
}
|
||||
}
|
||||
@ -2162,6 +2380,22 @@ int ObDDLTaskRecordOperator::kill_task_inner_sql(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDDLTask::init_ddl_task_monitor_info(const ObTableSchema *target_schema)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const char *ddl_type_str = nullptr;
|
||||
const char *target_name = nullptr;
|
||||
if (OB_ISNULL(target_schema)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid arguments", K(ret), KP(target_schema));
|
||||
} else if (OB_FAIL(get_ddl_type_str(task_type_, ddl_type_str))) {
|
||||
LOG_WARN("failed to get ddl type str", K(ret));
|
||||
} else if (OB_FAIL(stat_info_.init(ddl_type_str, target_schema->get_table_id()))) {
|
||||
LOG_WARN("failed to init stat info", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
} // end namespace rootserver
|
||||
} // end namespace oceanbase
|
||||
|
||||
Reference in New Issue
Block a user