Reset PX DataHub whole msg in rescan scenario
This commit is contained in:
@ -298,6 +298,8 @@ int ObRDWFPieceMsgCtx::send_whole_msg(common::ObIArray<ObPxSqcMeta *> &sqcs)
|
||||
void ObRDWFPieceMsgCtx::reset_resource()
|
||||
{
|
||||
received_ = 0;
|
||||
infos_.reset();
|
||||
arena_alloc_.reset();
|
||||
}
|
||||
|
||||
int ObRDWFWholeMsg::assign(const ObRDWFWholeMsg &msg)
|
||||
|
||||
@ -25,20 +25,26 @@ namespace sql
|
||||
class ObPxDatahubDataProvider
|
||||
{
|
||||
public:
|
||||
ObPxDatahubDataProvider()
|
||||
: op_id_(-1), msg_type_(dtl::TESTING), send_msg_cnt_(0), msg_set_(false)
|
||||
{
|
||||
}
|
||||
virtual int get_msg_nonblock(const dtl::ObDtlMsg *&msg, int64_t timeout_ts) = 0;
|
||||
virtual void reset() {}
|
||||
virtual void reset() { msg_set_ = false; }
|
||||
TO_STRING_KV(K_(op_id), K_(msg_type));
|
||||
uint64_t op_id_; // 注册本 provider 的算子 id,用于 provder 数组里寻址对应 provider
|
||||
dtl::ObDtlMsgType msg_type_;
|
||||
volatile int64_t send_msg_cnt_;
|
||||
bool msg_set_;
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
class ObWholeMsgProvider : public ObPxDatahubDataProvider
|
||||
{
|
||||
public:
|
||||
ObWholeMsgProvider() : msg_set_(false) {}
|
||||
ObWholeMsgProvider() {}
|
||||
virtual ~ObWholeMsgProvider() = default;
|
||||
virtual void reset() override { msg_.reset(); msg_set_ = false; }
|
||||
virtual void reset() override { msg_.reset(); ObPxDatahubDataProvider::reset(); }
|
||||
int get_msg_nonblock(const dtl::ObDtlMsg *&msg, int64_t timeout_ts)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -83,7 +89,6 @@ private:
|
||||
return ret;
|
||||
}
|
||||
private:
|
||||
bool msg_set_;
|
||||
T msg_;
|
||||
common::ObThreadCond msg_ready_cond_;
|
||||
};
|
||||
|
||||
@ -217,7 +217,6 @@ int ObPxSQCProxy::get_dh_msg(
|
||||
} else {
|
||||
if (send_piece) {
|
||||
ObLockGuard<ObSpinLock> lock_guard(dtl_lock_);
|
||||
// TODO: LOCK sqc channel
|
||||
dtl::ObDtlChannel *ch = sqc_arg_.sqc_.get_sqc_channel();
|
||||
if (OB_ISNULL(ch)) {
|
||||
ret = common::OB_ERR_UNEXPECTED;
|
||||
@ -226,6 +225,19 @@ int ObPxSQCProxy::get_dh_msg(
|
||||
SQL_LOG(WARN, "fail push data to channel", K(ret));
|
||||
} else if (OB_FAIL(ch->flush())) {
|
||||
SQL_LOG(WARN, "fail flush dtl data", K(ret));
|
||||
} else {
|
||||
// The whole message should be reset in next rescan, we reset it after last piece msg
|
||||
// send in sending piece msg and receiving whole msg scenario (need send && wait whole msg).
|
||||
if (need_wait_whole_msg) {
|
||||
const int64_t task_cnt = get_task_count();
|
||||
if (provider->send_msg_cnt_ % task_cnt == 0) {
|
||||
provider->msg_set_ = false;
|
||||
}
|
||||
provider->send_msg_cnt_ += 1;
|
||||
if (provider->send_msg_cnt_ % task_cnt == 0) {
|
||||
provider->reset(); // reset whole message
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user