use batch rpc in ddl, for check trans end and and calc column checksum
This commit is contained in:
parent
9b0627ec4d
commit
ba634cea50
@ -439,7 +439,7 @@ int ObCalcColumnChecksumRequestP::process()
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_ERROR("invalid arguments", K(ret), KP(gctx_.ob_service_));
|
||||
} else {
|
||||
ret = gctx_.ob_service_->calc_column_checksum_request(arg_);
|
||||
ret = gctx_.ob_service_->calc_column_checksum_request(arg_, result_);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -518,7 +518,7 @@ int ObService::get_min_sstable_schema_version(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObService::calc_column_checksum_request(const obrpc::ObCalcColumnChecksumRequestArg &arg)
|
||||
int ObService::calc_column_checksum_request(const obrpc::ObCalcColumnChecksumRequestArg &arg, obrpc::ObCalcColumnChecksumRequestRes &res)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!inited_)) {
|
||||
@ -532,30 +532,56 @@ int ObService::calc_column_checksum_request(const obrpc::ObCalcColumnChecksumReq
|
||||
const uint64_t tenant_id = arg.tenant_id_;
|
||||
MTL_SWITCH(tenant_id) {
|
||||
ObGlobalUniqueIndexCallback *callback = NULL;
|
||||
ObUniqueCheckingDag *dag = NULL;
|
||||
ObTenantDagScheduler* dag_scheduler = nullptr;
|
||||
if (OB_ISNULL(dag_scheduler = MTL(ObTenantDagScheduler *))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("error unexpected, dag scheduler must not be nullptr", KR(ret));
|
||||
} else if (OB_FAIL(dag_scheduler->alloc_dag(dag))) {
|
||||
STORAGE_LOG(WARN, "fail to alloc dag", KR(ret));
|
||||
} else if (OB_FAIL(dag->init(arg.tenant_id_, arg.ls_id_, arg.tablet_id_, arg.calc_table_id_ == arg.target_table_id_, arg.target_table_id_, arg.schema_version_, arg.task_id_, arg.execution_id_, arg.snapshot_version_))) {
|
||||
STORAGE_LOG(WARN, "fail to init ObUniqueCheckingDag", KR(ret));
|
||||
} else if (OB_FAIL(dag->alloc_global_index_task_callback(arg.tablet_id_, arg.target_table_id_, arg.source_table_id_, arg.schema_version_, arg.task_id_, callback))) {
|
||||
STORAGE_LOG(WARN, "fail to alloc global index task callback", KR(ret));
|
||||
} else if (OB_FAIL(dag->alloc_unique_checking_prepare_task(callback))) {
|
||||
STORAGE_LOG(WARN, "fail to alloc unique checking prepare task", KR(ret));
|
||||
} else if (OB_FAIL(dag_scheduler->add_dag(dag))) {
|
||||
if (OB_EAGAIN != ret && OB_SIZE_OVERFLOW != ret) {
|
||||
STORAGE_LOG(WARN, "fail to add dag to queue", KR(ret));
|
||||
} else {
|
||||
ret = OB_EAGAIN;
|
||||
} else if (OB_FAIL(res.ret_codes_.reserve(arg.calc_items_.count()))) {
|
||||
LOG_WARN("reserve return code array failed", K(ret), K(arg.calc_items_.count()));
|
||||
} else {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < arg.calc_items_.count(); ++i) {
|
||||
const ObCalcColumnChecksumRequestArg::SingleItem &calc_item = arg.calc_items_.at(i);
|
||||
ObUniqueCheckingDag *dag = NULL;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_TMP_FAIL(dag_scheduler->alloc_dag(dag))) {
|
||||
STORAGE_LOG(WARN, "fail to alloc dag", KR(tmp_ret));
|
||||
} else if (OB_TMP_FAIL(dag->init(arg.tenant_id_,
|
||||
calc_item.ls_id_,
|
||||
calc_item.tablet_id_,
|
||||
calc_item.calc_table_id_ == arg.target_table_id_,
|
||||
arg.target_table_id_,
|
||||
arg.schema_version_,
|
||||
arg.task_id_,
|
||||
arg.execution_id_,
|
||||
arg.snapshot_version_))) {
|
||||
STORAGE_LOG(WARN, "fail to init ObUniqueCheckingDag", KR(tmp_ret));
|
||||
} else if (OB_TMP_FAIL(dag->alloc_global_index_task_callback(calc_item.tablet_id_,
|
||||
arg.target_table_id_,
|
||||
arg.source_table_id_,
|
||||
arg.schema_version_,
|
||||
arg.task_id_,
|
||||
callback))) {
|
||||
STORAGE_LOG(WARN, "fail to alloc global index task callback", KR(tmp_ret));
|
||||
} else if (OB_TMP_FAIL(dag->alloc_unique_checking_prepare_task(callback))) {
|
||||
STORAGE_LOG(WARN, "fail to alloc unique checking prepare task", KR(tmp_ret));
|
||||
} else if (OB_TMP_FAIL(dag_scheduler->add_dag(dag))) {
|
||||
if (OB_EAGAIN != tmp_ret && OB_SIZE_OVERFLOW != tmp_ret) {
|
||||
STORAGE_LOG(WARN, "fail to add dag to queue", KR(tmp_ret));
|
||||
} else {
|
||||
tmp_ret = OB_EAGAIN;
|
||||
}
|
||||
}
|
||||
if (OB_SUCCESS != tmp_ret && NULL != dag) {
|
||||
dag_scheduler->free_dag(*dag);
|
||||
dag = NULL;
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(res.ret_codes_.push_back(tmp_ret))) {
|
||||
LOG_WARN("push back return code failed", K(ret), K(tmp_ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret) && NULL != dag) {
|
||||
dag_scheduler->free_dag(*dag);
|
||||
dag = NULL;
|
||||
}
|
||||
}
|
||||
LOG_INFO("receive column checksum request", K(arg));
|
||||
}
|
||||
@ -1010,18 +1036,34 @@ int ObService::check_modify_time_elapsed(
|
||||
ObLSHandle ls_handle;
|
||||
transaction::ObTransService *txs = MTL(transaction::ObTransService *);
|
||||
ObLSService *ls_service = MTL(ObLSService *);
|
||||
if (OB_FAIL(ls_service->get_ls(ObLSID(arg.ls_id_), ls_handle, ObLSGetMod::OBSERVER_MOD))) {
|
||||
LOG_WARN("get ls failed", K(ret), K(arg.ls_id_));
|
||||
} else if (OB_FAIL(ls_handle.get_ls()->check_modify_time_elapsed(arg.tablet_id_,
|
||||
arg.sstable_exist_ts_,
|
||||
result.pending_tx_id_))) {
|
||||
if (OB_EAGAIN != ret) {
|
||||
LOG_WARN("check schema version elapsed failed", K(ret), K(arg));
|
||||
if (OB_FAIL(result.results_.reserve(arg.tablets_.count()))) {
|
||||
LOG_WARN("reserve result array failed", K(ret), K(arg.tablets_.count()));
|
||||
}
|
||||
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < arg.tablets_.count(); ++i) {
|
||||
ObTabletHandle tablet_handle;
|
||||
ObLSHandle ls_handle;
|
||||
const ObLSID &ls_id = arg.tablets_.at(i).ls_id_;
|
||||
const ObTabletID &tablet_id = arg.tablets_.at(i).tablet_id_;
|
||||
ObCheckTransElapsedResult single_result;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_TMP_FAIL(ls_service->get_ls(ls_id, ls_handle, ObLSGetMod::OBSERVER_MOD))) {
|
||||
LOG_WARN("get ls failed", K(tmp_ret), K(ls_id));
|
||||
} else if (OB_TMP_FAIL(ls_handle.get_ls()->check_modify_time_elapsed(tablet_id,
|
||||
arg.sstable_exist_ts_,
|
||||
single_result.pending_tx_id_))) {
|
||||
if (OB_EAGAIN != tmp_ret) {
|
||||
LOG_WARN("check schema version elapsed failed", K(tmp_ret), K(arg));
|
||||
}
|
||||
} else if (OB_TMP_FAIL(txs->get_max_commit_version(single_result.snapshot_))) {
|
||||
LOG_WARN("fail to get max commit version", K(tmp_ret));
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
single_result.ret_code_ = tmp_ret;
|
||||
if (OB_FAIL(result.results_.push_back(single_result))) {
|
||||
LOG_WARN("push back single result failed", K(ret), K(i), K(single_result));
|
||||
}
|
||||
}
|
||||
} else if (OB_FAIL(txs->get_max_commit_version(result.snapshot_))) {
|
||||
LOG_WARN("fail to get max commit version", K(ret));
|
||||
} else {
|
||||
LOG_INFO("succeed to wait transaction end", K(arg));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1042,21 +1084,36 @@ int ObService::check_schema_version_elapsed(
|
||||
LOG_WARN("invalid argument", K(ret), K(arg));
|
||||
} else {
|
||||
MTL_SWITCH(arg.tenant_id_) {
|
||||
ObTabletHandle tablet_handle;
|
||||
ObLSHandle ls_handle;
|
||||
ObLSService *ls_service = nullptr;
|
||||
if (OB_ISNULL(ls_service = MTL(ObLSService *))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("error unexpected, get ls service failed", K(ret));
|
||||
} else if (OB_FAIL(ls_service->get_ls(ObLSID(arg.ls_id_), ls_handle, ObLSGetMod::OBSERVER_MOD))) {
|
||||
LOG_WARN("get ls failed", K(ret), K(arg.ls_id_));
|
||||
} else if (OB_FAIL(ls_handle.get_ls()->get_tablet(arg.data_tablet_id_, tablet_handle))) {
|
||||
LOG_WARN("fail to get tablet", K(ret), K(arg));
|
||||
} else if (OB_FAIL(tablet_handle.get_obj()->check_schema_version_elapsed(arg.schema_version_,
|
||||
arg.need_wait_trans_end_,
|
||||
result.snapshot_,
|
||||
result.pending_tx_id_))) {
|
||||
LOG_WARN("check schema version elapsed failed", K(ret), K(arg));
|
||||
} else if (OB_FAIL(result.results_.reserve(arg.tablets_.count()))) {
|
||||
LOG_WARN("reserve result array failed", K(ret), K(arg.tablets_.count()));
|
||||
}
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < arg.tablets_.count(); ++i) {
|
||||
ObTabletHandle tablet_handle;
|
||||
ObLSHandle ls_handle;
|
||||
const ObLSID &ls_id = arg.tablets_.at(i).ls_id_;
|
||||
const ObTabletID &tablet_id = arg.tablets_.at(i).tablet_id_;
|
||||
ObCheckTransElapsedResult single_result;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_TMP_FAIL(ls_service->get_ls(ls_id, ls_handle, ObLSGetMod::OBSERVER_MOD))) {
|
||||
LOG_WARN("get ls failed", K(tmp_ret), K(i), K(ls_id));
|
||||
} else if (OB_TMP_FAIL(ls_handle.get_ls()->get_tablet(tablet_id, tablet_handle))) {
|
||||
LOG_WARN("fail to get tablet", K(tmp_ret), K(i), K(ls_id), K(tablet_id));
|
||||
} else if (OB_TMP_FAIL(tablet_handle.get_obj()->check_schema_version_elapsed(arg.schema_version_,
|
||||
arg.need_wait_trans_end_,
|
||||
single_result.snapshot_,
|
||||
single_result.pending_tx_id_))) {
|
||||
LOG_WARN("check schema version elapsed failed", K(tmp_ret), K(arg), K(ls_id), K(tablet_id));
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
single_result.ret_code_ = tmp_ret;
|
||||
if (OB_FAIL(result.results_.push_back(single_result))) {
|
||||
LOG_WARN("push back single result failed", K(ret), K(i), K(single_result));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -126,7 +126,7 @@ public:
|
||||
obrpc::ObGetMinSSTableSchemaVersionRes &result);
|
||||
// ObRpcSwitchSchemaP @RS DDL
|
||||
int switch_schema(const obrpc::ObSwitchSchemaArg &arg, obrpc::ObSwitchSchemaResult &result);
|
||||
int calc_column_checksum_request(const obrpc::ObCalcColumnChecksumRequestArg &arg);
|
||||
int calc_column_checksum_request(const obrpc::ObCalcColumnChecksumRequestArg &arg, obrpc::ObCalcColumnChecksumRequestRes &res);
|
||||
int build_ddl_single_replica_request(const obrpc::ObDDLBuildSingleReplicaRequestArg &arg);
|
||||
int write_ddl_sstable_commit_log(const obrpc::ObDDLWriteSSTableCommitLogArg &arg);
|
||||
int stop_partition_write(const obrpc::Int64 &switchover_timestamp, obrpc::Int64 &result);
|
||||
|
@ -336,8 +336,8 @@ int ObDDLTask::switch_status(ObDDLTaskStatus new_status, const int ret_code)
|
||||
ret = (OB_SUCCESS == ret) ? tmp_ret : ret;
|
||||
}
|
||||
if (OB_SUCC(ret) && old_status != real_new_status) {
|
||||
ROOTSERVICE_EVENT_ADD("ddl_scheduler", "switch_state", K_(tenant_id), K_(object_id), K_(target_object_id),
|
||||
"pre_state", old_status, "new_state", real_new_status, K_(snapshot_version));
|
||||
ROOTSERVICE_EVENT_ADD("ddl_scheduler", "switch_state", K_(tenant_id), K_(task_id), K_(object_id), K_(target_object_id),
|
||||
"new_state", real_new_status, K_(snapshot_version), ret_code_);
|
||||
task_status_ = real_new_status;
|
||||
}
|
||||
}
|
||||
@ -623,6 +623,136 @@ void ObDDLWaitTransEndCtx::reset()
|
||||
snapshot_array_.reset();
|
||||
}
|
||||
|
||||
struct SendItem final
|
||||
{
|
||||
public:
|
||||
bool operator < (const SendItem &other) const { return leader_addr_ < other.leader_addr_; }
|
||||
TO_STRING_KV(K_(leader_addr), K_(ls_id), K_(tablet_id), KP_(other_info));
|
||||
public:
|
||||
ObAddr leader_addr_;
|
||||
ObLSID ls_id_;
|
||||
ObTabletID tablet_id_;
|
||||
void *other_info_;
|
||||
};
|
||||
|
||||
int group_tablets_leader_addr(const uint64_t tenant_id, const ObIArray<ObTabletID> &tablet_ids, ObLocationService *location_service, ObArray<SendItem> &group_items)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
group_items.reuse();
|
||||
if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || nullptr == location_service)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(tenant_id), K(tablet_ids.count()));
|
||||
} else {
|
||||
const int64_t rpc_timeout = max(GCONF.rpc_timeout, 1000L * 1000L * 9L);
|
||||
if (OB_FAIL(group_items.reserve(tablet_ids.count()))) {
|
||||
LOG_WARN("reserve send array failed", K(ret), K(tablet_ids.count()));
|
||||
}
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < tablet_ids.count(); ++i) {
|
||||
const ObTabletID &tablet_id = tablet_ids.at(i);
|
||||
SendItem item;
|
||||
if (OB_FAIL(ObDDLUtil::get_tablet_leader_addr(location_service,
|
||||
tenant_id,
|
||||
tablet_id,
|
||||
rpc_timeout,
|
||||
item.ls_id_,
|
||||
item.leader_addr_))) {
|
||||
LOG_WARN("get tablet leader addr failed", K(ret));
|
||||
} else if (FALSE_IT(item.tablet_id_ = tablet_id)) {
|
||||
} else if (OB_FAIL(group_items.push_back(item))) {
|
||||
LOG_WARN("push back send item failed", K(ret), K(item));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
template<typename Proxy, typename Arg, typename Res>
|
||||
int check_trans_end(ObArray<SendItem> &send_array,
|
||||
Proxy &proxy,
|
||||
Arg &arg,
|
||||
Res *res,
|
||||
ObIArray<int> &ret_array,
|
||||
ObIArray<int64_t> &snapshot_array)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ret_array.reuse();
|
||||
snapshot_array.reuse();
|
||||
if (OB_UNLIKELY(send_array.empty())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret));
|
||||
} else {
|
||||
// group by leader addr and send batch rpc
|
||||
std::sort(send_array.begin(), send_array.end());
|
||||
|
||||
const int64_t rpc_timeout = max(GCONF.rpc_timeout, 1000L * 1000L * 9L);
|
||||
ObAddr last_addr;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < send_array.count(); ++i) {
|
||||
const SendItem &send_item = send_array.at(i);
|
||||
if (send_item.leader_addr_ != last_addr) {
|
||||
if (arg.tablets_.count() > 0) {
|
||||
if (OB_FAIL(proxy.call(last_addr, rpc_timeout, arg.tenant_id_, arg))) {
|
||||
LOG_WARN("send rpc failed", K(ret), K(arg), K(last_addr), K(arg.tenant_id_));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
arg.tablets_.reuse();
|
||||
last_addr = send_item.leader_addr_;
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
ObLSTabletPair ls_tablet_pair;
|
||||
ls_tablet_pair.ls_id_ = send_item.ls_id_;
|
||||
ls_tablet_pair.tablet_id_ = send_item.tablet_id_;
|
||||
if (OB_FAIL(arg.tablets_.push_back(ls_tablet_pair))) {
|
||||
LOG_WARN("push back send item failed", K(ret), K(i), K(send_item));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret) && arg.tablets_.count() > 0) {
|
||||
if (OB_FAIL(proxy.call(last_addr, rpc_timeout, arg.tenant_id_, arg))) {
|
||||
LOG_WARN("send rpc failed", K(ret), K(arg), K(last_addr), K(arg.tenant_id_));
|
||||
}
|
||||
}
|
||||
|
||||
// collect result
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
common::ObArray<int> tmp_ret_array;
|
||||
if (OB_SUCCESS != (tmp_ret = proxy.wait_all(tmp_ret_array))) {
|
||||
LOG_WARN("rpc proxy wait failed", K(tmp_ret));
|
||||
ret = OB_SUCCESS == ret ? tmp_ret : ret;
|
||||
} else if (OB_SUCC(ret)) {
|
||||
const ObIArray<const Res *> &result_array = proxy.get_results();
|
||||
const ObIArray<Arg> &arg_array = proxy.get_args();
|
||||
const ObIArray<ObAddr> &dest_array = proxy.get_dests();
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < result_array.count(); ++i) {
|
||||
const Res *cur_result = result_array.at(i);
|
||||
const Arg &cur_arg = arg_array.at(i);
|
||||
const ObAddr &cur_dest_addr = dest_array.at(i);
|
||||
if (OB_ISNULL(cur_result)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("result it null", K(ret), K(i), KP(cur_result));
|
||||
} else if (OB_FAIL(tmp_ret_array.at(i))) {
|
||||
LOG_WARN("check shema trans elapsed failed", K(ret), K(i), K(cur_dest_addr), K(cur_arg), KPC(cur_result));
|
||||
} else if (cur_arg.tablets_.count() != cur_result->results_.count()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("the result count does not match the argument", K(ret), K(cur_arg), KPC(cur_result));
|
||||
} else {
|
||||
for (int64_t j = 0; OB_SUCC(ret) && j < cur_result->results_.count(); ++j) {
|
||||
const obrpc::ObLSTabletPair &send_item = cur_arg.tablets_.at(j);
|
||||
const obrpc::ObCheckTransElapsedResult &result_item = cur_result->results_.at(j);
|
||||
if (OB_FAIL(ret_array.push_back(result_item.ret_code_))) {
|
||||
LOG_WARN("push back ret code failed", K(ret), K(i), K(j), K(send_item), K(result_item));
|
||||
} else if (OB_FAIL(snapshot_array.push_back(result_item.snapshot_))) {
|
||||
LOG_WARN("push back snapshot failed", K(ret), K(i), K(j), K(send_item), K(result_item));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDDLWaitTransEndCtx::check_schema_trans_end(
|
||||
const int64_t schema_version,
|
||||
const common::ObIArray<common::ObTabletID> &tablet_ids,
|
||||
@ -636,52 +766,22 @@ int ObDDLWaitTransEndCtx::check_schema_trans_end(
|
||||
int ret = OB_SUCCESS;
|
||||
ret_array.reset();
|
||||
snapshot_array.reset();
|
||||
ObArray<SendItem> send_array;
|
||||
if (OB_UNLIKELY(schema_version <= 0 || tablet_ids.count() <= 0 || OB_INVALID_ID == tenant_id
|
||||
|| nullptr == rpc_proxy || nullptr == location_service)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(schema_version), K(tablet_ids.count()), K(tenant_id), KP(rpc_proxy), KP(location_service));
|
||||
} else if (OB_FAIL(group_tablets_leader_addr(tenant_id, tablet_ids, location_service, send_array))) {
|
||||
LOG_WARN("group tablet by leader addr failed", K(ret), K(tenant_id), K(tablet_ids.count()));
|
||||
} else {
|
||||
ObCheckSchemaVersionElapsedProxy proxy(*rpc_proxy,
|
||||
&obrpc::ObSrvRpcProxy::check_schema_version_elapsed);
|
||||
ObCheckSchemaVersionElapsedProxy proxy(*rpc_proxy, &obrpc::ObSrvRpcProxy::check_schema_version_elapsed);
|
||||
obrpc::ObCheckSchemaVersionElapsedArg arg;
|
||||
const int64_t rpc_timeout = max(GCONF.rpc_timeout, 1000L * 1000L * 9L);
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < tablet_ids.count(); ++i) {
|
||||
const ObTabletID &tablet_id = tablet_ids.at(i);
|
||||
arg.tenant_id_ = tenant_id;
|
||||
arg.data_tablet_id_ = tablet_id;
|
||||
arg.schema_version_ = schema_version;
|
||||
arg.need_wait_trans_end_ = need_wait_trans_end;
|
||||
ObAddr leader_addr;
|
||||
if (OB_FAIL(ObDDLUtil::get_tablet_leader_addr(location_service, tenant_id, tablet_id, rpc_timeout, arg.ls_id_, leader_addr))) {
|
||||
LOG_WARN("get tablet leader addr failed", K(ret));
|
||||
} else if (OB_FAIL(proxy.call(leader_addr, rpc_timeout, tenant_id, arg))) {
|
||||
LOG_WARN("send rpc failed", K(ret), K(arg), K(leader_addr), K(tenant_id));
|
||||
}
|
||||
}
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
common::ObArray<int> tmp_ret_array;
|
||||
if (OB_SUCCESS != (tmp_ret = proxy.wait_all(tmp_ret_array))) {
|
||||
LOG_WARN("rpc proxy wait failed", K(tmp_ret));
|
||||
ret = OB_SUCCESS == ret ? tmp_ret : ret;
|
||||
} else if (OB_SUCC(ret)) {
|
||||
const ObIArray<const obrpc::ObCheckSchemaVersionElapsedResult *> &result_array = proxy.get_results();
|
||||
if (tmp_ret_array.count() != tablet_ids.count() || result_array.count() != tablet_ids.count()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("result count not match", K(ret), K(tablet_ids.count()), K(tmp_ret_array.count()), K(result_array.count()));
|
||||
} else {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < result_array.count(); ++i) {
|
||||
const ObTabletID &tablet_id = tablet_ids.at(i);
|
||||
const obrpc::ObCheckSchemaVersionElapsedResult *cur_result = result_array.at(i);
|
||||
if (OB_ISNULL(cur_result)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("result it null", K(ret), K(tablet_id), K(i), KP(cur_result));
|
||||
} else if (OB_FAIL(ret_array.push_back(tmp_ret_array.at(i)))) {
|
||||
LOG_WARN("push back ret code failed", K(ret), K(tablet_id), K(tmp_ret_array.at(i)));
|
||||
} else if (OB_FAIL(snapshot_array.push_back(cur_result->snapshot_))) {
|
||||
LOG_WARN("push back snapshot failed", K(ret), K(tablet_id), K(*cur_result));
|
||||
}
|
||||
}
|
||||
}
|
||||
obrpc::ObCheckSchemaVersionElapsedResult *res = nullptr;
|
||||
arg.tenant_id_ = tenant_id;
|
||||
arg.schema_version_ = schema_version;
|
||||
arg.need_wait_trans_end_ = need_wait_trans_end;
|
||||
if (OB_FAIL(check_trans_end(send_array, proxy, arg, res, ret_array, snapshot_array))) {
|
||||
LOG_WARN("check trans end failed", K(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -699,52 +799,22 @@ int ObDDLWaitTransEndCtx::check_sstable_trans_end(
|
||||
int ret = OB_SUCCESS;
|
||||
ret_array.reset();
|
||||
snapshot_array.reset();
|
||||
ObArray<SendItem> send_array;
|
||||
if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || sstable_exist_ts <= 0 || tablet_ids.count() <= 0
|
||||
|| nullptr == rpc_proxy || nullptr == location_service)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(tenant_id), K(sstable_exist_ts), K(tablet_ids.count()),
|
||||
KP(rpc_proxy), KP(location_service));
|
||||
} else if (OB_FAIL(group_tablets_leader_addr(tenant_id, tablet_ids, location_service, send_array))) {
|
||||
LOG_WARN("group tablet by leader addr failed", K(ret), K(tenant_id), K(tablet_ids.count()));
|
||||
} else {
|
||||
ObCheckCtxCreateTimestampElapsedProxy proxy(*rpc_proxy,
|
||||
&obrpc::ObSrvRpcProxy::check_modify_time_elapsed);
|
||||
ObCheckCtxCreateTimestampElapsedProxy proxy(*rpc_proxy, &obrpc::ObSrvRpcProxy::check_modify_time_elapsed);
|
||||
obrpc::ObCheckModifyTimeElapsedArg arg;
|
||||
int64_t rpc_timeout = max(GCONF.rpc_timeout, 1000L * 1000L * 9L);
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < tablet_ids.count(); ++i) {
|
||||
const ObTabletID &tablet_id = tablet_ids.at(i);
|
||||
arg.tenant_id_ = tenant_id;
|
||||
arg.tablet_id_ = tablet_id;
|
||||
arg.sstable_exist_ts_ = sstable_exist_ts;
|
||||
ObAddr leader_addr;
|
||||
if (OB_FAIL(ObDDLUtil::get_tablet_leader_addr(location_service, tenant_id, tablet_id, rpc_timeout, arg.ls_id_, leader_addr))) {
|
||||
LOG_WARN("get tablet leader addr failed", K(ret));
|
||||
} else if (OB_FAIL(proxy.call(leader_addr, rpc_timeout, tenant_id, arg))) {
|
||||
LOG_WARN("send rpc failed", K(ret), K(arg), K(leader_addr), K(tenant_id));
|
||||
}
|
||||
}
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
common::ObArray<int> tmp_ret_array;
|
||||
if (OB_SUCCESS != (tmp_ret = proxy.wait_all(tmp_ret_array))) {
|
||||
LOG_WARN("rpc proxy wait failed", K(tmp_ret));
|
||||
ret = OB_SUCCESS == ret ? tmp_ret : ret;
|
||||
} else if (OB_SUCC(ret)) {
|
||||
const auto &result_array = proxy.get_results();
|
||||
if (tmp_ret_array.count() != tablet_ids.count() || result_array.count() != tablet_ids.count()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("result count not match", K(ret), K(tablet_ids.count()), K(tmp_ret_array.count()), K(result_array.count()));
|
||||
} else {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < result_array.count(); ++i) {
|
||||
const ObTabletID &tablet_id = tablet_ids.at(i);
|
||||
const auto *cur_result = result_array.at(i);
|
||||
if (OB_ISNULL(cur_result)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("result it null", K(ret), K(tablet_id), K(i), KP(cur_result));
|
||||
} else if (OB_FAIL(ret_array.push_back(tmp_ret_array.at(i)))) {
|
||||
LOG_WARN("push back ret code failed", K(ret), K(tablet_id), K(tmp_ret_array.at(i)));
|
||||
} else if (OB_FAIL(snapshot_array.push_back(cur_result->snapshot_))) {
|
||||
LOG_WARN("push back snapshot failed", K(ret), K(tablet_id), K(*cur_result));
|
||||
}
|
||||
}
|
||||
}
|
||||
obrpc::ObCheckModifyTimeElapsedResult *res = nullptr;
|
||||
arg.tenant_id_ = tenant_id;
|
||||
arg.sstable_exist_ts_ = sstable_exist_ts;
|
||||
if (OB_FAIL(check_trans_end(send_array, proxy, arg, res, ret_array, snapshot_array))) {
|
||||
LOG_WARN("check trans end failed", K(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -765,6 +835,8 @@ int ObDDLWaitTransEndCtx::try_wait(bool &is_trans_end, int64_t &snapshot_version
|
||||
ObArray<int64_t> tablet_pos_indexes;
|
||||
if (OB_FAIL(get_snapshot_check_list(need_check_tablets, tablet_pos_indexes))) {
|
||||
LOG_WARN("get snapshot check list failed", K(ret));
|
||||
} else if (need_check_tablets.empty()) {
|
||||
is_trans_end_ = true;
|
||||
} else {
|
||||
const int64_t check_count = need_check_tablets.count();
|
||||
ObArray<int> ret_codes;
|
||||
@ -1121,6 +1193,48 @@ int ObDDLWaitColumnChecksumCtx::refresh_zombie_task()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int send_batch_calc_rpc(obrpc::ObSrvRpcProxy &rpc_proxy,
|
||||
const ObAddr &leader_addr,
|
||||
const ObCalcColumnChecksumRequestArg &arg,
|
||||
ObCalcColumnChecksumRequestRes &res,
|
||||
ObIArray<SendItem> &send_array,
|
||||
const int64_t group_start_idx,
|
||||
const int64_t group_end_idx,
|
||||
common::SpinRWLock &item_lock,
|
||||
int64_t &send_succ_count)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t rpc_timeout = max(GCONF.rpc_timeout, 1000L * 1000L * 9L);
|
||||
if (OB_FAIL(rpc_proxy.to(leader_addr)
|
||||
.by(arg.tenant_id_)
|
||||
.timeout(rpc_timeout)
|
||||
.calc_column_checksum_request(arg, res))) {
|
||||
LOG_WARN("send rpc failed", K(ret), K(arg), K(leader_addr), K(arg.tenant_id_));
|
||||
} else if (res.ret_codes_.count() != arg.calc_items_.count() || res.ret_codes_.count() != (group_end_idx - group_start_idx)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("return codes count not match the argument", K(ret), K(arg.calc_items_.count()),
|
||||
K(res.ret_codes_.count()), "group_count", group_end_idx - group_start_idx);
|
||||
} else {
|
||||
LOG_INFO("send checksum validation task", K(arg));
|
||||
SpinWLockGuard guard(item_lock);
|
||||
for (int64_t j = group_start_idx, k = 0; j < group_end_idx; ++j, ++k) { // ignore ret
|
||||
PartitionColChecksumStat *item = reinterpret_cast<PartitionColChecksumStat *>(send_array.at(j).other_info_);
|
||||
int ret_code = res.ret_codes_.at(k);
|
||||
if (OB_SUCCESS == ret_code) {
|
||||
item->snapshot_ = arg.snapshot_version_;
|
||||
item->col_checksum_stat_ = CCS_INVALID;
|
||||
++send_succ_count;
|
||||
} else if (OB_EAGAIN == ret_code || OB_HASH_EXIST == ret_code) { // ignore
|
||||
LOG_INFO("send checksum rpc not success", K(ret), KPC(item));
|
||||
} else {
|
||||
ret = OB_SUCCESS == ret ? ret_code : ret; // keep first error code
|
||||
LOG_WARN("fail to calc column checksum request", K(ret_code), K(arg), KPC(item));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDDLWaitColumnChecksumCtx::send_calc_rpc(int64_t &send_succ_count)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -1139,6 +1253,7 @@ int ObDDLWaitColumnChecksumCtx::send_calc_rpc(int64_t &send_succ_count)
|
||||
} else {
|
||||
ObLSID ls_id;
|
||||
const int64_t rpc_timeout = max(GCONF.rpc_timeout, 1000L * 1000L * 9L);
|
||||
ObArray<SendItem> send_array;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < stat_array_.count(); ++i) {
|
||||
PartitionColChecksumStat &item = stat_array_.at(i);
|
||||
ObAddr leader_addr;
|
||||
@ -1150,34 +1265,64 @@ int ObDDLWaitColumnChecksumCtx::send_calc_rpc(int64_t &send_succ_count)
|
||||
if (OB_FAIL(ObDDLUtil::get_tablet_leader_addr(location_service, tenant_id_, item.tablet_id_, rpc_timeout, ls_id, leader_addr))) {
|
||||
LOG_WARN("get tablet leader addr failed", K(ret));
|
||||
} else {
|
||||
ObCalcColumnChecksumRequestArg arg;
|
||||
arg.tenant_id_ = tenant_id_;
|
||||
arg.ls_id_ = ls_id;
|
||||
arg.tablet_id_ = item.tablet_id_;
|
||||
arg.target_table_id_ = target_table_id_;
|
||||
arg.schema_version_ = schema_version_;
|
||||
arg.execution_id_ = item.execution_id_;
|
||||
arg.snapshot_version_ = snapshot_version_;
|
||||
arg.source_table_id_ = source_table_id_;
|
||||
arg.calc_table_id_ = item.table_id_;
|
||||
arg.task_id_ = task_id_;
|
||||
if (OB_FAIL(root_service->get_rpc_proxy().to(leader_addr).by(tenant_id_).timeout(rpc_timeout).calc_column_checksum_request(arg))) {
|
||||
if (OB_EAGAIN == ret || OB_HASH_EXIST == ret) { // ignore
|
||||
LOG_INFO("send checksum rpc not success", K(ret), K(arg));
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
LOG_WARN("fail to calc column checksum request", K(ret), K(arg));
|
||||
SendItem send_item;
|
||||
send_item.leader_addr_ = leader_addr;
|
||||
send_item.ls_id_ = ls_id;
|
||||
send_item.tablet_id_ = item.tablet_id_;
|
||||
send_item.other_info_ = reinterpret_cast<void *>(&item);
|
||||
if (OB_FAIL(send_array.push_back(send_item))) {
|
||||
LOG_WARN("push send array failed", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
// group by leader addr and send batch rpc
|
||||
std::sort(send_array.begin(), send_array.end());
|
||||
|
||||
ObAddr last_addr;
|
||||
int64_t group_start_idx = 0;
|
||||
ObCalcColumnChecksumRequestArg arg;
|
||||
ObCalcColumnChecksumRequestRes res;
|
||||
arg.tenant_id_ = tenant_id_;
|
||||
arg.task_id_ = task_id_;
|
||||
arg.source_table_id_ = source_table_id_;
|
||||
arg.target_table_id_ = target_table_id_;
|
||||
arg.schema_version_ = schema_version_;
|
||||
arg.execution_id_ = execution_id_;
|
||||
arg.snapshot_version_ = snapshot_version_;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < send_array.count(); ++i) {
|
||||
const SendItem &send_item = send_array.at(i);
|
||||
if (send_item.leader_addr_ != last_addr) {
|
||||
if (arg.calc_items_.count() > 0) {
|
||||
if (OB_FAIL(send_batch_calc_rpc(root_service->get_rpc_proxy(), last_addr,
|
||||
arg, res, send_array, group_start_idx, i, lock_, send_succ_count))) {
|
||||
LOG_WARN("send batch calc rpc failed", K(ret));
|
||||
}
|
||||
} else {
|
||||
LOG_INFO("send checksum validation task", K(arg));
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
SpinWLockGuard guard(lock_);
|
||||
item.snapshot_ = snapshot_version_;
|
||||
item.col_checksum_stat_ = CCS_INVALID;
|
||||
++send_succ_count;
|
||||
arg.calc_items_.reuse();
|
||||
res.ret_codes_.reuse();
|
||||
last_addr = send_item.leader_addr_;
|
||||
group_start_idx = i;
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
ObCalcColumnChecksumRequestArg::SingleItem calc_item;
|
||||
calc_item.ls_id_ = send_item.ls_id_;
|
||||
calc_item.tablet_id_ = send_item.tablet_id_;
|
||||
calc_item.calc_table_id_ = reinterpret_cast<PartitionColChecksumStat *>(send_item.other_info_)->table_id_;
|
||||
if (OB_FAIL(arg.calc_items_.push_back(calc_item))) {
|
||||
LOG_WARN("push back send item failed", K(ret), K(i), K(send_item));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret) && arg.calc_items_.count() > 0) {
|
||||
if (OB_FAIL(send_batch_calc_rpc(root_service->get_rpc_proxy(), last_addr,
|
||||
arg, res, send_array, group_start_idx, send_array.count(), lock_, send_succ_count))) {
|
||||
LOG_WARN("send batch calc rpc failed", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2629,41 +2629,81 @@ DEF_TO_STRING(ObRemoveTabletRes)
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObRemoveTabletRes, ret_);
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObCalcColumnChecksumRequestArg::SingleItem, ls_id_, tablet_id_, calc_table_id_);
|
||||
|
||||
bool ObCalcColumnChecksumRequestArg::SingleItem::is_valid() const
|
||||
{
|
||||
return ls_id_.is_valid() && tablet_id_.is_valid() && OB_INVALID_ID != calc_table_id_;
|
||||
}
|
||||
|
||||
void ObCalcColumnChecksumRequestArg::SingleItem::reset()
|
||||
{
|
||||
ls_id_.reset();
|
||||
tablet_id_.reset();
|
||||
calc_table_id_ = OB_INVALID_ID;
|
||||
}
|
||||
|
||||
int ObCalcColumnChecksumRequestArg::SingleItem::assign(const SingleItem &other)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ls_id_ = other.ls_id_;
|
||||
tablet_id_ = other.tablet_id_;
|
||||
calc_table_id_ = other.calc_table_id_;
|
||||
return ret;
|
||||
}
|
||||
|
||||
OB_SERIALIZE_MEMBER(
|
||||
ObCalcColumnChecksumRequestArg,
|
||||
tenant_id_,
|
||||
ls_id_,
|
||||
tablet_id_,
|
||||
target_table_id_,
|
||||
schema_version_,
|
||||
execution_id_,
|
||||
snapshot_version_,
|
||||
source_table_id_,
|
||||
calc_table_id_,
|
||||
task_id_);
|
||||
task_id_,
|
||||
calc_items_);
|
||||
|
||||
bool ObCalcColumnChecksumRequestArg::is_valid() const
|
||||
{
|
||||
return OB_INVALID_ID != tenant_id_ && ls_id_.is_valid() && tablet_id_.is_valid()
|
||||
&& OB_INVALID_ID != target_table_id_ && OB_INVALID_VERSION != schema_version_
|
||||
&& OB_INVALID_ID != execution_id_ && OB_INVALID_VERSION != snapshot_version_ && OB_INVALID_ID != source_table_id_
|
||||
&& OB_INVALID_ID != calc_table_id_ && task_id_ > 0;
|
||||
bool bret = OB_INVALID_ID != tenant_id_ && OB_INVALID_ID != target_table_id_
|
||||
&& OB_INVALID_VERSION != schema_version_ && OB_INVALID_ID != execution_id_
|
||||
&& OB_INVALID_VERSION != snapshot_version_ && OB_INVALID_ID != source_table_id_ && task_id_ > 0;
|
||||
for (int64_t i = 0; bret && i < calc_items_.count(); ++i) {
|
||||
bret = calc_items_.at(i).is_valid();
|
||||
}
|
||||
return bret;
|
||||
}
|
||||
|
||||
void ObCalcColumnChecksumRequestArg::reset()
|
||||
{
|
||||
tenant_id_ = OB_INVALID_ID;
|
||||
ls_id_.reset();
|
||||
tablet_id_.reset();
|
||||
target_table_id_ = OB_INVALID_ID;
|
||||
schema_version_ = OB_INVALID_VERSION;
|
||||
execution_id_ = OB_INVALID_ID;
|
||||
snapshot_version_ = OB_INVALID_VERSION;
|
||||
source_table_id_ = OB_INVALID_ID;
|
||||
calc_table_id_ = OB_INVALID_ID;
|
||||
task_id_ = 0;
|
||||
calc_items_.reset();
|
||||
}
|
||||
|
||||
int ObCalcColumnChecksumRequestArg::assign(const ObCalcColumnChecksumRequestArg &other)
|
||||
{
|
||||
int ret = common::OB_SUCCESS;
|
||||
tenant_id_ = other.tenant_id_;
|
||||
target_table_id_ = other.target_table_id_;
|
||||
schema_version_ = other.schema_version_;
|
||||
execution_id_ = other.execution_id_;
|
||||
snapshot_version_ = other.snapshot_version_;
|
||||
source_table_id_ = other.source_table_id_;
|
||||
task_id_ = other.task_id_;
|
||||
if (OB_FAIL(calc_items_.assign(other.calc_items_))) {
|
||||
LOG_WARN("assign calc_items failed", K(ret), K(other.calc_items_.count()));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObCalcColumnChecksumRequestRes, ret_codes_);
|
||||
|
||||
OB_SERIALIZE_MEMBER(
|
||||
ObCalcColumnChecksumResponseArg,
|
||||
tablet_id_,
|
||||
@ -3041,22 +3081,56 @@ DEF_TO_STRING(ObSwitchLeaderArg)
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObSwitchLeaderArg, ls_id_, role_, tenant_id_, dest_server_);
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObCheckSchemaVersionElapsedArg, tenant_id_, ls_id_, data_tablet_id_, schema_version_, need_wait_trans_end_);
|
||||
OB_SERIALIZE_MEMBER(ObLSTabletPair, ls_id_, tablet_id_);
|
||||
OB_SERIALIZE_MEMBER(ObCheckSchemaVersionElapsedArg, tenant_id_, schema_version_, need_wait_trans_end_, tablets_);
|
||||
|
||||
bool ObCheckSchemaVersionElapsedArg::is_valid() const
|
||||
{
|
||||
bool bret = OB_INVALID_ID != tenant_id_ && schema_version_ > 0 && !tablets_.empty();
|
||||
for (int64_t i = 0; bret && i < tablets_.count(); ++i) {
|
||||
bret = tablets_.at(i).is_valid();
|
||||
}
|
||||
return bret;
|
||||
}
|
||||
|
||||
void ObCheckSchemaVersionElapsedArg::reuse()
|
||||
{
|
||||
tenant_id_ = OB_INVALID_ID;
|
||||
ls_id_.reset();
|
||||
data_tablet_id_.reset();
|
||||
schema_version_ = 0;
|
||||
need_wait_trans_end_ = true;
|
||||
tablets_.reuse();
|
||||
}
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObCheckModifyTimeElapsedArg, tenant_id_, ls_id_, tablet_id_, sstable_exist_ts_);
|
||||
bool ObCheckModifyTimeElapsedArg::is_valid() const
|
||||
{
|
||||
bool bret = OB_INVALID_ID != tenant_id_ && sstable_exist_ts_ > 0;
|
||||
for (int64_t i = 0; bret && i < tablets_.count(); ++i) {
|
||||
bret = tablets_.at(i).is_valid();
|
||||
}
|
||||
return bret;
|
||||
}
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObCheckSchemaVersionElapsedResult, snapshot_);
|
||||
void ObCheckModifyTimeElapsedArg::reuse()
|
||||
{
|
||||
tenant_id_ = OB_INVALID_ID;
|
||||
sstable_exist_ts_ = 0;
|
||||
tablets_.reuse();
|
||||
}
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObCheckModifyTimeElapsedArg, tenant_id_, sstable_exist_ts_, tablets_);
|
||||
|
||||
bool ObCheckSchemaVersionElapsedResult::is_valid() const
|
||||
{
|
||||
bool bret = !results_.empty();
|
||||
for (int64_t i = 0; bret && i < results_.count(); ++i) {
|
||||
bret = (common::OB_INVALID_TIMESTAMP != results_.at(i).snapshot_);
|
||||
}
|
||||
return bret;
|
||||
}
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObCheckTransElapsedResult, ret_code_, snapshot_, pending_tx_id_);
|
||||
OB_SERIALIZE_MEMBER(ObCheckSchemaVersionElapsedResult, results_);
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObCheckModifyTimeElapsedResult, snapshot_);
|
||||
|
||||
OB_SERIALIZE_MEMBER(CandidateStatus, candidate_status_);
|
||||
|
||||
|
@ -2857,7 +2857,7 @@ private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObGetMinSSTableSchemaVersionRes);
|
||||
};
|
||||
|
||||
struct ObCalcColumnChecksumRequestArg
|
||||
struct ObCalcColumnChecksumRequestArg final
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
public:
|
||||
@ -2865,38 +2865,43 @@ public:
|
||||
~ObCalcColumnChecksumRequestArg() = default;
|
||||
bool is_valid() const;
|
||||
void reset();
|
||||
int assign(const ObCalcColumnChecksumRequestArg &other) {
|
||||
int ret = common::OB_SUCCESS;
|
||||
tenant_id_ = other.tenant_id_;
|
||||
ls_id_ = other.ls_id_;
|
||||
tablet_id_ = other.tablet_id_;
|
||||
target_table_id_ = other.target_table_id_;
|
||||
schema_version_ = other.schema_version_;
|
||||
execution_id_ = other.execution_id_;
|
||||
snapshot_version_ = other.snapshot_version_;
|
||||
source_table_id_ = other.source_table_id_;
|
||||
calc_table_id_ = other.calc_table_id_;
|
||||
task_id_ = other.task_id_;
|
||||
return ret;
|
||||
}
|
||||
TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(tablet_id),
|
||||
K_(target_table_id), K_(schema_version), K_(execution_id),
|
||||
K_(snapshot_version), K_(source_table_id), K_(calc_table_id));
|
||||
int assign(const ObCalcColumnChecksumRequestArg &other);
|
||||
TO_STRING_KV(K_(tenant_id), K_(target_table_id), K_(schema_version), K_(execution_id),
|
||||
K_(snapshot_version), K_(source_table_id), K_(task_id), K_(calc_items));
|
||||
struct SingleItem final
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
public:
|
||||
SingleItem() { reset(); }
|
||||
~SingleItem() = default;
|
||||
bool is_valid() const;
|
||||
void reset();
|
||||
int assign(const SingleItem &other);
|
||||
TO_STRING_KV(K_(ls_id), K_(tablet_id), K_(calc_table_id));
|
||||
share::ObLSID ls_id_;
|
||||
common::ObTabletID tablet_id_;
|
||||
int64_t calc_table_id_;
|
||||
};
|
||||
public:
|
||||
uint64_t tenant_id_;
|
||||
share::ObLSID ls_id_;
|
||||
common::ObTabletID tablet_id_;
|
||||
uint64_t target_table_id_;
|
||||
int64_t schema_version_;
|
||||
uint64_t execution_id_;
|
||||
int64_t snapshot_version_;
|
||||
int64_t source_table_id_;
|
||||
int64_t calc_table_id_;
|
||||
int64_t task_id_;
|
||||
common::ObSEArray<SingleItem, 10> calc_items_;
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObCalcColumnChecksumRequestArg);
|
||||
};
|
||||
|
||||
struct ObCalcColumnChecksumRequestRes final
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
public:
|
||||
common::ObSEArray<int, 10> ret_codes_;
|
||||
};
|
||||
|
||||
struct ObCalcColumnChecksumResponseArg
|
||||
{
|
||||
OB_UNIS_VERSION(2);
|
||||
@ -3604,66 +3609,69 @@ public:
|
||||
bool force_refresh_;
|
||||
};
|
||||
|
||||
struct ObCheckSchemaVersionElapsedArg
|
||||
struct ObLSTabletPair final
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
public:
|
||||
bool is_valid() const { return ls_id_.is_valid() && tablet_id_.is_valid(); }
|
||||
TO_STRING_KV(K_(ls_id), K_(tablet_id));
|
||||
share::ObLSID ls_id_;
|
||||
common::ObTabletID tablet_id_;
|
||||
};
|
||||
|
||||
struct ObCheckSchemaVersionElapsedArg final
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
public:
|
||||
ObCheckSchemaVersionElapsedArg()
|
||||
: tenant_id_(), ls_id_(), data_tablet_id_(0), schema_version_(0), need_wait_trans_end_(true)
|
||||
: tenant_id_(), schema_version_(0), need_wait_trans_end_(true)
|
||||
{}
|
||||
bool is_valid() const {
|
||||
return OB_INVALID_ID != tenant_id_ && ls_id_.is_valid() && data_tablet_id_.is_valid() && schema_version_ > 0;
|
||||
}
|
||||
bool is_valid() const;
|
||||
void reuse();
|
||||
TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(data_tablet_id), K_(schema_version), K_(need_wait_trans_end));
|
||||
TO_STRING_KV(K_(tenant_id), K_(schema_version), K_(need_wait_trans_end), K_(tablets));
|
||||
|
||||
uint64_t tenant_id_;
|
||||
share::ObLSID ls_id_;
|
||||
common::ObTabletID data_tablet_id_;
|
||||
int64_t schema_version_;
|
||||
bool need_wait_trans_end_;
|
||||
ObSEArray<ObLSTabletPair, 10> tablets_;
|
||||
};
|
||||
|
||||
struct ObCheckModifyTimeElapsedArg
|
||||
struct ObCheckModifyTimeElapsedArg final
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
public:
|
||||
ObCheckModifyTimeElapsedArg() : tenant_id_(OB_INVALID_ID), ls_id_(), tablet_id_(), sstable_exist_ts_(0) {}
|
||||
bool is_valid() const {
|
||||
return OB_INVALID_ID != tenant_id_ && ls_id_.is_valid() && tablet_id_.is_valid() && sstable_exist_ts_ > 0;
|
||||
}
|
||||
void reuse() {tenant_id_ = OB_INVALID_ID; ls_id_.reset(); tablet_id_.reset(); sstable_exist_ts_ = 0;}
|
||||
TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(tablet_id), K_(sstable_exist_ts));
|
||||
ObCheckModifyTimeElapsedArg() : tenant_id_(OB_INVALID_ID), sstable_exist_ts_(0) {}
|
||||
bool is_valid() const;
|
||||
void reuse();
|
||||
TO_STRING_KV(K_(tenant_id), K_(sstable_exist_ts), K_(tablets));
|
||||
uint64_t tenant_id_;
|
||||
share::ObLSID ls_id_;
|
||||
common::ObTabletID tablet_id_;
|
||||
int64_t sstable_exist_ts_;
|
||||
ObSEArray<ObLSTabletPair, 10> tablets_;
|
||||
};
|
||||
|
||||
struct ObCheckTransElapsedResult final
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
public:
|
||||
ObCheckTransElapsedResult() : ret_code_(common::OB_SUCCESS), snapshot_(common::OB_INVALID_TIMESTAMP) {}
|
||||
TO_STRING_KV(K_(ret_code), K_(snapshot), K_(pending_tx_id));
|
||||
int ret_code_;
|
||||
int64_t snapshot_;
|
||||
transaction::ObTransID pending_tx_id_;
|
||||
};
|
||||
|
||||
struct ObCheckSchemaVersionElapsedResult
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
public:
|
||||
ObCheckSchemaVersionElapsedResult()
|
||||
: snapshot_(common::OB_INVALID_TIMESTAMP), pending_tx_id_() {}
|
||||
bool is_valid() const { return snapshot_ != common::OB_INVALID_TIMESTAMP; }
|
||||
void reuse() { snapshot_ = common::OB_INVALID_TIMESTAMP; pending_tx_id_.reset(); }
|
||||
TO_STRING_KV(K_(snapshot), K_(pending_tx_id));
|
||||
int64_t snapshot_;
|
||||
transaction::ObTransID pending_tx_id_;
|
||||
ObCheckSchemaVersionElapsedResult() {}
|
||||
bool is_valid() const;
|
||||
void reuse() { results_.reuse(); }
|
||||
TO_STRING_KV(K_(results));
|
||||
ObSEArray<ObCheckTransElapsedResult, 10> results_;
|
||||
};
|
||||
|
||||
struct ObCheckModifyTimeElapsedResult
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
public:
|
||||
ObCheckModifyTimeElapsedResult()
|
||||
: snapshot_(common::OB_INVALID_TIMESTAMP), pending_tx_id_() {}
|
||||
bool is_valid() const { return snapshot_ != common::OB_INVALID_TIMESTAMP; }
|
||||
void reuse() { snapshot_ = common::OB_INVALID_TIMESTAMP; pending_tx_id_.reset(); }
|
||||
TO_STRING_KV(K_(snapshot), K_(pending_tx_id));
|
||||
int64_t snapshot_;
|
||||
transaction::ObTransID pending_tx_id_;
|
||||
};
|
||||
typedef ObCheckSchemaVersionElapsedResult ObCheckModifyTimeElapsedResult;
|
||||
|
||||
class CandidateStatus
|
||||
{
|
||||
|
@ -117,7 +117,7 @@ public:
|
||||
RPC_S(PR5 force_switch_ilog_file, OB_FORCE_SWITCH_ILOG_FILE, (ObForceSwitchILogFileArg));
|
||||
RPC_S(PR5 force_set_all_as_single_replica, OB_FORCE_SET_ALL_AS_SINGLE_REPLICA, (ObForceSetAllAsSingleReplicaArg));
|
||||
RPC_S(PR5 force_set_server_list, OB_FORCE_SET_SERVER_LIST, (ObForceSetServerListArg));
|
||||
RPC_S(PR5 calc_column_checksum_request, OB_CALC_COLUMN_CHECKSUM_REQUEST, (ObCalcColumnChecksumRequestArg));
|
||||
RPC_S(PR5 calc_column_checksum_request, OB_CALC_COLUMN_CHECKSUM_REQUEST, (ObCalcColumnChecksumRequestArg), obrpc::ObCalcColumnChecksumRequestRes);
|
||||
RPC_AP(PR5 build_ddl_single_replica_request, OB_DDL_BUILD_SINGLE_REPLICA_REQUEST, (obrpc::ObDDLBuildSingleReplicaRequestArg), obrpc::ObDDLBuildSingleReplicaRequestResult);
|
||||
RPC_S(PR5 fetch_tablet_autoinc_seq_cache, OB_FETCH_TABLET_AUTOINC_SEQ_CACHE, (obrpc::ObFetchTabletSeqArg), obrpc::ObFetchTabletSeqRes);
|
||||
RPC_AP(PR5 batch_get_tablet_autoinc_seq, OB_BATCH_GET_TABLET_AUTOINC_SEQ, (obrpc::ObBatchGetTabletAutoincSeqArg), obrpc::ObBatchGetTabletAutoincSeqRes);
|
||||
|
@ -13,9 +13,9 @@
|
||||
#define USING_LOG_PREFIX SQL_ENG
|
||||
|
||||
#include "sql/engine/expr/ob_expr_to_outfile_row.h"
|
||||
#include <string.h>
|
||||
#include "lib/oblog/ob_log.h"
|
||||
#include "objit/common/ob_item_type.h"
|
||||
#include <string.h>
|
||||
#include "lib/oblog/ob_log.h"
|
||||
#include "objit/common/ob_item_type.h"
|
||||
#include "sql/session/ob_sql_session_info.h"
|
||||
#include "sql/engine/ob_exec_context.h"
|
||||
|
||||
@ -120,9 +120,9 @@ int ObExprToOutfileRow::calc_outfile_info(const ObExpr &expr,
|
||||
ObSQLSessionInfo *session = ctx.exec_ctx_.get_my_session();
|
||||
if (OB_ISNULL(session)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("session is null", K(ret));
|
||||
} else if (OB_ISNULL(out_info.print_params_.tz_info_ = session->get_timezone_info())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("session is null", K(ret));
|
||||
} else if (OB_ISNULL(out_info.print_params_.tz_info_ = session->get_timezone_info())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("fail to get timezone info", K(ret));
|
||||
} else {
|
||||
out_info.print_params_.use_memcpy_ = true;
|
||||
@ -130,9 +130,9 @@ int ObExprToOutfileRow::calc_outfile_info(const ObExpr &expr,
|
||||
out_info.is_optional_ = expr.locate_param_datum(ctx, PARAM_OPTIONAL).get_bool();
|
||||
}
|
||||
|
||||
for (int i = 0; OB_SUCC(ret) && i < PARAM_SELECT_ITEM; ++i) {
|
||||
OZ(expr.locate_param_datum(ctx, i).to_obj(objs_array[i], expr.args_[i]->obj_meta_,
|
||||
expr.args_[i]->obj_datum_map_));
|
||||
for (int i = 0; OB_SUCC(ret) && i < PARAM_SELECT_ITEM; ++i) {
|
||||
OZ(expr.locate_param_datum(ctx, i).to_obj(objs_array[i], expr.args_[i]->obj_meta_,
|
||||
expr.args_[i]->obj_datum_map_));
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
out_info.field_ = objs_array[PARAM_FIELD];
|
||||
|
Loading…
x
Reference in New Issue
Block a user