diff --git a/src/observer/ob_rpc_processor_simple.cpp b/src/observer/ob_rpc_processor_simple.cpp index 4b6be524f..3fd1f8dec 100644 --- a/src/observer/ob_rpc_processor_simple.cpp +++ b/src/observer/ob_rpc_processor_simple.cpp @@ -439,7 +439,7 @@ int ObCalcColumnChecksumRequestP::process() ret = OB_INVALID_ARGUMENT; LOG_ERROR("invalid arguments", K(ret), KP(gctx_.ob_service_)); } else { - ret = gctx_.ob_service_->calc_column_checksum_request(arg_); + ret = gctx_.ob_service_->calc_column_checksum_request(arg_, result_); } return ret; } diff --git a/src/observer/ob_service.cpp b/src/observer/ob_service.cpp index 0ce6bf706..9ea464bfb 100644 --- a/src/observer/ob_service.cpp +++ b/src/observer/ob_service.cpp @@ -518,7 +518,7 @@ int ObService::get_min_sstable_schema_version( return ret; } -int ObService::calc_column_checksum_request(const obrpc::ObCalcColumnChecksumRequestArg &arg) +int ObService::calc_column_checksum_request(const obrpc::ObCalcColumnChecksumRequestArg &arg, obrpc::ObCalcColumnChecksumRequestRes &res) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!inited_)) { @@ -532,30 +532,56 @@ int ObService::calc_column_checksum_request(const obrpc::ObCalcColumnChecksumReq const uint64_t tenant_id = arg.tenant_id_; MTL_SWITCH(tenant_id) { ObGlobalUniqueIndexCallback *callback = NULL; - ObUniqueCheckingDag *dag = NULL; ObTenantDagScheduler* dag_scheduler = nullptr; if (OB_ISNULL(dag_scheduler = MTL(ObTenantDagScheduler *))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("error unexpected, dag scheduler must not be nullptr", KR(ret)); - } else if (OB_FAIL(dag_scheduler->alloc_dag(dag))) { - STORAGE_LOG(WARN, "fail to alloc dag", KR(ret)); - } else if (OB_FAIL(dag->init(arg.tenant_id_, arg.ls_id_, arg.tablet_id_, arg.calc_table_id_ == arg.target_table_id_, arg.target_table_id_, arg.schema_version_, arg.task_id_, arg.execution_id_, arg.snapshot_version_))) { - STORAGE_LOG(WARN, "fail to init ObUniqueCheckingDag", KR(ret)); - } else if (OB_FAIL(dag->alloc_global_index_task_callback(arg.tablet_id_, arg.target_table_id_, arg.source_table_id_, arg.schema_version_, arg.task_id_, callback))) { - STORAGE_LOG(WARN, "fail to alloc global index task callback", KR(ret)); - } else if (OB_FAIL(dag->alloc_unique_checking_prepare_task(callback))) { - STORAGE_LOG(WARN, "fail to alloc unique checking prepare task", KR(ret)); - } else if (OB_FAIL(dag_scheduler->add_dag(dag))) { - if (OB_EAGAIN != ret && OB_SIZE_OVERFLOW != ret) { - STORAGE_LOG(WARN, "fail to add dag to queue", KR(ret)); - } else { - ret = OB_EAGAIN; + } else if (OB_FAIL(res.ret_codes_.reserve(arg.calc_items_.count()))) { + LOG_WARN("reserve return code array failed", K(ret), K(arg.calc_items_.count())); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < arg.calc_items_.count(); ++i) { + const ObCalcColumnChecksumRequestArg::SingleItem &calc_item = arg.calc_items_.at(i); + ObUniqueCheckingDag *dag = NULL; + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(dag_scheduler->alloc_dag(dag))) { + STORAGE_LOG(WARN, "fail to alloc dag", KR(tmp_ret)); + } else if (OB_TMP_FAIL(dag->init(arg.tenant_id_, + calc_item.ls_id_, + calc_item.tablet_id_, + calc_item.calc_table_id_ == arg.target_table_id_, + arg.target_table_id_, + arg.schema_version_, + arg.task_id_, + arg.execution_id_, + arg.snapshot_version_))) { + STORAGE_LOG(WARN, "fail to init ObUniqueCheckingDag", KR(tmp_ret)); + } else if (OB_TMP_FAIL(dag->alloc_global_index_task_callback(calc_item.tablet_id_, + arg.target_table_id_, + arg.source_table_id_, + arg.schema_version_, + arg.task_id_, + callback))) { + STORAGE_LOG(WARN, "fail to alloc global index task callback", KR(tmp_ret)); + } else if (OB_TMP_FAIL(dag->alloc_unique_checking_prepare_task(callback))) { + STORAGE_LOG(WARN, "fail to alloc unique checking prepare task", KR(tmp_ret)); + } else if (OB_TMP_FAIL(dag_scheduler->add_dag(dag))) { + if (OB_EAGAIN != tmp_ret && OB_SIZE_OVERFLOW != tmp_ret) { + STORAGE_LOG(WARN, "fail to add dag to queue", KR(tmp_ret)); + } else { + tmp_ret = OB_EAGAIN; + } + } + if (OB_SUCCESS != tmp_ret && NULL != dag) { + dag_scheduler->free_dag(*dag); + dag = NULL; + } + if (OB_SUCC(ret)) { + if (OB_FAIL(res.ret_codes_.push_back(tmp_ret))) { + LOG_WARN("push back return code failed", K(ret), K(tmp_ret)); + } + } } } - if (OB_FAIL(ret) && NULL != dag) { - dag_scheduler->free_dag(*dag); - dag = NULL; - } } LOG_INFO("receive column checksum request", K(arg)); } @@ -1010,18 +1036,34 @@ int ObService::check_modify_time_elapsed( ObLSHandle ls_handle; transaction::ObTransService *txs = MTL(transaction::ObTransService *); ObLSService *ls_service = MTL(ObLSService *); - if (OB_FAIL(ls_service->get_ls(ObLSID(arg.ls_id_), ls_handle, ObLSGetMod::OBSERVER_MOD))) { - LOG_WARN("get ls failed", K(ret), K(arg.ls_id_)); - } else if (OB_FAIL(ls_handle.get_ls()->check_modify_time_elapsed(arg.tablet_id_, - arg.sstable_exist_ts_, - result.pending_tx_id_))) { - if (OB_EAGAIN != ret) { - LOG_WARN("check schema version elapsed failed", K(ret), K(arg)); + if (OB_FAIL(result.results_.reserve(arg.tablets_.count()))) { + LOG_WARN("reserve result array failed", K(ret), K(arg.tablets_.count())); + } + + for (int64_t i = 0; OB_SUCC(ret) && i < arg.tablets_.count(); ++i) { + ObTabletHandle tablet_handle; + ObLSHandle ls_handle; + const ObLSID &ls_id = arg.tablets_.at(i).ls_id_; + const ObTabletID &tablet_id = arg.tablets_.at(i).tablet_id_; + ObCheckTransElapsedResult single_result; + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(ls_service->get_ls(ls_id, ls_handle, ObLSGetMod::OBSERVER_MOD))) { + LOG_WARN("get ls failed", K(tmp_ret), K(ls_id)); + } else if (OB_TMP_FAIL(ls_handle.get_ls()->check_modify_time_elapsed(tablet_id, + arg.sstable_exist_ts_, + single_result.pending_tx_id_))) { + if (OB_EAGAIN != tmp_ret) { + LOG_WARN("check schema version elapsed failed", K(tmp_ret), K(arg)); + } + } else if (OB_TMP_FAIL(txs->get_max_commit_version(single_result.snapshot_))) { + LOG_WARN("fail to get max commit version", K(tmp_ret)); + } + if (OB_SUCC(ret)) { + single_result.ret_code_ = tmp_ret; + if (OB_FAIL(result.results_.push_back(single_result))) { + LOG_WARN("push back single result failed", K(ret), K(i), K(single_result)); + } } - } else if (OB_FAIL(txs->get_max_commit_version(result.snapshot_))) { - LOG_WARN("fail to get max commit version", K(ret)); - } else { - LOG_INFO("succeed to wait transaction end", K(arg)); } } } @@ -1042,21 +1084,36 @@ int ObService::check_schema_version_elapsed( LOG_WARN("invalid argument", K(ret), K(arg)); } else { MTL_SWITCH(arg.tenant_id_) { - ObTabletHandle tablet_handle; - ObLSHandle ls_handle; ObLSService *ls_service = nullptr; if (OB_ISNULL(ls_service = MTL(ObLSService *))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("error unexpected, get ls service failed", K(ret)); - } else if (OB_FAIL(ls_service->get_ls(ObLSID(arg.ls_id_), ls_handle, ObLSGetMod::OBSERVER_MOD))) { - LOG_WARN("get ls failed", K(ret), K(arg.ls_id_)); - } else if (OB_FAIL(ls_handle.get_ls()->get_tablet(arg.data_tablet_id_, tablet_handle))) { - LOG_WARN("fail to get tablet", K(ret), K(arg)); - } else if (OB_FAIL(tablet_handle.get_obj()->check_schema_version_elapsed(arg.schema_version_, - arg.need_wait_trans_end_, - result.snapshot_, - result.pending_tx_id_))) { - LOG_WARN("check schema version elapsed failed", K(ret), K(arg)); + } else if (OB_FAIL(result.results_.reserve(arg.tablets_.count()))) { + LOG_WARN("reserve result array failed", K(ret), K(arg.tablets_.count())); + } + for (int64_t i = 0; OB_SUCC(ret) && i < arg.tablets_.count(); ++i) { + ObTabletHandle tablet_handle; + ObLSHandle ls_handle; + const ObLSID &ls_id = arg.tablets_.at(i).ls_id_; + const ObTabletID &tablet_id = arg.tablets_.at(i).tablet_id_; + ObCheckTransElapsedResult single_result; + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(ls_service->get_ls(ls_id, ls_handle, ObLSGetMod::OBSERVER_MOD))) { + LOG_WARN("get ls failed", K(tmp_ret), K(i), K(ls_id)); + } else if (OB_TMP_FAIL(ls_handle.get_ls()->get_tablet(tablet_id, tablet_handle))) { + LOG_WARN("fail to get tablet", K(tmp_ret), K(i), K(ls_id), K(tablet_id)); + } else if (OB_TMP_FAIL(tablet_handle.get_obj()->check_schema_version_elapsed(arg.schema_version_, + arg.need_wait_trans_end_, + single_result.snapshot_, + single_result.pending_tx_id_))) { + LOG_WARN("check schema version elapsed failed", K(tmp_ret), K(arg), K(ls_id), K(tablet_id)); + } + if (OB_SUCC(ret)) { + single_result.ret_code_ = tmp_ret; + if (OB_FAIL(result.results_.push_back(single_result))) { + LOG_WARN("push back single result failed", K(ret), K(i), K(single_result)); + } + } } } } diff --git a/src/observer/ob_service.h b/src/observer/ob_service.h index 36d50c0e0..358fb77d8 100644 --- a/src/observer/ob_service.h +++ b/src/observer/ob_service.h @@ -126,7 +126,7 @@ public: obrpc::ObGetMinSSTableSchemaVersionRes &result); // ObRpcSwitchSchemaP @RS DDL int switch_schema(const obrpc::ObSwitchSchemaArg &arg, obrpc::ObSwitchSchemaResult &result); - int calc_column_checksum_request(const obrpc::ObCalcColumnChecksumRequestArg &arg); + int calc_column_checksum_request(const obrpc::ObCalcColumnChecksumRequestArg &arg, obrpc::ObCalcColumnChecksumRequestRes &res); int build_ddl_single_replica_request(const obrpc::ObDDLBuildSingleReplicaRequestArg &arg); int write_ddl_sstable_commit_log(const obrpc::ObDDLWriteSSTableCommitLogArg &arg); int stop_partition_write(const obrpc::Int64 &switchover_timestamp, obrpc::Int64 &result); diff --git a/src/rootserver/ddl_task/ob_ddl_task.cpp b/src/rootserver/ddl_task/ob_ddl_task.cpp index b7961c919..8dc5ed151 100644 --- a/src/rootserver/ddl_task/ob_ddl_task.cpp +++ b/src/rootserver/ddl_task/ob_ddl_task.cpp @@ -336,8 +336,8 @@ int ObDDLTask::switch_status(ObDDLTaskStatus new_status, const int ret_code) ret = (OB_SUCCESS == ret) ? tmp_ret : ret; } if (OB_SUCC(ret) && old_status != real_new_status) { - ROOTSERVICE_EVENT_ADD("ddl_scheduler", "switch_state", K_(tenant_id), K_(object_id), K_(target_object_id), - "pre_state", old_status, "new_state", real_new_status, K_(snapshot_version)); + ROOTSERVICE_EVENT_ADD("ddl_scheduler", "switch_state", K_(tenant_id), K_(task_id), K_(object_id), K_(target_object_id), + "new_state", real_new_status, K_(snapshot_version), ret_code_); task_status_ = real_new_status; } } @@ -623,6 +623,136 @@ void ObDDLWaitTransEndCtx::reset() snapshot_array_.reset(); } +struct SendItem final +{ +public: + bool operator < (const SendItem &other) const { return leader_addr_ < other.leader_addr_; } + TO_STRING_KV(K_(leader_addr), K_(ls_id), K_(tablet_id), KP_(other_info)); +public: + ObAddr leader_addr_; + ObLSID ls_id_; + ObTabletID tablet_id_; + void *other_info_; +}; + +int group_tablets_leader_addr(const uint64_t tenant_id, const ObIArray &tablet_ids, ObLocationService *location_service, ObArray &group_items) +{ + int ret = OB_SUCCESS; + group_items.reuse(); + if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || nullptr == location_service)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(tenant_id), K(tablet_ids.count())); + } else { + const int64_t rpc_timeout = max(GCONF.rpc_timeout, 1000L * 1000L * 9L); + if (OB_FAIL(group_items.reserve(tablet_ids.count()))) { + LOG_WARN("reserve send array failed", K(ret), K(tablet_ids.count())); + } + for (int64_t i = 0; OB_SUCC(ret) && i < tablet_ids.count(); ++i) { + const ObTabletID &tablet_id = tablet_ids.at(i); + SendItem item; + if (OB_FAIL(ObDDLUtil::get_tablet_leader_addr(location_service, + tenant_id, + tablet_id, + rpc_timeout, + item.ls_id_, + item.leader_addr_))) { + LOG_WARN("get tablet leader addr failed", K(ret)); + } else if (FALSE_IT(item.tablet_id_ = tablet_id)) { + } else if (OB_FAIL(group_items.push_back(item))) { + LOG_WARN("push back send item failed", K(ret), K(item)); + } + } + } + return ret; +} + +template +int check_trans_end(ObArray &send_array, + Proxy &proxy, + Arg &arg, + Res *res, + ObIArray &ret_array, + ObIArray &snapshot_array) +{ + int ret = OB_SUCCESS; + ret_array.reuse(); + snapshot_array.reuse(); + if (OB_UNLIKELY(send_array.empty())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret)); + } else { + // group by leader addr and send batch rpc + std::sort(send_array.begin(), send_array.end()); + + const int64_t rpc_timeout = max(GCONF.rpc_timeout, 1000L * 1000L * 9L); + ObAddr last_addr; + for (int64_t i = 0; OB_SUCC(ret) && i < send_array.count(); ++i) { + const SendItem &send_item = send_array.at(i); + if (send_item.leader_addr_ != last_addr) { + if (arg.tablets_.count() > 0) { + if (OB_FAIL(proxy.call(last_addr, rpc_timeout, arg.tenant_id_, arg))) { + LOG_WARN("send rpc failed", K(ret), K(arg), K(last_addr), K(arg.tenant_id_)); + } + } + if (OB_SUCC(ret)) { + arg.tablets_.reuse(); + last_addr = send_item.leader_addr_; + } + } + if (OB_SUCC(ret)) { + ObLSTabletPair ls_tablet_pair; + ls_tablet_pair.ls_id_ = send_item.ls_id_; + ls_tablet_pair.tablet_id_ = send_item.tablet_id_; + if (OB_FAIL(arg.tablets_.push_back(ls_tablet_pair))) { + LOG_WARN("push back send item failed", K(ret), K(i), K(send_item)); + } + } + } + if (OB_SUCC(ret) && arg.tablets_.count() > 0) { + if (OB_FAIL(proxy.call(last_addr, rpc_timeout, arg.tenant_id_, arg))) { + LOG_WARN("send rpc failed", K(ret), K(arg), K(last_addr), K(arg.tenant_id_)); + } + } + + // collect result + int tmp_ret = OB_SUCCESS; + common::ObArray tmp_ret_array; + if (OB_SUCCESS != (tmp_ret = proxy.wait_all(tmp_ret_array))) { + LOG_WARN("rpc proxy wait failed", K(tmp_ret)); + ret = OB_SUCCESS == ret ? tmp_ret : ret; + } else if (OB_SUCC(ret)) { + const ObIArray &result_array = proxy.get_results(); + const ObIArray &arg_array = proxy.get_args(); + const ObIArray &dest_array = proxy.get_dests(); + for (int64_t i = 0; OB_SUCC(ret) && i < result_array.count(); ++i) { + const Res *cur_result = result_array.at(i); + const Arg &cur_arg = arg_array.at(i); + const ObAddr &cur_dest_addr = dest_array.at(i); + if (OB_ISNULL(cur_result)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("result it null", K(ret), K(i), KP(cur_result)); + } else if (OB_FAIL(tmp_ret_array.at(i))) { + LOG_WARN("check shema trans elapsed failed", K(ret), K(i), K(cur_dest_addr), K(cur_arg), KPC(cur_result)); + } else if (cur_arg.tablets_.count() != cur_result->results_.count()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("the result count does not match the argument", K(ret), K(cur_arg), KPC(cur_result)); + } else { + for (int64_t j = 0; OB_SUCC(ret) && j < cur_result->results_.count(); ++j) { + const obrpc::ObLSTabletPair &send_item = cur_arg.tablets_.at(j); + const obrpc::ObCheckTransElapsedResult &result_item = cur_result->results_.at(j); + if (OB_FAIL(ret_array.push_back(result_item.ret_code_))) { + LOG_WARN("push back ret code failed", K(ret), K(i), K(j), K(send_item), K(result_item)); + } else if (OB_FAIL(snapshot_array.push_back(result_item.snapshot_))) { + LOG_WARN("push back snapshot failed", K(ret), K(i), K(j), K(send_item), K(result_item)); + } + } + } + } + } + } + return ret; +} + int ObDDLWaitTransEndCtx::check_schema_trans_end( const int64_t schema_version, const common::ObIArray &tablet_ids, @@ -636,52 +766,22 @@ int ObDDLWaitTransEndCtx::check_schema_trans_end( int ret = OB_SUCCESS; ret_array.reset(); snapshot_array.reset(); + ObArray send_array; if (OB_UNLIKELY(schema_version <= 0 || tablet_ids.count() <= 0 || OB_INVALID_ID == tenant_id || nullptr == rpc_proxy || nullptr == location_service)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(schema_version), K(tablet_ids.count()), K(tenant_id), KP(rpc_proxy), KP(location_service)); + } else if (OB_FAIL(group_tablets_leader_addr(tenant_id, tablet_ids, location_service, send_array))) { + LOG_WARN("group tablet by leader addr failed", K(ret), K(tenant_id), K(tablet_ids.count())); } else { - ObCheckSchemaVersionElapsedProxy proxy(*rpc_proxy, - &obrpc::ObSrvRpcProxy::check_schema_version_elapsed); + ObCheckSchemaVersionElapsedProxy proxy(*rpc_proxy, &obrpc::ObSrvRpcProxy::check_schema_version_elapsed); obrpc::ObCheckSchemaVersionElapsedArg arg; - const int64_t rpc_timeout = max(GCONF.rpc_timeout, 1000L * 1000L * 9L); - for (int64_t i = 0; OB_SUCC(ret) && i < tablet_ids.count(); ++i) { - const ObTabletID &tablet_id = tablet_ids.at(i); - arg.tenant_id_ = tenant_id; - arg.data_tablet_id_ = tablet_id; - arg.schema_version_ = schema_version; - arg.need_wait_trans_end_ = need_wait_trans_end; - ObAddr leader_addr; - if (OB_FAIL(ObDDLUtil::get_tablet_leader_addr(location_service, tenant_id, tablet_id, rpc_timeout, arg.ls_id_, leader_addr))) { - LOG_WARN("get tablet leader addr failed", K(ret)); - } else if (OB_FAIL(proxy.call(leader_addr, rpc_timeout, tenant_id, arg))) { - LOG_WARN("send rpc failed", K(ret), K(arg), K(leader_addr), K(tenant_id)); - } - } - int tmp_ret = OB_SUCCESS; - common::ObArray tmp_ret_array; - if (OB_SUCCESS != (tmp_ret = proxy.wait_all(tmp_ret_array))) { - LOG_WARN("rpc proxy wait failed", K(tmp_ret)); - ret = OB_SUCCESS == ret ? tmp_ret : ret; - } else if (OB_SUCC(ret)) { - const ObIArray &result_array = proxy.get_results(); - if (tmp_ret_array.count() != tablet_ids.count() || result_array.count() != tablet_ids.count()) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("result count not match", K(ret), K(tablet_ids.count()), K(tmp_ret_array.count()), K(result_array.count())); - } else { - for (int64_t i = 0; OB_SUCC(ret) && i < result_array.count(); ++i) { - const ObTabletID &tablet_id = tablet_ids.at(i); - const obrpc::ObCheckSchemaVersionElapsedResult *cur_result = result_array.at(i); - if (OB_ISNULL(cur_result)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("result it null", K(ret), K(tablet_id), K(i), KP(cur_result)); - } else if (OB_FAIL(ret_array.push_back(tmp_ret_array.at(i)))) { - LOG_WARN("push back ret code failed", K(ret), K(tablet_id), K(tmp_ret_array.at(i))); - } else if (OB_FAIL(snapshot_array.push_back(cur_result->snapshot_))) { - LOG_WARN("push back snapshot failed", K(ret), K(tablet_id), K(*cur_result)); - } - } - } + obrpc::ObCheckSchemaVersionElapsedResult *res = nullptr; + arg.tenant_id_ = tenant_id; + arg.schema_version_ = schema_version; + arg.need_wait_trans_end_ = need_wait_trans_end; + if (OB_FAIL(check_trans_end(send_array, proxy, arg, res, ret_array, snapshot_array))) { + LOG_WARN("check trans end failed", K(ret)); } } return ret; @@ -699,52 +799,22 @@ int ObDDLWaitTransEndCtx::check_sstable_trans_end( int ret = OB_SUCCESS; ret_array.reset(); snapshot_array.reset(); + ObArray send_array; if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || sstable_exist_ts <= 0 || tablet_ids.count() <= 0 || nullptr == rpc_proxy || nullptr == location_service)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(tenant_id), K(sstable_exist_ts), K(tablet_ids.count()), KP(rpc_proxy), KP(location_service)); + } else if (OB_FAIL(group_tablets_leader_addr(tenant_id, tablet_ids, location_service, send_array))) { + LOG_WARN("group tablet by leader addr failed", K(ret), K(tenant_id), K(tablet_ids.count())); } else { - ObCheckCtxCreateTimestampElapsedProxy proxy(*rpc_proxy, - &obrpc::ObSrvRpcProxy::check_modify_time_elapsed); + ObCheckCtxCreateTimestampElapsedProxy proxy(*rpc_proxy, &obrpc::ObSrvRpcProxy::check_modify_time_elapsed); obrpc::ObCheckModifyTimeElapsedArg arg; - int64_t rpc_timeout = max(GCONF.rpc_timeout, 1000L * 1000L * 9L); - for (int64_t i = 0; OB_SUCC(ret) && i < tablet_ids.count(); ++i) { - const ObTabletID &tablet_id = tablet_ids.at(i); - arg.tenant_id_ = tenant_id; - arg.tablet_id_ = tablet_id; - arg.sstable_exist_ts_ = sstable_exist_ts; - ObAddr leader_addr; - if (OB_FAIL(ObDDLUtil::get_tablet_leader_addr(location_service, tenant_id, tablet_id, rpc_timeout, arg.ls_id_, leader_addr))) { - LOG_WARN("get tablet leader addr failed", K(ret)); - } else if (OB_FAIL(proxy.call(leader_addr, rpc_timeout, tenant_id, arg))) { - LOG_WARN("send rpc failed", K(ret), K(arg), K(leader_addr), K(tenant_id)); - } - } - int tmp_ret = OB_SUCCESS; - common::ObArray tmp_ret_array; - if (OB_SUCCESS != (tmp_ret = proxy.wait_all(tmp_ret_array))) { - LOG_WARN("rpc proxy wait failed", K(tmp_ret)); - ret = OB_SUCCESS == ret ? tmp_ret : ret; - } else if (OB_SUCC(ret)) { - const auto &result_array = proxy.get_results(); - if (tmp_ret_array.count() != tablet_ids.count() || result_array.count() != tablet_ids.count()) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("result count not match", K(ret), K(tablet_ids.count()), K(tmp_ret_array.count()), K(result_array.count())); - } else { - for (int64_t i = 0; OB_SUCC(ret) && i < result_array.count(); ++i) { - const ObTabletID &tablet_id = tablet_ids.at(i); - const auto *cur_result = result_array.at(i); - if (OB_ISNULL(cur_result)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("result it null", K(ret), K(tablet_id), K(i), KP(cur_result)); - } else if (OB_FAIL(ret_array.push_back(tmp_ret_array.at(i)))) { - LOG_WARN("push back ret code failed", K(ret), K(tablet_id), K(tmp_ret_array.at(i))); - } else if (OB_FAIL(snapshot_array.push_back(cur_result->snapshot_))) { - LOG_WARN("push back snapshot failed", K(ret), K(tablet_id), K(*cur_result)); - } - } - } + obrpc::ObCheckModifyTimeElapsedResult *res = nullptr; + arg.tenant_id_ = tenant_id; + arg.sstable_exist_ts_ = sstable_exist_ts; + if (OB_FAIL(check_trans_end(send_array, proxy, arg, res, ret_array, snapshot_array))) { + LOG_WARN("check trans end failed", K(ret)); } } return ret; @@ -765,6 +835,8 @@ int ObDDLWaitTransEndCtx::try_wait(bool &is_trans_end, int64_t &snapshot_version ObArray tablet_pos_indexes; if (OB_FAIL(get_snapshot_check_list(need_check_tablets, tablet_pos_indexes))) { LOG_WARN("get snapshot check list failed", K(ret)); + } else if (need_check_tablets.empty()) { + is_trans_end_ = true; } else { const int64_t check_count = need_check_tablets.count(); ObArray ret_codes; @@ -1121,6 +1193,48 @@ int ObDDLWaitColumnChecksumCtx::refresh_zombie_task() return ret; } +int send_batch_calc_rpc(obrpc::ObSrvRpcProxy &rpc_proxy, + const ObAddr &leader_addr, + const ObCalcColumnChecksumRequestArg &arg, + ObCalcColumnChecksumRequestRes &res, + ObIArray &send_array, + const int64_t group_start_idx, + const int64_t group_end_idx, + common::SpinRWLock &item_lock, + int64_t &send_succ_count) +{ + int ret = OB_SUCCESS; + const int64_t rpc_timeout = max(GCONF.rpc_timeout, 1000L * 1000L * 9L); + if (OB_FAIL(rpc_proxy.to(leader_addr) + .by(arg.tenant_id_) + .timeout(rpc_timeout) + .calc_column_checksum_request(arg, res))) { + LOG_WARN("send rpc failed", K(ret), K(arg), K(leader_addr), K(arg.tenant_id_)); + } else if (res.ret_codes_.count() != arg.calc_items_.count() || res.ret_codes_.count() != (group_end_idx - group_start_idx)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("return codes count not match the argument", K(ret), K(arg.calc_items_.count()), + K(res.ret_codes_.count()), "group_count", group_end_idx - group_start_idx); + } else { + LOG_INFO("send checksum validation task", K(arg)); + SpinWLockGuard guard(item_lock); + for (int64_t j = group_start_idx, k = 0; j < group_end_idx; ++j, ++k) { // ignore ret + PartitionColChecksumStat *item = reinterpret_cast(send_array.at(j).other_info_); + int ret_code = res.ret_codes_.at(k); + if (OB_SUCCESS == ret_code) { + item->snapshot_ = arg.snapshot_version_; + item->col_checksum_stat_ = CCS_INVALID; + ++send_succ_count; + } else if (OB_EAGAIN == ret_code || OB_HASH_EXIST == ret_code) { // ignore + LOG_INFO("send checksum rpc not success", K(ret), KPC(item)); + } else { + ret = OB_SUCCESS == ret ? ret_code : ret; // keep first error code + LOG_WARN("fail to calc column checksum request", K(ret_code), K(arg), KPC(item)); + } + } + } + return ret; +} + int ObDDLWaitColumnChecksumCtx::send_calc_rpc(int64_t &send_succ_count) { int ret = OB_SUCCESS; @@ -1139,6 +1253,7 @@ int ObDDLWaitColumnChecksumCtx::send_calc_rpc(int64_t &send_succ_count) } else { ObLSID ls_id; const int64_t rpc_timeout = max(GCONF.rpc_timeout, 1000L * 1000L * 9L); + ObArray send_array; for (int64_t i = 0; OB_SUCC(ret) && i < stat_array_.count(); ++i) { PartitionColChecksumStat &item = stat_array_.at(i); ObAddr leader_addr; @@ -1150,34 +1265,64 @@ int ObDDLWaitColumnChecksumCtx::send_calc_rpc(int64_t &send_succ_count) if (OB_FAIL(ObDDLUtil::get_tablet_leader_addr(location_service, tenant_id_, item.tablet_id_, rpc_timeout, ls_id, leader_addr))) { LOG_WARN("get tablet leader addr failed", K(ret)); } else { - ObCalcColumnChecksumRequestArg arg; - arg.tenant_id_ = tenant_id_; - arg.ls_id_ = ls_id; - arg.tablet_id_ = item.tablet_id_; - arg.target_table_id_ = target_table_id_; - arg.schema_version_ = schema_version_; - arg.execution_id_ = item.execution_id_; - arg.snapshot_version_ = snapshot_version_; - arg.source_table_id_ = source_table_id_; - arg.calc_table_id_ = item.table_id_; - arg.task_id_ = task_id_; - if (OB_FAIL(root_service->get_rpc_proxy().to(leader_addr).by(tenant_id_).timeout(rpc_timeout).calc_column_checksum_request(arg))) { - if (OB_EAGAIN == ret || OB_HASH_EXIST == ret) { // ignore - LOG_INFO("send checksum rpc not success", K(ret), K(arg)); - ret = OB_SUCCESS; - } else { - LOG_WARN("fail to calc column checksum request", K(ret), K(arg)); + SendItem send_item; + send_item.leader_addr_ = leader_addr; + send_item.ls_id_ = ls_id; + send_item.tablet_id_ = item.tablet_id_; + send_item.other_info_ = reinterpret_cast(&item); + if (OB_FAIL(send_array.push_back(send_item))) { + LOG_WARN("push send array failed", K(ret)); + } + } + } + } + + if (OB_SUCC(ret)) { + // group by leader addr and send batch rpc + std::sort(send_array.begin(), send_array.end()); + + ObAddr last_addr; + int64_t group_start_idx = 0; + ObCalcColumnChecksumRequestArg arg; + ObCalcColumnChecksumRequestRes res; + arg.tenant_id_ = tenant_id_; + arg.task_id_ = task_id_; + arg.source_table_id_ = source_table_id_; + arg.target_table_id_ = target_table_id_; + arg.schema_version_ = schema_version_; + arg.execution_id_ = execution_id_; + arg.snapshot_version_ = snapshot_version_; + for (int64_t i = 0; OB_SUCC(ret) && i < send_array.count(); ++i) { + const SendItem &send_item = send_array.at(i); + if (send_item.leader_addr_ != last_addr) { + if (arg.calc_items_.count() > 0) { + if (OB_FAIL(send_batch_calc_rpc(root_service->get_rpc_proxy(), last_addr, + arg, res, send_array, group_start_idx, i, lock_, send_succ_count))) { + LOG_WARN("send batch calc rpc failed", K(ret)); } - } else { - LOG_INFO("send checksum validation task", K(arg)); } if (OB_SUCC(ret)) { - SpinWLockGuard guard(lock_); - item.snapshot_ = snapshot_version_; - item.col_checksum_stat_ = CCS_INVALID; - ++send_succ_count; + arg.calc_items_.reuse(); + res.ret_codes_.reuse(); + last_addr = send_item.leader_addr_; + group_start_idx = i; } } + if (OB_SUCC(ret)) { + ObCalcColumnChecksumRequestArg::SingleItem calc_item; + calc_item.ls_id_ = send_item.ls_id_; + calc_item.tablet_id_ = send_item.tablet_id_; + calc_item.calc_table_id_ = reinterpret_cast(send_item.other_info_)->table_id_; + if (OB_FAIL(arg.calc_items_.push_back(calc_item))) { + LOG_WARN("push back send item failed", K(ret), K(i), K(send_item)); + } + } + } + if (OB_SUCC(ret) && arg.calc_items_.count() > 0) { + if (OB_FAIL(send_batch_calc_rpc(root_service->get_rpc_proxy(), last_addr, + arg, res, send_array, group_start_idx, send_array.count(), lock_, send_succ_count))) { + LOG_WARN("send batch calc rpc failed", K(ret)); + } } } } diff --git a/src/share/ob_rpc_struct.cpp b/src/share/ob_rpc_struct.cpp index 520f055f4..afa1c4b3b 100644 --- a/src/share/ob_rpc_struct.cpp +++ b/src/share/ob_rpc_struct.cpp @@ -2629,41 +2629,81 @@ DEF_TO_STRING(ObRemoveTabletRes) OB_SERIALIZE_MEMBER(ObRemoveTabletRes, ret_); +OB_SERIALIZE_MEMBER(ObCalcColumnChecksumRequestArg::SingleItem, ls_id_, tablet_id_, calc_table_id_); + +bool ObCalcColumnChecksumRequestArg::SingleItem::is_valid() const +{ + return ls_id_.is_valid() && tablet_id_.is_valid() && OB_INVALID_ID != calc_table_id_; +} + +void ObCalcColumnChecksumRequestArg::SingleItem::reset() +{ + ls_id_.reset(); + tablet_id_.reset(); + calc_table_id_ = OB_INVALID_ID; +} + +int ObCalcColumnChecksumRequestArg::SingleItem::assign(const SingleItem &other) +{ + int ret = OB_SUCCESS; + ls_id_ = other.ls_id_; + tablet_id_ = other.tablet_id_; + calc_table_id_ = other.calc_table_id_; + return ret; +} + OB_SERIALIZE_MEMBER( ObCalcColumnChecksumRequestArg, tenant_id_, - ls_id_, - tablet_id_, target_table_id_, schema_version_, execution_id_, snapshot_version_, source_table_id_, - calc_table_id_, - task_id_); + task_id_, + calc_items_); bool ObCalcColumnChecksumRequestArg::is_valid() const { - return OB_INVALID_ID != tenant_id_ && ls_id_.is_valid() && tablet_id_.is_valid() - && OB_INVALID_ID != target_table_id_ && OB_INVALID_VERSION != schema_version_ - && OB_INVALID_ID != execution_id_ && OB_INVALID_VERSION != snapshot_version_ && OB_INVALID_ID != source_table_id_ - && OB_INVALID_ID != calc_table_id_ && task_id_ > 0; + bool bret = OB_INVALID_ID != tenant_id_ && OB_INVALID_ID != target_table_id_ + && OB_INVALID_VERSION != schema_version_ && OB_INVALID_ID != execution_id_ + && OB_INVALID_VERSION != snapshot_version_ && OB_INVALID_ID != source_table_id_ && task_id_ > 0; + for (int64_t i = 0; bret && i < calc_items_.count(); ++i) { + bret = calc_items_.at(i).is_valid(); + } + return bret; } void ObCalcColumnChecksumRequestArg::reset() { tenant_id_ = OB_INVALID_ID; - ls_id_.reset(); - tablet_id_.reset(); target_table_id_ = OB_INVALID_ID; schema_version_ = OB_INVALID_VERSION; execution_id_ = OB_INVALID_ID; snapshot_version_ = OB_INVALID_VERSION; source_table_id_ = OB_INVALID_ID; - calc_table_id_ = OB_INVALID_ID; task_id_ = 0; + calc_items_.reset(); } +int ObCalcColumnChecksumRequestArg::assign(const ObCalcColumnChecksumRequestArg &other) +{ + int ret = common::OB_SUCCESS; + tenant_id_ = other.tenant_id_; + target_table_id_ = other.target_table_id_; + schema_version_ = other.schema_version_; + execution_id_ = other.execution_id_; + snapshot_version_ = other.snapshot_version_; + source_table_id_ = other.source_table_id_; + task_id_ = other.task_id_; + if (OB_FAIL(calc_items_.assign(other.calc_items_))) { + LOG_WARN("assign calc_items failed", K(ret), K(other.calc_items_.count())); + } + return ret; +} + +OB_SERIALIZE_MEMBER(ObCalcColumnChecksumRequestRes, ret_codes_); + OB_SERIALIZE_MEMBER( ObCalcColumnChecksumResponseArg, tablet_id_, @@ -3041,22 +3081,56 @@ DEF_TO_STRING(ObSwitchLeaderArg) OB_SERIALIZE_MEMBER(ObSwitchLeaderArg, ls_id_, role_, tenant_id_, dest_server_); -OB_SERIALIZE_MEMBER(ObCheckSchemaVersionElapsedArg, tenant_id_, ls_id_, data_tablet_id_, schema_version_, need_wait_trans_end_); +OB_SERIALIZE_MEMBER(ObLSTabletPair, ls_id_, tablet_id_); +OB_SERIALIZE_MEMBER(ObCheckSchemaVersionElapsedArg, tenant_id_, schema_version_, need_wait_trans_end_, tablets_); + +bool ObCheckSchemaVersionElapsedArg::is_valid() const +{ + bool bret = OB_INVALID_ID != tenant_id_ && schema_version_ > 0 && !tablets_.empty(); + for (int64_t i = 0; bret && i < tablets_.count(); ++i) { + bret = tablets_.at(i).is_valid(); + } + return bret; +} void ObCheckSchemaVersionElapsedArg::reuse() { tenant_id_ = OB_INVALID_ID; - ls_id_.reset(); - data_tablet_id_.reset(); schema_version_ = 0; need_wait_trans_end_ = true; + tablets_.reuse(); } -OB_SERIALIZE_MEMBER(ObCheckModifyTimeElapsedArg, tenant_id_, ls_id_, tablet_id_, sstable_exist_ts_); +bool ObCheckModifyTimeElapsedArg::is_valid() const +{ + bool bret = OB_INVALID_ID != tenant_id_ && sstable_exist_ts_ > 0; + for (int64_t i = 0; bret && i < tablets_.count(); ++i) { + bret = tablets_.at(i).is_valid(); + } + return bret; +} -OB_SERIALIZE_MEMBER(ObCheckSchemaVersionElapsedResult, snapshot_); +void ObCheckModifyTimeElapsedArg::reuse() +{ + tenant_id_ = OB_INVALID_ID; + sstable_exist_ts_ = 0; + tablets_.reuse(); +} + +OB_SERIALIZE_MEMBER(ObCheckModifyTimeElapsedArg, tenant_id_, sstable_exist_ts_, tablets_); + +bool ObCheckSchemaVersionElapsedResult::is_valid() const +{ + bool bret = !results_.empty(); + for (int64_t i = 0; bret && i < results_.count(); ++i) { + bret = (common::OB_INVALID_TIMESTAMP != results_.at(i).snapshot_); + } + return bret; +} + +OB_SERIALIZE_MEMBER(ObCheckTransElapsedResult, ret_code_, snapshot_, pending_tx_id_); +OB_SERIALIZE_MEMBER(ObCheckSchemaVersionElapsedResult, results_); -OB_SERIALIZE_MEMBER(ObCheckModifyTimeElapsedResult, snapshot_); OB_SERIALIZE_MEMBER(CandidateStatus, candidate_status_); diff --git a/src/share/ob_rpc_struct.h b/src/share/ob_rpc_struct.h index 789131672..f88461cf6 100644 --- a/src/share/ob_rpc_struct.h +++ b/src/share/ob_rpc_struct.h @@ -2857,7 +2857,7 @@ private: DISALLOW_COPY_AND_ASSIGN(ObGetMinSSTableSchemaVersionRes); }; -struct ObCalcColumnChecksumRequestArg +struct ObCalcColumnChecksumRequestArg final { OB_UNIS_VERSION(1); public: @@ -2865,38 +2865,43 @@ public: ~ObCalcColumnChecksumRequestArg() = default; bool is_valid() const; void reset(); - int assign(const ObCalcColumnChecksumRequestArg &other) { - int ret = common::OB_SUCCESS; - tenant_id_ = other.tenant_id_; - ls_id_ = other.ls_id_; - tablet_id_ = other.tablet_id_; - target_table_id_ = other.target_table_id_; - schema_version_ = other.schema_version_; - execution_id_ = other.execution_id_; - snapshot_version_ = other.snapshot_version_; - source_table_id_ = other.source_table_id_; - calc_table_id_ = other.calc_table_id_; - task_id_ = other.task_id_; - return ret; - } - TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(tablet_id), - K_(target_table_id), K_(schema_version), K_(execution_id), - K_(snapshot_version), K_(source_table_id), K_(calc_table_id)); + int assign(const ObCalcColumnChecksumRequestArg &other); + TO_STRING_KV(K_(tenant_id), K_(target_table_id), K_(schema_version), K_(execution_id), + K_(snapshot_version), K_(source_table_id), K_(task_id), K_(calc_items)); + struct SingleItem final + { + OB_UNIS_VERSION(1); + public: + SingleItem() { reset(); } + ~SingleItem() = default; + bool is_valid() const; + void reset(); + int assign(const SingleItem &other); + TO_STRING_KV(K_(ls_id), K_(tablet_id), K_(calc_table_id)); + share::ObLSID ls_id_; + common::ObTabletID tablet_id_; + int64_t calc_table_id_; + }; public: uint64_t tenant_id_; - share::ObLSID ls_id_; - common::ObTabletID tablet_id_; uint64_t target_table_id_; int64_t schema_version_; uint64_t execution_id_; int64_t snapshot_version_; int64_t source_table_id_; - int64_t calc_table_id_; int64_t task_id_; + common::ObSEArray calc_items_; private: DISALLOW_COPY_AND_ASSIGN(ObCalcColumnChecksumRequestArg); }; +struct ObCalcColumnChecksumRequestRes final +{ + OB_UNIS_VERSION(1); +public: + common::ObSEArray ret_codes_; +}; + struct ObCalcColumnChecksumResponseArg { OB_UNIS_VERSION(2); @@ -3604,66 +3609,69 @@ public: bool force_refresh_; }; -struct ObCheckSchemaVersionElapsedArg +struct ObLSTabletPair final +{ + OB_UNIS_VERSION(1); +public: + bool is_valid() const { return ls_id_.is_valid() && tablet_id_.is_valid(); } + TO_STRING_KV(K_(ls_id), K_(tablet_id)); + share::ObLSID ls_id_; + common::ObTabletID tablet_id_; +}; + +struct ObCheckSchemaVersionElapsedArg final { OB_UNIS_VERSION(1); public: ObCheckSchemaVersionElapsedArg() - : tenant_id_(), ls_id_(), data_tablet_id_(0), schema_version_(0), need_wait_trans_end_(true) + : tenant_id_(), schema_version_(0), need_wait_trans_end_(true) {} - bool is_valid() const { - return OB_INVALID_ID != tenant_id_ && ls_id_.is_valid() && data_tablet_id_.is_valid() && schema_version_ > 0; - } + bool is_valid() const; void reuse(); - TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(data_tablet_id), K_(schema_version), K_(need_wait_trans_end)); + TO_STRING_KV(K_(tenant_id), K_(schema_version), K_(need_wait_trans_end), K_(tablets)); + uint64_t tenant_id_; - share::ObLSID ls_id_; - common::ObTabletID data_tablet_id_; int64_t schema_version_; bool need_wait_trans_end_; + ObSEArray tablets_; }; -struct ObCheckModifyTimeElapsedArg +struct ObCheckModifyTimeElapsedArg final { OB_UNIS_VERSION(1); public: - ObCheckModifyTimeElapsedArg() : tenant_id_(OB_INVALID_ID), ls_id_(), tablet_id_(), sstable_exist_ts_(0) {} - bool is_valid() const { - return OB_INVALID_ID != tenant_id_ && ls_id_.is_valid() && tablet_id_.is_valid() && sstable_exist_ts_ > 0; - } - void reuse() {tenant_id_ = OB_INVALID_ID; ls_id_.reset(); tablet_id_.reset(); sstable_exist_ts_ = 0;} - TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(tablet_id), K_(sstable_exist_ts)); + ObCheckModifyTimeElapsedArg() : tenant_id_(OB_INVALID_ID), sstable_exist_ts_(0) {} + bool is_valid() const; + void reuse(); + TO_STRING_KV(K_(tenant_id), K_(sstable_exist_ts), K_(tablets)); uint64_t tenant_id_; - share::ObLSID ls_id_; - common::ObTabletID tablet_id_; int64_t sstable_exist_ts_; + ObSEArray tablets_; +}; + +struct ObCheckTransElapsedResult final +{ + OB_UNIS_VERSION(1); +public: + ObCheckTransElapsedResult() : ret_code_(common::OB_SUCCESS), snapshot_(common::OB_INVALID_TIMESTAMP) {} + TO_STRING_KV(K_(ret_code), K_(snapshot), K_(pending_tx_id)); + int ret_code_; + int64_t snapshot_; + transaction::ObTransID pending_tx_id_; }; struct ObCheckSchemaVersionElapsedResult { OB_UNIS_VERSION(1); public: - ObCheckSchemaVersionElapsedResult() - : snapshot_(common::OB_INVALID_TIMESTAMP), pending_tx_id_() {} - bool is_valid() const { return snapshot_ != common::OB_INVALID_TIMESTAMP; } - void reuse() { snapshot_ = common::OB_INVALID_TIMESTAMP; pending_tx_id_.reset(); } - TO_STRING_KV(K_(snapshot), K_(pending_tx_id)); - int64_t snapshot_; - transaction::ObTransID pending_tx_id_; + ObCheckSchemaVersionElapsedResult() {} + bool is_valid() const; + void reuse() { results_.reuse(); } + TO_STRING_KV(K_(results)); + ObSEArray results_; }; -struct ObCheckModifyTimeElapsedResult -{ - OB_UNIS_VERSION(1); -public: - ObCheckModifyTimeElapsedResult() - : snapshot_(common::OB_INVALID_TIMESTAMP), pending_tx_id_() {} - bool is_valid() const { return snapshot_ != common::OB_INVALID_TIMESTAMP; } - void reuse() { snapshot_ = common::OB_INVALID_TIMESTAMP; pending_tx_id_.reset(); } - TO_STRING_KV(K_(snapshot), K_(pending_tx_id)); - int64_t snapshot_; - transaction::ObTransID pending_tx_id_; -}; +typedef ObCheckSchemaVersionElapsedResult ObCheckModifyTimeElapsedResult; class CandidateStatus { diff --git a/src/share/ob_srv_rpc_proxy.h b/src/share/ob_srv_rpc_proxy.h index 4d2ad3064..4f57f0a89 100644 --- a/src/share/ob_srv_rpc_proxy.h +++ b/src/share/ob_srv_rpc_proxy.h @@ -117,7 +117,7 @@ public: RPC_S(PR5 force_switch_ilog_file, OB_FORCE_SWITCH_ILOG_FILE, (ObForceSwitchILogFileArg)); RPC_S(PR5 force_set_all_as_single_replica, OB_FORCE_SET_ALL_AS_SINGLE_REPLICA, (ObForceSetAllAsSingleReplicaArg)); RPC_S(PR5 force_set_server_list, OB_FORCE_SET_SERVER_LIST, (ObForceSetServerListArg)); - RPC_S(PR5 calc_column_checksum_request, OB_CALC_COLUMN_CHECKSUM_REQUEST, (ObCalcColumnChecksumRequestArg)); + RPC_S(PR5 calc_column_checksum_request, OB_CALC_COLUMN_CHECKSUM_REQUEST, (ObCalcColumnChecksumRequestArg), obrpc::ObCalcColumnChecksumRequestRes); RPC_AP(PR5 build_ddl_single_replica_request, OB_DDL_BUILD_SINGLE_REPLICA_REQUEST, (obrpc::ObDDLBuildSingleReplicaRequestArg), obrpc::ObDDLBuildSingleReplicaRequestResult); RPC_S(PR5 fetch_tablet_autoinc_seq_cache, OB_FETCH_TABLET_AUTOINC_SEQ_CACHE, (obrpc::ObFetchTabletSeqArg), obrpc::ObFetchTabletSeqRes); RPC_AP(PR5 batch_get_tablet_autoinc_seq, OB_BATCH_GET_TABLET_AUTOINC_SEQ, (obrpc::ObBatchGetTabletAutoincSeqArg), obrpc::ObBatchGetTabletAutoincSeqRes); diff --git a/src/sql/engine/expr/ob_expr_to_outfile_row.cpp b/src/sql/engine/expr/ob_expr_to_outfile_row.cpp index 76f49f22c..edd1ba4c7 100644 --- a/src/sql/engine/expr/ob_expr_to_outfile_row.cpp +++ b/src/sql/engine/expr/ob_expr_to_outfile_row.cpp @@ -13,9 +13,9 @@ #define USING_LOG_PREFIX SQL_ENG #include "sql/engine/expr/ob_expr_to_outfile_row.h" -#include -#include "lib/oblog/ob_log.h" -#include "objit/common/ob_item_type.h" +#include +#include "lib/oblog/ob_log.h" +#include "objit/common/ob_item_type.h" #include "sql/session/ob_sql_session_info.h" #include "sql/engine/ob_exec_context.h" @@ -120,9 +120,9 @@ int ObExprToOutfileRow::calc_outfile_info(const ObExpr &expr, ObSQLSessionInfo *session = ctx.exec_ctx_.get_my_session(); if (OB_ISNULL(session)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("session is null", K(ret)); - } else if (OB_ISNULL(out_info.print_params_.tz_info_ = session->get_timezone_info())) { - ret = OB_ERR_UNEXPECTED; + LOG_WARN("session is null", K(ret)); + } else if (OB_ISNULL(out_info.print_params_.tz_info_ = session->get_timezone_info())) { + ret = OB_ERR_UNEXPECTED; LOG_WARN("fail to get timezone info", K(ret)); } else { out_info.print_params_.use_memcpy_ = true; @@ -130,9 +130,9 @@ int ObExprToOutfileRow::calc_outfile_info(const ObExpr &expr, out_info.is_optional_ = expr.locate_param_datum(ctx, PARAM_OPTIONAL).get_bool(); } - for (int i = 0; OB_SUCC(ret) && i < PARAM_SELECT_ITEM; ++i) { - OZ(expr.locate_param_datum(ctx, i).to_obj(objs_array[i], expr.args_[i]->obj_meta_, - expr.args_[i]->obj_datum_map_)); + for (int i = 0; OB_SUCC(ret) && i < PARAM_SELECT_ITEM; ++i) { + OZ(expr.locate_param_datum(ctx, i).to_obj(objs_array[i], expr.args_[i]->obj_meta_, + expr.args_[i]->obj_datum_map_)); } if (OB_SUCC(ret)) { out_info.field_ = objs_array[PARAM_FIELD];