From 5177ba45a482112a581447b9f06b37ccbe5dea6e Mon Sep 17 00:00:00 2001 From: obdev Date: Fri, 12 May 2023 13:41:14 +0000 Subject: [PATCH] fix single replica response of drop column task wait lock too long. --- .../ob_ddl_single_replica_executor.cpp | 87 +++++++++++-------- src/storage/ddl/ob_ddl_merge_task.cpp | 1 + 2 files changed, 52 insertions(+), 36 deletions(-) diff --git a/src/rootserver/ddl_task/ob_ddl_single_replica_executor.cpp b/src/rootserver/ddl_task/ob_ddl_single_replica_executor.cpp index 63a5d424e3..4555b7873d 100644 --- a/src/rootserver/ddl_task/ob_ddl_single_replica_executor.cpp +++ b/src/rootserver/ddl_task/ob_ddl_single_replica_executor.cpp @@ -100,47 +100,62 @@ int ObDDLSingleReplicaExecutor::schedule_task() const int64_t expire_renew_time = force_renew ? INT64_MAX : 0; ObLSID ls_id; common::ObArray ret_array; + ObArray request_source_tablet_ids; + ObArray request_dest_tablet_ids; + ObArray request_tablet_task_ids; int tmp_ret = OB_SUCCESS; { - ObSpinLockGuard guard(lock_); - for (int64_t i = 0; OB_SUCC(ret) && i < build_infos.count(); ++i) { - ObPartitionBuildInfo &build_info = build_infos.at(i); - if (ObPartitionBuildStat::BUILD_INIT == build_info.stat_|| - build_info.need_schedule()) { - // get leader of partition - ObAddr leader_addr; - obrpc::ObDDLBuildSingleReplicaRequestArg arg; - arg.ls_id_ = share::ObLSID::INVALID_LS_ID; - arg.tenant_id_ = tenant_id_; - arg.source_tablet_id_ = source_tablet_ids_.at(i); - arg.dest_tablet_id_ = dest_tablet_ids_.at(i); - arg.source_table_id_ = source_table_id_; - arg.dest_schema_id_ = dest_table_id_; - arg.schema_version_ = schema_version_; - arg.snapshot_version_ = snapshot_version_; - arg.ddl_type_ = type_; - arg.task_id_ = task_id_; - arg.parallelism_ = parallelism_; - arg.execution_id_ = execution_id_; - arg.data_format_version_ = data_format_version_; - arg.tablet_task_id_ = tablet_task_ids_.at(i); - arg.consumer_group_id_ = consumer_group_id_; - if (OB_FAIL(location_service->get(tenant_id_, 
arg.source_tablet_id_, - expire_renew_time, is_cache_hit, ls_id))) { - LOG_WARN("get ls failed", K(ret), K(arg.source_tablet_id_)); - } else if (OB_FAIL(location_service->get_leader(GCONF.cluster_id, tenant_id_, ls_id, force_renew, leader_addr))) { - LOG_WARN("get leader failed", K(ret), K(tenant_id_), K(ls_id)); - } else if (FALSE_IT(arg.ls_id_ = ls_id)) { - } else if (OB_FAIL(proxy.call(leader_addr, rpc_timeout, tenant_id_, arg))) { - LOG_WARN("fail to send rpc", K(ret), K(rpc_timeout)); - } else if (OB_FAIL(idxs.push_back(i))) { - LOG_WARN("fail to push back idx", K(ret)); - } else { - LOG_INFO("send build single replica request", K(arg)); - build_info.stat_ = ObPartitionBuildStat::BUILD_INIT; + { + ObSpinLockGuard guard(lock_); + // send rpc request may cost too much time, thus set some status before rpc request under lock. + for (int64_t i = 0; OB_SUCC(ret) && i < build_infos.count(); i++) { + ObPartitionBuildInfo &build_info = build_infos.at(i); + if (ObPartitionBuildStat::BUILD_INIT == build_info.stat_ || build_info.need_schedule()) { + if (OB_FAIL(request_source_tablet_ids.push_back(source_tablet_ids_.at(i)))) { + LOG_WARN("push backed failed", K(ret)); + } else if (OB_FAIL(request_dest_tablet_ids.push_back(dest_tablet_ids_.at(i)))) { + LOG_WARN("push back failed", K(ret)); + } else if (OB_FAIL(request_tablet_task_ids.push_back(tablet_task_ids_.at(i)))) { + LOG_WARN("push back failed", K(ret)); + } else { + build_info.stat_ = ObPartitionBuildStat::BUILD_INIT; + } } } } + + for (int64_t i = 0; OB_SUCC(ret) && i < request_source_tablet_ids.count(); ++i) { + ObAddr leader_addr; + obrpc::ObDDLBuildSingleReplicaRequestArg arg; + arg.ls_id_ = share::ObLSID::INVALID_LS_ID; + arg.tenant_id_ = tenant_id_; + arg.source_tablet_id_ = request_source_tablet_ids.at(i); + arg.dest_tablet_id_ = request_dest_tablet_ids.at(i); + arg.source_table_id_ = source_table_id_; + arg.dest_schema_id_ = dest_table_id_; + arg.schema_version_ = schema_version_; + arg.snapshot_version_ = 
snapshot_version_; + arg.ddl_type_ = type_; + arg.task_id_ = task_id_; + arg.parallelism_ = parallelism_; + arg.execution_id_ = execution_id_; + arg.data_format_version_ = data_format_version_; + arg.tablet_task_id_ = request_tablet_task_ids.at(i); + arg.consumer_group_id_ = consumer_group_id_; + if (OB_FAIL(location_service->get(tenant_id_, arg.source_tablet_id_, + expire_renew_time, is_cache_hit, ls_id))) { + LOG_WARN("get ls failed", K(ret), K(arg.source_tablet_id_)); + } else if (OB_FAIL(location_service->get_leader(GCONF.cluster_id, tenant_id_, ls_id, force_renew, leader_addr))) { + LOG_WARN("get leader failed", K(ret), K(tenant_id_), K(ls_id)); + } else if (FALSE_IT(arg.ls_id_ = ls_id)) { + } else if (OB_FAIL(proxy.call(leader_addr, rpc_timeout, tenant_id_, arg))) { + LOG_WARN("fail to send rpc", K(ret), K(rpc_timeout)); + } else if (OB_FAIL(idxs.push_back(i))) { + LOG_WARN("fail to push back idx", K(ret)); + } else { + LOG_INFO("send build single replica request", K(arg)); + } + } } if (OB_SUCCESS != (tmp_ret = proxy.wait_all(ret_array))) { LOG_WARN("rpc_proxy wait failed", K(ret), K(tmp_ret)); diff --git a/src/storage/ddl/ob_ddl_merge_task.cpp b/src/storage/ddl/ob_ddl_merge_task.cpp index 320f8969c6..098e68ec83 100644 --- a/src/storage/ddl/ob_ddl_merge_task.cpp +++ b/src/storage/ddl/ob_ddl_merge_task.cpp @@ -732,6 +732,7 @@ int ObTabletDDLUtil::update_ddl_table_store(const ObTabletDDLParam &ddl_param, { int ret = OB_SUCCESS; if (OB_UNLIKELY(!ddl_param.is_valid() || !table_handle.is_valid())) { + ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(ddl_param), K(table_handle)); } else { ObLSService *ls_service = MTL(ObLSService *);