diff --git a/src/logservice/cdcservice/ob_cdc_fetcher.cpp b/src/logservice/cdcservice/ob_cdc_fetcher.cpp index 30b2b1d3cf..e30556c27f 100644 --- a/src/logservice/cdcservice/ob_cdc_fetcher.cpp +++ b/src/logservice/cdcservice/ob_cdc_fetcher.cpp @@ -27,6 +27,7 @@ using namespace oceanbase::palf; namespace cdc { +ERRSIM_POINT_DEF(ERRSIM_FETCH_LOG_RESP_ERROR); ObCdcFetcher::ObCdcFetcher() : is_inited_(false), @@ -124,8 +125,9 @@ int ObCdcFetcher::fetch_log(const ObCdcLSFetchLogReq &req, } if (OB_NOT_NULL(ls_ctx)) { - if (OB_FAIL(host_->revert_client_ls_ctx(ls_ctx))) { - LOG_WARN("failed to revert client ls ctx", K(req)); + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(host_->revert_client_ls_ctx(ls_ctx))) { + LOG_WARN_RET(tmp_ret, "failed to revert client ls ctx", K(req)); } else { ls_ctx = nullptr; } @@ -326,6 +328,10 @@ int ObCdcFetcher::do_fetch_log_(const ObCdcLSFetchLogReq &req, if (OB_FAIL(ret)) { LOG_WARN("fetch log fail", KR(ret), "CDC_Connector_PID", req.get_client_pid(), K(req), K(resp)); + } else if (ERRSIM_FETCH_LOG_RESP_ERROR) { + ret = ERRSIM_FETCH_LOG_RESP_ERROR; + LOG_WARN("ERRSIM fetch log fail", KR(ret), "CDC_Connector_PID", req.get_client_pid(), + K(req), K(resp)); } LOG_TRACE("do_fetch_log done", KR(ret), K(frt), K(req), K(resp)); @@ -1211,6 +1217,13 @@ int ObCdcFetcher::do_fetch_raw_log_(const obrpc::ObCdcFetchRawLogReq &req, K(need_retry), K(fetch_log_succ)); } + if (OB_FAIL(ret)) { + LOG_WARN("fetch raw log fail", K(req), K(resp)); + } else if (ERRSIM_FETCH_LOG_RESP_ERROR) { + ret = ERRSIM_FETCH_LOG_RESP_ERROR; + LOG_WARN("ERRSIM fetch raw log fail", K(req), K(resp)); + } + return ret; } diff --git a/src/logservice/logfetcher/ob_log_fetch_log_rpc.cpp b/src/logservice/logfetcher/ob_log_fetch_log_rpc.cpp index ac8caafaf6..395acce96d 100644 --- a/src/logservice/logfetcher/ob_log_fetch_log_rpc.cpp +++ b/src/logservice/logfetcher/ob_log_fetch_log_rpc.cpp @@ -549,7 +549,6 @@ void FetchLogARpc::stop() } ERRSIM_POINT_DEF(ERRSIM_FETCH_LOG_TIME_OUT); -ERRSIM_POINT_DEF(ERRSIM_FETCH_LOG_RESP_ERROR); int FetchLogARpc::next_result(FetchLogARpcResult *&result, bool &rpc_is_flying) { int ret = OB_SUCCESS; @@ -587,7 +586,7 @@ int FetchLogARpc::next_result(FetchLogARpcResult *&result, bool &rpc_is_flying) } } } - if (OB_SUCC(ret) && (ERRSIM_FETCH_LOG_TIME_OUT || ERRSIM_FETCH_LOG_RESP_ERROR)) { + if (OB_SUCC(ret) && (ERRSIM_FETCH_LOG_TIME_OUT)) { process_errsim_code_(result); LOG_TRACE("process errsim code"); } @@ -902,11 +901,6 @@ void FetchLogARpc::process_errsim_code_(FetchLogARpcResult *result) result->trace_id_ = *ObCurTraceId::get_trace_id(); LOG_TRACE("errsim fetch log time out", K(result)); } - if (ERRSIM_FETCH_LOG_RESP_ERROR) { - result->resp_.set_err(ERRSIM_FETCH_LOG_RESP_ERROR); - result->trace_id_ = *ObCurTraceId::get_trace_id(); - LOG_TRACE("errsim fetch log resp error", K(result)); - } } int FetchLogARpc::launch_async_rpc_(RpcRequest &rpc_req,