[FEAT MERGE] merge transfer

Co-authored-by: wxhwang <wxhwang@126.com>
Co-authored-by: godyangfight <godyangfight@gmail.com>
Co-authored-by: Tyshawn <tuyunshan@gmail.com>
This commit is contained in:
xuhuleon
2023-06-21 11:42:26 +00:00
committed by ob-robot
parent d06678002e
commit 9dae112952
1280 changed files with 149724 additions and 48813 deletions

90
src/share/scheduler/ob_dag_scheduler.cpp Normal file → Executable file
View File

@ -31,6 +31,7 @@
#include "storage/compaction/ob_tablet_merge_task.h"
#include "storage/compaction/ob_compaction_diagnose.h"
#include "storage/ddl/ob_complement_data_task.h"
#include "storage/multi_data_source/ob_mds_table_merge_dag.h"
#include <sys/sysinfo.h>
#include <algorithm>
@ -727,19 +728,20 @@ int64_t ObIDag::to_string(char *buf, const int64_t buf_len) const
return pos;
}
void ObIDag::gene_basic_warning_info(ObDagWarningInfo &info)
{
info.dag_type_ = type_;
info.tenant_id_ = MTL_ID();
info.gmt_create_ = info.gmt_modified_;
}
void ObIDag::gene_warning_info(ObDagWarningInfo &info)
int ObIDag::gene_warning_info(ObDagWarningInfo &info, ObIAllocator &allocator)
{
int ret = OB_SUCCESS;
info.dag_ret_ = dag_ret_;
info.task_id_ = id_;
info.gmt_modified_ = ObTimeUtility::fast_current_time();
fill_comment(info.warning_info_, OB_DAG_WARNING_INFO_LENGTH);
info.dag_type_ = type_;
info.tenant_id_ = MTL_ID();
info.gmt_create_ = info.gmt_modified_;
info.dag_status_ = ObDagWarningInfo::ODS_WARNING;
if (OB_FAIL(fill_info_param(info.info_param_, allocator))) {
COMMON_LOG(WARN, "failed to fill info param into dag warning info", K(ret));
}
return ret;
}
int ObIDag::reset_status_for_retry()
@ -838,6 +840,22 @@ int ObIDag::inner_add_child_without_inheritance(ObIDag &child)
return ret;
}
int ObIDag::fill_comment(char *buf, const int64_t buf_len)
{
int ret = OB_SUCCESS;
MEMSET(buf, '\0', buf_len);
compaction::ObInfoParamBuffer allocator;
compaction::ObIBasicInfoParam *param = nullptr;
if (OB_FAIL(fill_info_param(param, allocator))) {
COMMON_LOG(WARN, "failed to fill info param", K(ret));
} else if (OB_ISNULL(param)) {
ret = OB_ERR_UNEXPECTED;
COMMON_LOG(WARN, "param is null", K(ret));
} else if (OB_FAIL(param->fill_comment(buf, buf_len))) {
COMMON_LOG(WARN, "failed to fill comment", K(ret));
}
return ret;
}
/*************************************ObIDagNet***********************************/
@ -1104,10 +1122,9 @@ void ObIDag::gene_dag_info(ObDagInfo &info, const char *list_info)
COMMON_LOG_RET(WARN, tmp_ret, "failed to fill dag key", K(tmp_ret));
}
if (OB_TMP_FAIL(fill_comment(info.comment_, OB_DAG_COMMET_LENGTH))) {
COMMON_LOG_RET(WARN, tmp_ret, "failed to fill dag comment", K(tmp_ret));
if (OB_TMP_FAIL(fill_comment(info.comment_, sizeof(info.comment_)))) {
COMMON_LOG_RET(WARN, tmp_ret, "failed to fill comment");
}
info.comment_[strlen(info.comment_)] = ';';
ADD_TASK_INFO_PARAM(info.comment_, OB_DAG_COMMET_LENGTH,
"check_can_schedule", check_can_schedule(),
@ -1129,10 +1146,6 @@ void ObIDag::gene_dag_info(ObDagInfo &info, const char *list_info)
"indegree", cur_task->indegree_);
cur_task = cur_task->get_next();
} // end while
} else {
if (OB_TMP_FAIL(fill_comment(info.comment_, OB_DAG_COMMET_LENGTH))) {
COMMON_LOG_RET(WARN, tmp_ret, "failed to fill dag comment", K(tmp_ret));
}
}
if (OB_NOT_NULL(dag_net_)) {
info.dag_net_type_ = dag_net_->get_type();
@ -2169,10 +2182,19 @@ int ObTenantDagScheduler::check_ls_compaction_dag_exist(const ObLSID &ls_id, boo
ObIDag *head = dag_list_[READY_DAG_LIST].get_head(ObIDag::MergeDagPrio[i]);
ObIDag *cur = head->get_next();
while (head != cur) {
dag = static_cast<compaction::ObTabletMergeDag *>(cur);
if (ls_id == dag->get_ctx().param_.ls_id_) {
exist = true;
break;
if (ObDagType::DAG_TYPE_MDS_TABLE_MERGE == cur->get_type()) {
// TODO (bowen.gbw) : make ObMdsTableMergeDag inherit from ObTabletMergeDag
const mds::ObMdsTableMergeDag *mds_dag = static_cast<const mds::ObMdsTableMergeDag *>(cur);
if (ls_id == mds_dag->get_param().ls_id_) {
exist = true;
break;
}
} else {
dag = static_cast<compaction::ObTabletMergeDag *>(cur);
if (ls_id == dag->get_ctx().param_.ls_id_) {
exist = true;
break;
}
}
cur = cur->get_next();
}
@ -2499,7 +2521,7 @@ int ObTenantDagScheduler::finish_dag_(
LOG_INFO("dag finished", "dag_ret", dag.get_dag_ret(),
"runtime", ObTimeUtility::fast_current_time() - dag.start_time_,
K_(dag_cnt), K(dag_cnts_[dag.get_type()]), K(&dag), K(dag));
if (OB_TMP_FAIL(ObDagWarningHistoryManager::get_instance().add_dag_warning_info(&dag))) { // ignore failure
if (OB_TMP_FAIL(MTL(ObDagWarningHistoryManager *)->add_dag_warning_info(&dag))) { // ignore failure
COMMON_LOG(WARN, "failed to add dag warning info", K(tmp_ret), K(dag));
}
if (OB_TMP_FAIL(ObSysTaskStatMgr::get_instance().del_task(dag.get_dag_id()))) {
@ -2630,8 +2652,11 @@ int ObTenantDagScheduler::sys_task_start(ObIDag *dag)
sys_task_status.tenant_id_ = MTL_ID();
sys_task_status.task_type_ = OB_DAG_TYPES[dag->get_type()].sys_task_type_;
int tmp_ret = OB_SUCCESS;
// allow comment truncation, no need to set ret
(void) dag->fill_comment(sys_task_status.comment_,sizeof(sys_task_status.comment_));
if (OB_TMP_FAIL(dag->fill_comment(sys_task_status.comment_, sizeof(sys_task_status.comment_)))) {
COMMON_LOG(WARN, "failed to fill comment", K(ret));
}
if (OB_SUCCESS != (ret = ObSysTaskStatMgr::get_instance().add_task(sys_task_status))) {
COMMON_LOG(WARN, "failed to add sys task", K(ret), K(sys_task_status));
} else if (OB_SUCCESS != (ret = dag->set_dag_id(sys_task_status.task_id_))) { // may generate task_id in ObSysTaskStatMgr::add_task
@ -2922,7 +2947,7 @@ int ObTenantDagScheduler::schedule_one(const int64_t priority)
++total_running_task_cnt_;
running_workers_.add_last(worker, priority);
if (task != NULL) {
COMMON_LOG(INFO, "schedule one task", KPC(task), "priority", OB_DAG_PRIOS[priority].dag_prio_str_,
COMMON_LOG(INFO, "schedule one task", KP(task), "priority", OB_DAG_PRIOS[priority].dag_prio_str_,
"group id", worker->get_group_id(), K_(total_running_task_cnt), K(running_task_cnts_[priority]),
K(low_limits_[priority]), K(up_limits_[priority]), KP(task->get_dag()->get_dag_net()));
}
@ -3469,12 +3494,29 @@ int ObTenantDagScheduler::get_complement_data_dag_progress(const ObIDag *dag,
return ret;
}
// for unittest
int ObTenantDagScheduler::get_first_dag_net(ObIDagNet *&dag_net)
{
int ret = OB_SUCCESS;
dag_net = nullptr;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
COMMON_LOG(WARN, "ObDagScheduler is not inited", K(ret));
} else {
ObMutexGuard guard(dag_net_map_lock_);
DagNetMap::iterator iter = dag_net_map_[RUNNING_DAG_NET_MAP].begin();
if (iter != dag_net_map_[RUNNING_DAG_NET_MAP].end()) {
dag_net = iter->second;
}
}
return ret;
}
int ObFakeTask::process()
{
COMMON_LOG(INFO, "ObFakeTask process");
return OB_SUCCESS;
}
} //namespace share
} //namespace oceanbase