When scheduling with single DOP, if an error occurs while writing interm results, report the error immediately

This commit is contained in:
qingsuijiu 2024-03-07 04:51:17 +00:00 committed by ob-robot
parent 92fca7def1
commit 2b8c8e08a7
3 changed files with 46 additions and 6 deletions

View File

@ -344,6 +344,8 @@ int ObDTLIntermResultManager::atomic_append_block(ObDTLIntermResultKey &key, ObA
int ret = OB_SUCCESS;
if (OB_FAIL(map_.atomic_refactored(key, call))) {
LOG_WARN("fail to get row store in result manager", K(ret));
} else if (OB_FAIL(call.ret_)) {
LOG_WARN("ObAtomicAppendBlockCall fail", K(ret), K(call.ret_));
} else {
LOG_DEBUG("debug append block to interm result info", K(key));
}
@ -355,6 +357,8 @@ int ObDTLIntermResultManager::atomic_append_part_block(ObDTLIntermResultKey &key
int ret = OB_SUCCESS;
if (OB_FAIL(map_.atomic_refactored(key, call))) {
LOG_WARN("fail to get row store in result manager", K(ret));
} else if (OB_FAIL(call.ret_)) {
LOG_WARN("ObAtomicAppendPartBlockCall fail", K(ret), K(call.ret_));
} else {
LOG_DEBUG("debug append part block to interm result info", K(key));
}

View File

@ -66,6 +66,22 @@ int ObDtlRpcChannel::SendMsgCB::process()
const ObDtlRpcDataResponse &resp = result_;
// if request queue is full or serialize faild, then rcode is set, and rpc process is not called
int tmp_ret = OB_SUCCESS != rcode_.rcode_ ? rcode_.rcode_ : resp.recode_;
if (tmp_ret != OB_SUCCESS) {
int ret = OB_SUCCESS;
LOG_DEBUG("rpc channel callback receive error ret", K(rcode_.rcode_ ), K(resp.recode_));
ObDtlChannel *chan = nullptr;
if (OB_FAIL(DTL.get_channel(channel_id_, chan))) {
LOG_WARN("SendMsgCB get channel failed", K(ret));
} else if (OB_ISNULL(chan)) {
LOG_WARN("chan is nullptr", K(ret));
} else {
ObDtlRpcChannel *rpc_chan = reinterpret_cast<ObDtlRpcChannel*>(chan);
rpc_chan->set_cb_ret(tmp_ret);
if (OB_FAIL(DTL.release_channel(rpc_chan))) {
LOG_WARN("release channel failed", K(ret));
}
}
}
int ret = response_.on_finish(resp.is_block_, tmp_ret);
if (OB_FAIL(ret)) {
LOG_WARN("set finish failed", K_(trace_id), K(ret));
@ -79,7 +95,7 @@ rpc::frame::ObReqTransport::AsyncCB *ObDtlRpcChannel::SendMsgCB::clone(
SendMsgCB *cb = NULL;
void *mem = alloc(sizeof(*this));
if (NULL != mem) {
cb = new(mem)SendMsgCB(response_, trace_id_, timeout_ts_);
cb = new(mem)SendMsgCB(response_, trace_id_, timeout_ts_, channel_id_);
}
return cb;
}
@ -177,7 +193,7 @@ ObDtlRpcChannel::ObDtlRpcChannel(
const uint64_t id,
const ObAddr &peer,
DtlChannelType type)
: ObDtlBasicChannel(tenant_id, id, peer, type), recv_sqc_fin_res_(false)
: ObDtlBasicChannel(tenant_id, id, peer, type), recv_sqc_fin_res_(false), cb_ret_(OB_SUCCESS)
{}
ObDtlRpcChannel::ObDtlRpcChannel(
@ -186,7 +202,7 @@ ObDtlRpcChannel::ObDtlRpcChannel(
const ObAddr &peer,
const int64_t hash_val,
DtlChannelType type)
: ObDtlBasicChannel(tenant_id, id, peer, hash_val, type), recv_sqc_fin_res_(false)
: ObDtlBasicChannel(tenant_id, id, peer, hash_val, type), recv_sqc_fin_res_(false), cb_ret_(OB_SUCCESS)
{}
ObDtlRpcChannel::~ObDtlRpcChannel()
@ -314,7 +330,7 @@ int ObDtlRpcChannel::send_message(ObDtlLinkedBuffer *&buf)
// The peer may not setup when the first message arrive,
// we wait first message return and retry until peer setup.
int64_t timeout_us = buf->timeout_ts() - ObTimeUtility::current_time();
SendMsgCB cb(msg_response_, *cur_trace_id, buf->timeout_ts());
SendMsgCB cb(msg_response_, *cur_trace_id, buf->timeout_ts(), get_id());
if (timeout_us <= 0) {
ret = OB_TIMEOUT;
LOG_WARN("send dtl message timeout", K(ret), K(peer_),
@ -350,6 +366,18 @@ int ObDtlRpcChannel::send_message(ObDtlLinkedBuffer *&buf)
return ret;
}
int ObDtlRpcChannel::send(const ObDtlMsg &msg, int64_t timeout_ts,
ObEvalCtx *eval_ctx, bool is_eof)
{
int ret = OB_SUCCESS;
if (msg.is_data_msg() && OB_FAIL(cb_ret_)) {
LOG_WARN("ObDtlRpcChannel callback failed", K(cb_ret_));
} else if (OB_FAIL(ObDtlBasicChannel::send(msg, timeout_ts, eval_ctx, is_eof))) {
LOG_WARN("ObDtlBasicChannel send failed", K(ret));
}
return ret;
}
} // dtl
} // sql
} // oceanbase

View File

@ -48,9 +48,11 @@ class ObDtlRpcChannel
class SendMsgCB : public obrpc::ObDtlRpcProxy::AsyncCB<obrpc::OB_DTL_SEND>
{
public:
explicit SendMsgCB(SendMsgResponse &response, const common::ObCurTraceId::TraceId trace_id, const int64_t timeout_ts)
explicit SendMsgCB(SendMsgResponse &response, const common::ObCurTraceId::TraceId trace_id,
const int64_t timeout_ts, const uint64_t channel_id)
: response_(response),
timeout_ts_(timeout_ts)
timeout_ts_(timeout_ts),
channel_id_(channel_id)
{
trace_id_.set(trace_id);
}
@ -63,6 +65,7 @@ class ObDtlRpcChannel
SendMsgResponse &response_;
common::ObCurTraceId::TraceId trace_id_;
int64_t timeout_ts_;
uint64_t channel_id_;
};
class SendBCMsgCB : public obrpc::ObDtlRpcProxy::AsyncCB<obrpc::OB_DTL_BC_SEND>
@ -99,9 +102,14 @@ public:
virtual int feedup(ObDtlLinkedBuffer *&buffer) override;
virtual int send_message(ObDtlLinkedBuffer *&buf);
virtual int send(const ObDtlMsg &msg, int64_t timeout_ts,
ObEvalCtx *eval_ctx = nullptr, bool is_eof = false) override;
bool recv_sqc_fin_res() { return recv_sqc_fin_res_; }
void set_cb_ret(int cb_ret) { cb_ret_ = cb_ret; }
private:
bool recv_sqc_fin_res_;
int cb_ret_;
};
} // dtl