fix get_dh_msg state errors during reentry that lead to hang/core
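Split ObPxSQCProxy::get_dh_msg into a synchronized variant (get_dh_msg_sync) and the original non-synchronized one, key piece-message contexts by (op_id, msg_type) instead of op_id alone, and add an ObPxSQCProxy::sync_wait_all barrier that clears provider state (msg_set_, the cached whole message) exactly once per rescan round, so that re-entering the datahub path after a rescan no longer observes stale state that can hang the query or core the server.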
@@ -482,7 +482,7 @@ int ObMergeGroupByOp::process_parallel_rollup_key(ObRollupNDVInfo &ndv_info)
piece.source_dfo_id_ = proxy.get_dfo_id();
piece.target_dfo_id_ = proxy.get_dfo_id();
piece.rollup_ndv_ = ndv_info;
if (OB_FAIL(proxy.get_dh_msg(MY_SPEC.id_,
if (OB_FAIL(proxy.get_dh_msg_sync(MY_SPEC.id_,
dtl::DH_ROLLUP_KEY_WHOLE_MSG,
piece,
temp_whole_msg,
@@ -324,11 +324,11 @@ int ObPDMLOpDataDriver::barrier(ObExecContext &ctx)
piece.op_id_ = op_id_;
piece.thread_id_ = GETTID();
const ObBarrierWholeMsg *whole = nullptr;
if (OB_FAIL(proxy.get_dh_msg(op_id_,
dtl::DH_BARRIER_WHOLE_MSG,
piece,
whole,
ctx.get_physical_plan_ctx()->get_timeout_timestamp()))) {
if (OB_FAIL(proxy.get_dh_msg_sync(op_id_,
dtl::DH_BARRIER_WHOLE_MSG,
piece,
whole,
ctx.get_physical_plan_ctx()->get_timeout_timestamp()))) {
LOG_WARN("fail get barrier msg", K(ret));
}
}
@@ -298,6 +298,8 @@ int ObRDWFPieceMsgCtx::send_whole_msg(common::ObIArray<ObPxSqcMeta *> &sqcs)
void ObRDWFPieceMsgCtx::reset_resource()
{
received_ = 0;
infos_.reset();
arena_alloc_.reset();
}

int ObRDWFWholeMsg::assign(const ObRDWFWholeMsg &msg)
@@ -49,12 +49,13 @@ public:
}
}
ctxs_.reset();
types_.reset();
}
int find_piece_ctx(uint64_t op_id, ObPieceMsgCtx *&ctx)
int find_piece_ctx(uint64_t op_id, dtl::ObDtlMsgType type, ObPieceMsgCtx *&ctx)
{
int ret = common::OB_ENTRY_NOT_EXIST;
for (int i = 0; i < ctxs_.count(); ++i) {
if (ctxs_.at(i)->op_id_ == op_id) {
if (ctxs_.at(i)->op_id_ == op_id && types_.at(i) == type) {
ret = common::OB_SUCCESS;
ctx = ctxs_.at(i);
break;
@@ -62,12 +63,17 @@ public:
}
return ret;
}
int add_piece_ctx(ObPieceMsgCtx *ctx)
int add_piece_ctx(ObPieceMsgCtx *ctx, dtl::ObDtlMsgType type)
{
return ctxs_.push_back(ctx);
int ret = OB_SUCCESS;
if (OB_FAIL(ctxs_.push_back(ctx))) {
} else if (OB_FAIL(types_.push_back(type))) {
}
return ret;
}
private:
common::ObSEArray<ObPieceMsgCtx *, 2> ctxs_;
common::ObSEArray<dtl::ObDtlMsgType, 2> types_;
};

}
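The manager above now keys each piece context by both op_id and DTL message type, presumably because one operator can own more than one datahub message stream under the same op_id (e.g. the window function operator further down exchanges both DH_RANGE_DIST_WF_PIECE_MSG and DH_SECOND_STAGE_REPORTING_WF_WHOLE_MSG under MY_SPEC.id_). A minimal standalone sketch of the same parallel-array pattern, using std:: containers and placeholder types (MsgType, PieceCtx are illustrative, not the OceanBase classes):

```cpp
#include <cstdint>
#include <iostream>
#include <vector>

// Stand-ins for dtl::ObDtlMsgType and ObPieceMsgCtx.
enum class MsgType { BARRIER_WHOLE, ROLLUP_KEY_WHOLE };
struct PieceCtx { uint64_t op_id; };

class PieceCtxMgr {
public:
  // Success only when both the operator id and the message type match,
  // mirroring find_piece_ctx(op_id, type, ctx) above.
  PieceCtx *find(uint64_t op_id, MsgType type) {
    for (size_t i = 0; i < ctxs_.size(); ++i) {
      if (ctxs_[i]->op_id == op_id && types_[i] == type) {
        return ctxs_[i];
      }
    }
    return nullptr;
  }
  // The two arrays are kept in lockstep, as in add_piece_ctx(ctx, type).
  void add(PieceCtx *ctx, MsgType type) {
    ctxs_.push_back(ctx);
    types_.push_back(type);
  }
private:
  std::vector<PieceCtx *> ctxs_;
  std::vector<MsgType> types_;
};

int main() {
  PieceCtxMgr mgr;
  PieceCtx a{42}, b{42};
  mgr.add(&a, MsgType::BARRIER_WHOLE);
  mgr.add(&b, MsgType::ROLLUP_KEY_WHOLE);
  // With op_id alone both lookups would return &a; the type disambiguates.
  std::cout << (mgr.find(42, MsgType::ROLLUP_KEY_WHOLE) == &b) << "\n";  // prints 1
  return 0;
}
```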
@@ -25,20 +25,27 @@ namespace sql
class ObPxDatahubDataProvider
{
public:
ObPxDatahubDataProvider()
: op_id_(-1), msg_type_(dtl::TESTING), msg_set_(false), dh_msg_cnt_(0), rescan_cnt_(0)
{
}
virtual int get_msg_nonblock(const dtl::ObDtlMsg *&msg, int64_t timeout_ts) = 0;
virtual void reset() {}
TO_STRING_KV(K_(op_id), K_(msg_type));
uint64_t op_id_; // id of the operator that registered this provider, used to locate it in the provider array
dtl::ObDtlMsgType msg_type_;
bool msg_set_;
volatile int64_t dh_msg_cnt_;
volatile int64_t rescan_cnt_;
};

template <typename T>
class ObWholeMsgProvider : public ObPxDatahubDataProvider
{
public:
ObWholeMsgProvider() : msg_set_(false) {}
ObWholeMsgProvider() {}
virtual ~ObWholeMsgProvider() = default;
virtual void reset() override { msg_.reset(); msg_set_ = false; }
virtual void reset() override { msg_.reset();}
int get_msg_nonblock(const dtl::ObDtlMsg *&msg, int64_t timeout_ts)
{
int ret = OB_SUCCESS;
@@ -83,7 +90,6 @@ private:
return ret;
}
private:
bool msg_set_;
T msg_;
common::ObThreadCond msg_ready_cond_;
};
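It appears msg_set_ is hoisted from ObWholeMsgProvider into the ObPxDatahubDataProvider base class, alongside the new dh_msg_cnt_ and rescan_cnt_ counters, so that ObPxSQCProxy::sync_wait_all below can clear a provider's whole-message state generically without knowing the concrete message type T.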
@@ -1025,7 +1025,7 @@ int ObPxTransmitOp::do_datahub_dynamic_sample(int64_t op_id, ObDynamicSamplePiec
bool send_piece = true;
if (OB_FAIL(proxy.make_sqc_sample_piece_msg(piece_msg, send_piece))) {
LOG_WARN("fail to make sqc sample piece msg", K(ret));
} else if (OB_FAIL(proxy.get_dh_msg(op_id,
} else if (OB_FAIL(proxy.get_dh_msg_sync(op_id,
DH_DYNAMIC_SAMPLE_WHOLE_MSG,
proxy.get_piece_sample_msg(),
temp_whole_msg,
@@ -76,7 +76,7 @@ public:
} else if (OB_ISNULL(source_dfo) || OB_ISNULL(target_dfo)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("NULL ptr or null session ptr", KP(source_dfo), KP(target_dfo), K(pkt), K(ret));
} else if (OB_FAIL(coord_info.piece_msg_ctx_mgr_.find_piece_ctx(pkt.op_id_, piece_ctx))) {
} else if (OB_FAIL(coord_info.piece_msg_ctx_mgr_.find_piece_ctx(pkt.op_id_, pkt.type(), piece_ctx))) {
// if no ctx is found, create one
// NOTE: creating a new piece_ctx here causes no concurrency problem,
// because the QC runs a single-threaded message loop and processes messages from SQCs one by one
@@ -86,7 +86,7 @@ public:
source_dfo->get_total_task_count(), piece_ctx))) {
LOG_WARN("fail to alloc piece msg", K(ret));
} else if (nullptr != piece_ctx) {
if (OB_FAIL(coord_info.piece_msg_ctx_mgr_.add_piece_ctx(piece_ctx))) {
if (OB_FAIL(coord_info.piece_msg_ctx_mgr_.add_piece_ctx(piece_ctx, pkt.type()))) {
LOG_WARN("fail add barrier piece ctx", K(ret));
}
}
@@ -25,6 +25,8 @@ using namespace oceanbase::common;
using namespace oceanbase::sql;
using namespace oceanbase::sql::dtl;

#define BREAK_TASK_CNT(a) ((a) + 1)

int ObBloomFilterSendCtx::generate_filter_indexes(
int64_t each_group_size,
int64_t channel_count)
@@ -638,3 +640,32 @@ int64_t ObPxSQCProxy::get_query_timeout_ts()
}

int64_t ObPxSQCProxy::get_task_count() const { return sqc_ctx_.get_task_count(); }

int ObPxSQCProxy::sync_wait_all(ObPxDatahubDataProvider &provider)
{
int ret = OB_SUCCESS;
const int64_t task_cnt = get_task_count();
const int64_t idx = ATOMIC_AAF(&provider.dh_msg_cnt_, 1);
const int64_t curr_rescan_cnt = provider.rescan_cnt_ + 1;
int64_t loop_cnt = 0;
// The whole message should be reset on the next rescan; we reset it after the last piece msg
// first, sync-wait until all piece threads are in the loop
do {
++loop_cnt;
if (task_cnt == idx % (BREAK_TASK_CNT(task_cnt))) { // last thread
provider.msg_set_ = false;
provider.reset(); // reset whole message
ATOMIC_AAF(&provider.rescan_cnt_, 1);
ATOMIC_AAF(&provider.dh_msg_cnt_, 1); // to break the loop
} else {
ob_usleep(1000);
if (0 == loop_cnt % 64) {
if (OB_FAIL(THIS_WORKER.check_status())) {
LOG_WARN("failed to sync wait", K(ret), K(task_cnt), K(provider.dh_msg_cnt_));
}
}
}
} while (OB_SUCC(ret) && provider.dh_msg_cnt_ < BREAK_TASK_CNT(task_cnt) * curr_rescan_cnt);

return ret;
}
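sync_wait_all is a counting barrier over dh_msg_cnt_: each of the task_cnt worker threads adds one, and the last arrival clears the provider's stale whole-message state, bumps rescan_cnt_, and contributes the extra (task_cnt + 1)-th increment that releases everyone, so each rescan round consumes exactly BREAK_TASK_CNT(task_cnt) increments. A minimal standalone sketch of that counting scheme with std::atomic and std::thread (the names and the Provider type are illustrative, not the OceanBase API; the round counter is snapshotted before joining the round to keep the toy version self-contained):

```cpp
#include <atomic>
#include <chrono>
#include <cstdint>
#include <iostream>
#include <thread>
#include <vector>

// Counting barrier in the style of ObPxSQCProxy::sync_wait_all:
// a round is released once the counter reaches (task_cnt + 1) * round.
struct Provider {
  std::atomic<int64_t> dh_msg_cnt{0};
  std::atomic<int64_t> rescan_cnt{0};
  std::atomic<bool> msg_set{true};
};

void sync_wait_all(Provider &p, int64_t task_cnt) {
  const int64_t break_cnt = task_cnt + 1;             // BREAK_TASK_CNT(task_cnt)
  const int64_t curr_rescan = p.rescan_cnt.load() + 1; // snapshot the round before joining it
  const int64_t idx = p.dh_msg_cnt.fetch_add(1) + 1;   // ATOMIC_AAF(&dh_msg_cnt_, 1)
  do {
    if (task_cnt == idx % break_cnt) {                 // last thread of this round
      p.msg_set = false;                               // clear stale whole-msg state
      p.rescan_cnt.fetch_add(1);
      p.dh_msg_cnt.fetch_add(1);                       // the extra increment breaks the loop
    } else {
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
  } while (p.dh_msg_cnt.load() < break_cnt * curr_rescan);
}

int main() {
  Provider p;
  const int64_t task_cnt = 4;
  std::vector<std::thread> workers;
  for (int64_t i = 0; i < task_cnt; ++i) {
    workers.emplace_back([&] { sync_wait_all(p, task_cnt); });
  }
  for (auto &t : workers) { t.join(); }
  // After one round: dh_msg_cnt == task_cnt + 1, rescan_cnt == 1, msg_set cleared.
  std::cout << p.dh_msg_cnt << " " << p.rescan_cnt << " " << p.msg_set << "\n";
  return 0;
}
```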
@@ -126,10 +126,8 @@ public:
int64_t timeout_ts,
bool is_transmit);

// for peek datahub whole msg
template <class PieceMsg, class WholeMsg>
int get_dh_msg(
uint64_t op_id,
int get_dh_msg_sync(uint64_t op_id,
dtl::ObDtlMsgType msg_type,
const PieceMsg &piece,
const WholeMsg *&whole,
@@ -137,6 +135,16 @@ public:
bool send_piece = true,
bool need_wait_whole_msg = true);

template <class PieceMsg, class WholeMsg>
int get_dh_msg(uint64_t op_id,
dtl::ObDtlMsgType msg_type,
const PieceMsg &piece,
const WholeMsg *&whole,
int64_t timeout_ts,
bool send_piece = true,
bool need_wait_whole_msg = true);

// used by workers to report execution results
int report_task_finish_status(int64_t task_idx, int rc);
@@ -168,6 +176,9 @@ public:
int append_bf_send_ctx(int64_t &bf_send_ctx_idx);
int64_t get_task_count() const;
common::ObThreadCond &get_msg_ready_cond() { return msg_ready_cond_; }
int64_t get_dh_msg_cnt() const;
void atomic_inc_dh_msg_cnt();
int64_t atomic_add_and_fetch_dh_msg_cnt();
private:
/* functions */
int setup_loop_proc(ObSqcCtx &sqc_ctx);
@@ -180,6 +191,18 @@ private:
int get_whole_msg_provider(uint64_t op_id, dtl::ObDtlMsgType msg_type, ObPxDatahubDataProvider *&provider);
int64_t get_process_query_time();
int64_t get_query_timeout_ts();
int sync_wait_all(ObPxDatahubDataProvider &provider);
// for peek datahub whole msg
template <class PieceMsg, class WholeMsg>
int inner_get_dh_msg(
uint64_t op_id,
dtl::ObDtlMsgType msg_type,
const PieceMsg &piece,
const WholeMsg *&whole,
int64_t timeout_ts,
bool need_sync,
bool send_piece,
bool need_wait_whole_msg);
/* variables */
public:
ObSqcCtx &sqc_ctx_;
@@ -201,12 +224,41 @@ private:

template <class PieceMsg, class WholeMsg>
int ObPxSQCProxy::get_dh_msg(
int ObPxSQCProxy::get_dh_msg_sync(uint64_t op_id,
dtl::ObDtlMsgType msg_type,
const PieceMsg &piece,
const WholeMsg *&whole,
int64_t timeout_ts,
bool send_piece,
bool need_wait_whole_msg)
{
return inner_get_dh_msg(op_id, msg_type, piece, whole,
timeout_ts, true, send_piece,
need_wait_whole_msg);
}

template <class PieceMsg, class WholeMsg>
int ObPxSQCProxy::get_dh_msg(uint64_t op_id,
dtl::ObDtlMsgType msg_type,
const PieceMsg &piece,
const WholeMsg *&whole,
int64_t timeout_ts,
bool send_piece,
bool need_wait_whole_msg)
{
return inner_get_dh_msg(op_id, msg_type, piece, whole,
timeout_ts, false, send_piece,
need_wait_whole_msg);
}
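Both public entry points are thin wrappers over inner_get_dh_msg; the only difference is the hard-coded need_sync argument (true for get_dh_msg_sync, false for get_dh_msg), which decides whether sync_wait_all runs before the piece message is sent.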

template <class PieceMsg, class WholeMsg>
int ObPxSQCProxy::inner_get_dh_msg(
uint64_t op_id,
dtl::ObDtlMsgType msg_type,
const PieceMsg &piece,
const WholeMsg *&whole,
int64_t timeout_ts,
bool need_sync,
bool send_piece /*= true*/,
bool need_wait_whole_msg /*= true*/)
{
@@ -214,6 +266,8 @@ int ObPxSQCProxy::get_dh_msg(
ObPxDatahubDataProvider *provider = nullptr;
if (OB_FAIL(get_whole_msg_provider(op_id, msg_type, provider))) {
SQL_LOG(WARN, "fail get provider", K(ret));
} else if (need_sync && OB_FAIL(sync_wait_all(*provider))) {
SQL_LOG(WARN, "failed to sync wait", K(ret));
} else {
if (send_piece) {
ObLockGuard<ObSpinLock> lock_guard(dtl_lock_);
@@ -238,7 +292,7 @@ int ObPxSQCProxy::get_dh_msg(
ret = process_dtl_msg(timeout_ts);
SQL_LOG(DEBUG, "process dtl msg done", K(ret));
}
if (OB_SUCC(ret)) {
if (OB_EAGAIN == ret || OB_SUCCESS == ret) {
const dtl::ObDtlMsg *msg = nullptr;
if (OB_FAIL(p->get_msg_nonblock(msg, timeout_ts))) {
SQL_LOG(TRACE, "fail get msg", K(timeout_ts), K(ret));
@@ -36,7 +36,7 @@ ObSqcCtx::ObSqcCtx(ObPxRpcInitSqcArgs &sqc_arg) : msg_loop_(),
interrupted_(false),
bf_ch_provider_(sqc_proxy_.get_msg_ready_cond()),
px_bloom_filter_msg_proc_(msg_proc_),
opt_stats_gather_whole_msg_proc_(msg_proc_) {}
opt_stats_gather_whole_msg_proc_(msg_proc_){}

int ObSqcCtx::add_whole_msg_provider(uint64_t op_id, dtl::ObDtlMsgType msg_type, ObPxDatahubDataProvider &provider)
{
@@ -2368,7 +2368,7 @@ int ObWindowFunctionOp::rd_fetch_patch()

const ObRDWFWholeMsg *whole_msg = NULL;
if (OB_FAIL(ret)) {
} else if (OB_FAIL(handler->get_sqc_proxy().get_dh_msg(
} else if (OB_FAIL(handler->get_sqc_proxy().get_dh_msg_sync(
MY_SPEC.id_, dtl::DH_RANGE_DIST_WF_PIECE_MSG, piece_msg, whole_msg,
ctx_.get_physical_plan_ctx()->get_timeout_timestamp()))) {
LOG_WARN("get range distribute window function msg failed", K(ret));
@@ -3169,7 +3169,7 @@ int ObWindowFunctionOp::get_participator_whole_msg(
piece.target_dfo_id_ = proxy.get_dfo_id();
piece.pby_hash_value_array_ = pby_hash_value_array;
if (OB_SUCC(ret)) {
if (OB_FAIL(proxy.get_dh_msg(MY_SPEC.id_, dtl::DH_SECOND_STAGE_REPORTING_WF_WHOLE_MSG,
if (OB_FAIL(proxy.get_dh_msg_sync(MY_SPEC.id_, dtl::DH_SECOND_STAGE_REPORTING_WF_WHOLE_MSG,
piece, temp_whole_msg, ctx_.get_physical_plan_ctx()->get_timeout_timestamp()))) {
LOG_WARN("fail to get reporting wf whole msg", K(ret), K(piece), KPC(temp_whole_msg));
} else if (OB_ISNULL(temp_whole_msg)) {