Remove the duplicated logic for handling remote side errors in the RPC channel.
This commit is contained in:
		@ -346,7 +346,7 @@ int ObDTLIntermResultManager::atomic_append_block(ObDTLIntermResultKey &key, ObA
 | 
			
		||||
  if (OB_FAIL(map_.atomic_refactored(key, call))) {
 | 
			
		||||
    LOG_WARN("fail to get row store in result manager", K(ret));
 | 
			
		||||
  } else if (OB_FAIL(call.ret_)) {
 | 
			
		||||
    LOG_WARN("ObAtomicAppendBlockCall fail", K(ret), K(call.ret_));
 | 
			
		||||
    LOG_WARN("ObAtomicAppendBlockCall fail", K(ret));
 | 
			
		||||
  } else {
 | 
			
		||||
    LOG_DEBUG("debug append block to interm result info", K(key));
 | 
			
		||||
  }
 | 
			
		||||
@ -359,7 +359,7 @@ int ObDTLIntermResultManager::atomic_append_part_block(ObDTLIntermResultKey &key
 | 
			
		||||
  if (OB_FAIL(map_.atomic_refactored(key, call))) {
 | 
			
		||||
    LOG_WARN("fail to get row store in result manager", K(ret));
 | 
			
		||||
  } else if (OB_FAIL(call.ret_)) {
 | 
			
		||||
    LOG_WARN("ObAtomicAppendPartBlockCall fail", K(ret), K(call.ret_));
 | 
			
		||||
    LOG_WARN("ObAtomicAppendPartBlockCall fail", K(ret));
 | 
			
		||||
  } else {
 | 
			
		||||
    LOG_DEBUG("debug append part block to interm result info", K(key));
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
@ -66,22 +66,6 @@ int ObDtlRpcChannel::SendMsgCB::process()
 | 
			
		||||
  const ObDtlRpcDataResponse &resp = result_;
 | 
			
		||||
  // if request queue is full or serialize faild, then rcode is set, and rpc process is not called
 | 
			
		||||
  int tmp_ret = OB_SUCCESS != rcode_.rcode_ ? rcode_.rcode_ : resp.recode_;
 | 
			
		||||
  if (tmp_ret != OB_SUCCESS) {
 | 
			
		||||
    int ret = OB_SUCCESS;
 | 
			
		||||
    LOG_DEBUG("rpc channel callback receive error ret", K(rcode_.rcode_ ), K(resp.recode_));
 | 
			
		||||
    ObDtlChannel *chan = nullptr;
 | 
			
		||||
    if (OB_FAIL(DTL.get_channel(channel_id_, chan))) {
 | 
			
		||||
      LOG_WARN("SendMsgCB get channel failed", K(ret));
 | 
			
		||||
    } else if (OB_ISNULL(chan)) {
 | 
			
		||||
      LOG_WARN("chan is nullptr", K(ret));
 | 
			
		||||
    } else {
 | 
			
		||||
      ObDtlRpcChannel *rpc_chan = reinterpret_cast<ObDtlRpcChannel*>(chan);
 | 
			
		||||
      rpc_chan->set_cb_ret(tmp_ret);
 | 
			
		||||
      if (OB_FAIL(DTL.release_channel(rpc_chan))) {
 | 
			
		||||
        LOG_WARN("release channel failed", K(ret));
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  int ret = response_.on_finish(resp.is_block_, tmp_ret);
 | 
			
		||||
  if (OB_FAIL(ret)) {
 | 
			
		||||
    LOG_WARN("set finish failed", K_(trace_id), K(ret));
 | 
			
		||||
@ -95,7 +79,7 @@ rpc::frame::ObReqTransport::AsyncCB *ObDtlRpcChannel::SendMsgCB::clone(
 | 
			
		||||
  SendMsgCB *cb = NULL;
 | 
			
		||||
  void *mem = alloc(sizeof(*this));
 | 
			
		||||
  if (NULL != mem) {
 | 
			
		||||
    cb = new(mem)SendMsgCB(response_, trace_id_, timeout_ts_, channel_id_);
 | 
			
		||||
    cb = new(mem)SendMsgCB(response_, trace_id_, timeout_ts_);
 | 
			
		||||
  }
 | 
			
		||||
  return cb;
 | 
			
		||||
}
 | 
			
		||||
@ -193,7 +177,7 @@ ObDtlRpcChannel::ObDtlRpcChannel(
 | 
			
		||||
    const uint64_t id,
 | 
			
		||||
    const ObAddr &peer,
 | 
			
		||||
    DtlChannelType type)
 | 
			
		||||
    : ObDtlBasicChannel(tenant_id, id, peer, type), recv_sqc_fin_res_(false), cb_ret_(OB_SUCCESS)
 | 
			
		||||
    : ObDtlBasicChannel(tenant_id, id, peer, type), recv_sqc_fin_res_(false)
 | 
			
		||||
{}
 | 
			
		||||
 | 
			
		||||
ObDtlRpcChannel::ObDtlRpcChannel(
 | 
			
		||||
@ -202,7 +186,7 @@ ObDtlRpcChannel::ObDtlRpcChannel(
 | 
			
		||||
    const ObAddr &peer,
 | 
			
		||||
    const int64_t hash_val,
 | 
			
		||||
    DtlChannelType type)
 | 
			
		||||
    : ObDtlBasicChannel(tenant_id, id, peer, hash_val, type), recv_sqc_fin_res_(false), cb_ret_(OB_SUCCESS)
 | 
			
		||||
    : ObDtlBasicChannel(tenant_id, id, peer, hash_val, type), recv_sqc_fin_res_(false)
 | 
			
		||||
{}
 | 
			
		||||
 | 
			
		||||
ObDtlRpcChannel::~ObDtlRpcChannel()
 | 
			
		||||
@ -330,7 +314,7 @@ int ObDtlRpcChannel::send_message(ObDtlLinkedBuffer *&buf)
 | 
			
		||||
    // The peer may not setup when the first message arrive,
 | 
			
		||||
    // we wait first message return and retry until peer setup.
 | 
			
		||||
    int64_t timeout_us = buf->timeout_ts() - ObTimeUtility::current_time();
 | 
			
		||||
    SendMsgCB cb(msg_response_, *cur_trace_id, buf->timeout_ts(), get_id());
 | 
			
		||||
    SendMsgCB cb(msg_response_, *cur_trace_id, buf->timeout_ts());
 | 
			
		||||
    if (timeout_us <= 0) {
 | 
			
		||||
      ret = OB_TIMEOUT;
 | 
			
		||||
      LOG_WARN("send dtl message timeout", K(ret), K(peer_),
 | 
			
		||||
@ -366,18 +350,6 @@ int ObDtlRpcChannel::send_message(ObDtlLinkedBuffer *&buf)
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObDtlRpcChannel::send(const ObDtlMsg &msg, int64_t timeout_ts,
 | 
			
		||||
    ObEvalCtx *eval_ctx, bool is_eof)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  if (msg.is_data_msg() && OB_FAIL(cb_ret_)) {
 | 
			
		||||
    LOG_WARN("ObDtlRpcChannel callback failed", K(cb_ret_));
 | 
			
		||||
  } else if (OB_FAIL(ObDtlBasicChannel::send(msg, timeout_ts, eval_ctx, is_eof))) {
 | 
			
		||||
    LOG_WARN("ObDtlBasicChannel send failed", K(ret));
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
}  // dtl
 | 
			
		||||
}  // sql
 | 
			
		||||
}  // oceanbase
 | 
			
		||||
 | 
			
		||||
@ -48,11 +48,9 @@ class ObDtlRpcChannel
 | 
			
		||||
  class SendMsgCB : public obrpc::ObDtlRpcProxy::AsyncCB<obrpc::OB_DTL_SEND>
 | 
			
		||||
  {
 | 
			
		||||
  public:
 | 
			
		||||
    explicit SendMsgCB(SendMsgResponse &response, const common::ObCurTraceId::TraceId trace_id,
 | 
			
		||||
                       const int64_t timeout_ts, const uint64_t channel_id)
 | 
			
		||||
    explicit SendMsgCB(SendMsgResponse &response, const common::ObCurTraceId::TraceId trace_id, const int64_t timeout_ts)
 | 
			
		||||
        : response_(response),
 | 
			
		||||
          timeout_ts_(timeout_ts),
 | 
			
		||||
          channel_id_(channel_id)
 | 
			
		||||
          timeout_ts_(timeout_ts)
 | 
			
		||||
    {
 | 
			
		||||
      trace_id_.set(trace_id);
 | 
			
		||||
    }
 | 
			
		||||
@ -65,7 +63,6 @@ class ObDtlRpcChannel
 | 
			
		||||
    SendMsgResponse &response_;
 | 
			
		||||
    common::ObCurTraceId::TraceId trace_id_;
 | 
			
		||||
    int64_t timeout_ts_;
 | 
			
		||||
    uint64_t channel_id_;
 | 
			
		||||
  };
 | 
			
		||||
 | 
			
		||||
  class SendBCMsgCB : public obrpc::ObDtlRpcProxy::AsyncCB<obrpc::OB_DTL_BC_SEND>
 | 
			
		||||
@ -102,14 +99,9 @@ public:
 | 
			
		||||
  virtual int feedup(ObDtlLinkedBuffer *&buffer) override;
 | 
			
		||||
  virtual int send_message(ObDtlLinkedBuffer *&buf);
 | 
			
		||||
 | 
			
		||||
  virtual int send(const ObDtlMsg &msg, int64_t timeout_ts,
 | 
			
		||||
      ObEvalCtx *eval_ctx = nullptr, bool is_eof = false) override;
 | 
			
		||||
 | 
			
		||||
  bool recv_sqc_fin_res() { return recv_sqc_fin_res_; }
 | 
			
		||||
  void set_cb_ret(int cb_ret) { cb_ret_ = cb_ret; }
 | 
			
		||||
private:
 | 
			
		||||
  bool recv_sqc_fin_res_;
 | 
			
		||||
  int cb_ret_;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
}  // dtl
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user