fix single replica response of drop column task waiting on lock for too long.

obdev
2023-05-12 13:41:14 +00:00
committed by ob-robot
parent 9c5f5cdb56
commit 5177ba45a4
2 changed files with 52 additions and 36 deletions

@@ -100,47 +100,62 @@ int ObDDLSingleReplicaExecutor::schedule_task()
   const int64_t expire_renew_time = force_renew ? INT64_MAX : 0;
   ObLSID ls_id;
   common::ObArray<int> ret_array;
+  ObArray<common::ObTabletID> request_source_tablet_ids;
+  ObArray<common::ObTabletID> request_dest_tablet_ids;
+  ObArray<int64_t> request_tablet_task_ids;
   int tmp_ret = OB_SUCCESS;
   {
-    ObSpinLockGuard guard(lock_);
-    for (int64_t i = 0; OB_SUCC(ret) && i < build_infos.count(); ++i) {
-      ObPartitionBuildInfo &build_info = build_infos.at(i);
-      if (ObPartitionBuildStat::BUILD_INIT == build_info.stat_ ||
-          build_info.need_schedule()) {
-        // get leader of partition
-        ObAddr leader_addr;
-        obrpc::ObDDLBuildSingleReplicaRequestArg arg;
-        arg.ls_id_ = share::ObLSID::INVALID_LS_ID;
-        arg.tenant_id_ = tenant_id_;
-        arg.source_tablet_id_ = source_tablet_ids_.at(i);
-        arg.dest_tablet_id_ = dest_tablet_ids_.at(i);
-        arg.source_table_id_ = source_table_id_;
-        arg.dest_schema_id_ = dest_table_id_;
-        arg.schema_version_ = schema_version_;
-        arg.snapshot_version_ = snapshot_version_;
-        arg.ddl_type_ = type_;
-        arg.task_id_ = task_id_;
-        arg.parallelism_ = parallelism_;
-        arg.execution_id_ = execution_id_;
-        arg.data_format_version_ = data_format_version_;
-        arg.tablet_task_id_ = tablet_task_ids_.at(i);
-        arg.consumer_group_id_ = consumer_group_id_;
-        if (OB_FAIL(location_service->get(tenant_id_, arg.source_tablet_id_,
-                expire_renew_time, is_cache_hit, ls_id))) {
-          LOG_WARN("get ls failed", K(ret), K(arg.source_tablet_id_));
-        } else if (OB_FAIL(location_service->get_leader(GCONF.cluster_id, tenant_id_, ls_id, force_renew, leader_addr))) {
-          LOG_WARN("get leader failed", K(ret), K(tenant_id_), K(ls_id));
-        } else if (FALSE_IT(arg.ls_id_ = ls_id)) {
-        } else if (OB_FAIL(proxy.call(leader_addr, rpc_timeout, tenant_id_, arg))) {
-          LOG_WARN("fail to send rpc", K(ret), K(rpc_timeout));
-        } else if (OB_FAIL(idxs.push_back(i))) {
-          LOG_WARN("fail to push back idx", K(ret));
-        } else {
-          LOG_INFO("send build single replica request", K(arg));
-          build_info.stat_ = ObPartitionBuildStat::BUILD_INIT;
+    {
+      ObSpinLockGuard guard(lock_);
+      // sending the rpc requests may cost too much time, thus collect the tablet ids
+      // and set the build status under the lock, then send the requests outside it.
+      for (int64_t i = 0; OB_SUCC(ret) && i < build_infos.count(); i++) {
+        ObPartitionBuildInfo &build_info = build_infos.at(i);
+        if (ObPartitionBuildStat::BUILD_INIT == build_info.stat_ || build_info.need_schedule()) {
+          if (OB_FAIL(request_source_tablet_ids.push_back(source_tablet_ids_.at(i)))) {
+            LOG_WARN("push back failed", K(ret));
+          } else if (OB_FAIL(request_dest_tablet_ids.push_back(dest_tablet_ids_.at(i)))) {
+            LOG_WARN("push back failed", K(ret));
+          } else if (OB_FAIL(request_tablet_task_ids.push_back(tablet_task_ids_.at(i)))) {
+            LOG_WARN("push back failed", K(ret));
+          } else {
+            build_info.stat_ = ObPartitionBuildStat::BUILD_INIT;
+          }
         }
       }
     }
+    for (int64_t i = 0; OB_SUCC(ret) && i < request_source_tablet_ids.count(); ++i) {
+      ObAddr leader_addr;
+      obrpc::ObDDLBuildSingleReplicaRequestArg arg;
+      arg.ls_id_ = share::ObLSID::INVALID_LS_ID;
+      arg.tenant_id_ = tenant_id_;
+      arg.source_tablet_id_ = request_source_tablet_ids.at(i);
+      arg.dest_tablet_id_ = request_dest_tablet_ids.at(i);
+      arg.source_table_id_ = source_table_id_;
+      arg.dest_schema_id_ = dest_table_id_;
+      arg.schema_version_ = schema_version_;
+      arg.snapshot_version_ = snapshot_version_;
+      arg.ddl_type_ = type_;
+      arg.task_id_ = task_id_;
+      arg.parallelism_ = parallelism_;
+      arg.execution_id_ = execution_id_;
+      arg.data_format_version_ = data_format_version_;
+      arg.tablet_task_id_ = request_tablet_task_ids.at(i);
+      arg.consumer_group_id_ = consumer_group_id_;
+      if (OB_FAIL(location_service->get(tenant_id_, arg.source_tablet_id_,
+              expire_renew_time, is_cache_hit, ls_id))) {
+        LOG_WARN("get ls failed", K(ret), K(arg.source_tablet_id_));
+      } else if (OB_FAIL(location_service->get_leader(GCONF.cluster_id, tenant_id_, ls_id, force_renew, leader_addr))) {
+        LOG_WARN("get leader failed", K(ret), K(tenant_id_), K(ls_id));
+      } else if (FALSE_IT(arg.ls_id_ = ls_id)) {
+      } else if (OB_FAIL(proxy.call(leader_addr, rpc_timeout, tenant_id_, arg))) {
+        LOG_WARN("fail to send rpc", K(ret), K(rpc_timeout));
+      } else if (OB_FAIL(idxs.push_back(i))) {
+        LOG_WARN("fail to push back idx", K(ret));
+      } else {
+        LOG_INFO("send build single replica request", K(arg));
+      }
+    }
   }
   if (OB_SUCCESS != (tmp_ret = proxy.wait_all(ret_array))) {
     LOG_WARN("rpc_proxy wait failed", K(ret), K(tmp_ret));

@@ -732,6 +732,7 @@ int ObTabletDDLUtil::update_ddl_table_store(const ObTabletDDLParam &ddl_param,
 {
   int ret = OB_SUCCESS;
   if (OB_UNLIKELY(!ddl_param.is_valid() || !table_handle.is_valid())) {
+    ret = OB_INVALID_ARGUMENT;
     LOG_WARN("invalid argument", K(ret), K(ddl_param), K(table_handle));
   } else {
     ObLSService *ls_service = MTL(ObLSService *);
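
The second hunk closes a separate hole: the invalid-argument branch logged a warning but left ret at OB_SUCCESS, so the bad input was effectively swallowed and, assuming the function returns ret at the end as usual, reported as success. With the added line the branch follows the usual check-set-log shape:

    int ret = OB_SUCCESS;
    if (OB_UNLIKELY(!ddl_param.is_valid() || !table_handle.is_valid())) {
      ret = OB_INVALID_ARGUMENT;  // set the error code first...
      LOG_WARN("invalid argument", K(ret), K(ddl_param), K(table_handle));  // ...so the log records it
    } else {
      // normal update path
    }
    return ret;  // now propagates the failure instead of OB_SUCCESS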