[BUG FIX]storage_val is not equal with sql_val, maybe catch a bug
This commit is contained in:
committed by
LINxiansheng
parent
f39690b55b
commit
66238908e3
@ -980,6 +980,17 @@ int ObParallelDfoScheduler::dispatch_sqc(
|
|||||||
} else {
|
} else {
|
||||||
LOG_WARN("fail to wait all async init sqc", K(ret), K(dfo), K(exec_ctx));
|
LOG_WARN("fail to wait all async init sqc", K(ret), K(dfo), K(exec_ctx));
|
||||||
}
|
}
|
||||||
|
// 对于正确process的sqc, 是需要sqc report的, 否则在后续的wait_running_dfo逻辑中不会等待此sqc结束
|
||||||
|
const ObSqcAsyncCB *cb = NULL;
|
||||||
|
const ObArray<ObSqcAsyncCB *> &callbacks = proxy.get_callbacks();
|
||||||
|
for (int i = 0; i < callbacks.count(); ++i) {
|
||||||
|
cb = callbacks.at(i);
|
||||||
|
if (OB_NOT_NULL(cb) && cb->is_processed() && OB_SUCCESS == cb->get_ret_code().rcode_ &&
|
||||||
|
OB_SUCCESS == cb->get_result().rc_) {
|
||||||
|
ObPxSqcMeta &sqc = *sqcs.at(i);
|
||||||
|
sqc.set_need_report(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
const ObArray<ObSqcAsyncCB*>& callbacks = proxy.get_callbacks();
|
const ObArray<ObSqcAsyncCB*>& callbacks = proxy.get_callbacks();
|
||||||
ARRAY_FOREACH(callbacks, idx) {
|
ARRAY_FOREACH(callbacks, idx) {
|
||||||
|
|||||||
@ -206,19 +206,6 @@ int ObPxSqcAsyncProxy::wait_all()
|
|||||||
LOG_WARN("call rpc failed", K(ret), K(callback.get_ret_code()));
|
LOG_WARN("call rpc failed", K(ret), K(callback.get_ret_code()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (callback.need_retry() && OB_SUCC(ret)) {
|
|
||||||
// need retry the task.
|
|
||||||
// reset: visit, eturn_cb_count_
|
|
||||||
callback.set_visited(false);
|
|
||||||
return_cb_count_--;
|
|
||||||
if (check_for_retry(callback)) {
|
|
||||||
callback.reset();
|
|
||||||
if (OB_FAIL(launch_one_rpc_request(idx, &callback))) {
|
|
||||||
LOG_WARN("retrying to send sqc rpc failed");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -244,18 +231,6 @@ void ObPxSqcAsyncProxy::destroy()
|
|||||||
results_.reuse();
|
results_.reuse();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ObPxSqcAsyncProxy::check_for_retry(ObSqcAsyncCB& callback)
|
|
||||||
{
|
|
||||||
bool retry = false;
|
|
||||||
int64_t timeout_us = phy_plan_ctx_->get_timeout_timestamp() - ObTimeUtility::current_time();
|
|
||||||
int64_t send_duration = ObTimeUtility::current_time() - callback.get_send_ts();
|
|
||||||
// avoid retry too mutch
|
|
||||||
if (timeout_us >= 100 * 1000L && send_duration >= 10 * 1000L) {
|
|
||||||
retry = true;
|
|
||||||
}
|
|
||||||
return retry;
|
|
||||||
}
|
|
||||||
|
|
||||||
void ObPxSqcAsyncProxy::fail_process()
|
void ObPxSqcAsyncProxy::fail_process()
|
||||||
{
|
{
|
||||||
LOG_WARN("async sqc fails, process the callbacks that have not yet got results",
|
LOG_WARN("async sqc fails, process the callbacks that have not yet got results",
|
||||||
|
|||||||
@ -57,7 +57,6 @@ public:
|
|||||||
is_timeout_ = false;
|
is_timeout_ = false;
|
||||||
is_invalid_ = false;
|
is_invalid_ = false;
|
||||||
is_visited_ = false;
|
is_visited_ = false;
|
||||||
need_retry_ = false;
|
|
||||||
}
|
}
|
||||||
void set_visited(bool value)
|
void set_visited(bool value)
|
||||||
{
|
{
|
||||||
@ -83,14 +82,6 @@ public:
|
|||||||
{
|
{
|
||||||
return is_processed_;
|
return is_processed_;
|
||||||
}
|
}
|
||||||
void set_retry(bool value)
|
|
||||||
{
|
|
||||||
need_retry_ = value;
|
|
||||||
}
|
|
||||||
bool need_retry() const
|
|
||||||
{
|
|
||||||
return need_retry_;
|
|
||||||
}
|
|
||||||
const obrpc::ObRpcResultCode get_ret_code() const
|
const obrpc::ObRpcResultCode get_ret_code() const
|
||||||
{
|
{
|
||||||
return rcode_;
|
return rcode_;
|
||||||
@ -113,7 +104,6 @@ private:
|
|||||||
bool is_timeout_;
|
bool is_timeout_;
|
||||||
bool is_invalid_;
|
bool is_invalid_;
|
||||||
bool is_visited_;
|
bool is_visited_;
|
||||||
bool need_retry_;
|
|
||||||
ObThreadCond &cond_;
|
ObThreadCond &cond_;
|
||||||
ObCurTraceId::TraceId trace_id_;
|
ObCurTraceId::TraceId trace_id_;
|
||||||
};
|
};
|
||||||
@ -160,7 +150,6 @@ private:
|
|||||||
void destroy();
|
void destroy();
|
||||||
// asynchronously request a single sqc rpc task
|
// asynchronously request a single sqc rpc task
|
||||||
int launch_one_rpc_request(int64_t idx, ObSqcAsyncCB* cb);
|
int launch_one_rpc_request(int64_t idx, ObSqcAsyncCB* cb);
|
||||||
bool check_for_retry(ObSqcAsyncCB& callback);
|
|
||||||
void fail_process();
|
void fail_process();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|||||||
Reference in New Issue
Block a user