Replacing ObArray with ObSEArray in ObIDetectCallback

This commit is contained in:
obdev
2023-06-28 09:18:00 +00:00
committed by ob-robot
parent 09dd119b6e
commit ffa0b12f34
6 changed files with 25 additions and 24 deletions

View File

@ -21,7 +21,7 @@ namespace common {
const int64_t DM_INTERRUPT_MSG_MAX_LENGTH = 128;
ObIDetectCallback::ObIDetectCallback(uint64_t tenant_id, const ObArray<ObPeerTaskState> &peer_states)
ObIDetectCallback::ObIDetectCallback(uint64_t tenant_id, const ObIArray<ObPeerTaskState> &peer_states)
: ref_count_(0)
{
int ret = OB_SUCCESS;
@ -173,9 +173,9 @@ int ObDmInterruptQcCall::mock_sqc_finish_msg(sql::ObPxSqcMeta &sqc)
}
ObQcDetectCB::ObQcDetectCB(uint64_t tenant_id,
const ObArray<ObPeerTaskState> &peer_states,
const ObIArray<ObPeerTaskState> &peer_states,
const ObInterruptibleTaskID &tid, sql::ObDfo &dfo,
const ObArray<sql::dtl::ObDtlChannel *> &dtl_channels)
const ObIArray<sql::dtl::ObDtlChannel *> &dtl_channels)
: ObIDetectCallback(tenant_id, peer_states), tid_(tid), dfo_(dfo)
{
// if ObIDetectCallback constructed succ

View File

@ -17,6 +17,7 @@
#include "share/detect/ob_detectable_id.h"
#include "sql/engine/px/ob_dfo.h"
#include "sql/engine/px/p2p_datahub/ob_p2p_dh_share_info.h"
#include "lib/container/ob_se_array.h"
namespace oceanbase {
@ -70,7 +71,7 @@ class ObIDetectCallback
{
public:
// constructor for pass peer_states from derived class
explicit ObIDetectCallback(uint64_t tenant_id, const ObArray<ObPeerTaskState> &peer_states);
explicit ObIDetectCallback(uint64_t tenant_id, const ObIArray<ObPeerTaskState> &peer_states);
virtual void destroy()
{
peer_states_.reset();
@ -79,7 +80,7 @@ public:
virtual int64_t get_detect_callback_type() const = 0;
virtual bool reentrant() const { return false; }
ObArray<ObPeerTaskState> &get_peer_states() { return peer_states_; }
ObIArray<ObPeerTaskState> &get_peer_states() { return peer_states_; }
// set peer state to finished and get the old state
virtual int atomic_set_finished(const common::ObAddr &addr, ObTaskState *state=nullptr);
@ -99,7 +100,7 @@ public:
inline int64_t to_string(char *buf, const int64_t len) const { return 0; }
private:
int64_t ref_count_;
ObArray<ObPeerTaskState> peer_states_;
ObSEArray<ObPeerTaskState, 8, common::ModulePageAllocator, true> peer_states_;
protected:
common::ObAddr from_svr_addr_; // in which server the task is detected as finished
common::ObCurTraceId::TraceId trace_id_;
@ -109,8 +110,8 @@ protected:
class ObQcDetectCB : public ObIDetectCallback
{
public:
ObQcDetectCB(uint64_t tenant_id, const ObArray<ObPeerTaskState> &peer_states, const ObInterruptibleTaskID &tid, sql::ObDfo &dfo,
const ObArray<sql::dtl::ObDtlChannel *> &dtl_channels);
ObQcDetectCB(uint64_t tenant_id, const ObIArray<ObPeerTaskState> &peer_states, const ObInterruptibleTaskID &tid, sql::ObDfo &dfo,
const ObIArray<sql::dtl::ObDtlChannel *> &dtl_channels);
void destroy() override;
int do_callback() override;
int64_t get_detect_callback_type() const override { return (int64_t)DetectCallBackType::QC_DETECT_CB; }
@ -121,13 +122,13 @@ private:
ObInterruptibleTaskID tid_;
sql::ObDfo &dfo_;
int64_t timeout_ts_;
ObArray<sql::dtl::ObDtlChannel *> dtl_channels_;
ObSEArray<sql::dtl::ObDtlChannel *, 8, common::ModulePageAllocator, true> dtl_channels_;
};
class ObSqcDetectCB : public ObIDetectCallback
{
public:
ObSqcDetectCB(uint64_t tenant_id, const ObArray<ObPeerTaskState> &peer_states, const ObInterruptibleTaskID &tid)
ObSqcDetectCB(uint64_t tenant_id, const ObIArray<ObPeerTaskState> &peer_states, const ObInterruptibleTaskID &tid)
: ObIDetectCallback(tenant_id, peer_states), tid_(tid) {}
int do_callback() override;
@ -139,7 +140,7 @@ private:
class ObSingleDfoDetectCB : public ObIDetectCallback
{
public:
ObSingleDfoDetectCB(uint64_t tenant_id, const ObArray<ObPeerTaskState> &peer_states, const sql::dtl::ObDTLIntermResultKey &key)
ObSingleDfoDetectCB(uint64_t tenant_id, const ObIArray<ObPeerTaskState> &peer_states, const sql::dtl::ObDTLIntermResultKey &key)
: ObIDetectCallback(tenant_id, peer_states), key_(key) {}
int do_callback() override;
@ -151,7 +152,7 @@ private:
class ObTempTableDetectCB : public ObIDetectCallback
{
public:
ObTempTableDetectCB(uint64_t tenant_id, const ObArray<ObPeerTaskState> &peer_states, const sql::dtl::ObDTLIntermResultKey &key)
ObTempTableDetectCB(uint64_t tenant_id, const ObIArray<ObPeerTaskState> &peer_states, const sql::dtl::ObDTLIntermResultKey &key)
: ObIDetectCallback(tenant_id, peer_states), key_(key) {}
int do_callback() override;
@ -163,7 +164,7 @@ private:
class ObP2PDataHubDetectCB : public ObIDetectCallback
{
public:
ObP2PDataHubDetectCB(uint64_t tenant_id, const ObArray<ObPeerTaskState> &peer_states, const sql::ObP2PDhKey &key)
ObP2PDataHubDetectCB(uint64_t tenant_id, const ObIArray<ObPeerTaskState> &peer_states, const sql::ObP2PDhKey &key)
: ObIDetectCallback(tenant_id, peer_states), key_(key) {}
int do_callback() override;

View File

@ -531,7 +531,7 @@ void ObDetectManager::ObDetectReqGetCall::operator()(hash::HashMapPair<ObDetecta
while (OB_NOT_NULL(cb_node)) {
if (!cb_node->is_executed()) {
ObArray<ObPeerTaskState> &peer_states = cb_node->cb_->get_peer_states();
ObIArray<ObPeerTaskState> &peer_states = cb_node->cb_->get_peer_states();
ARRAY_FOREACH_NORET(peer_states, idx) {
ObPeerTaskState &peer_state = peer_states.at(idx);
// only detect running tasks

View File

@ -79,8 +79,8 @@ void ObDetectManagerUtils::qc_unregister_detectable_id_from_dm(const ObDetectabl
}
int ObDetectManagerUtils::qc_register_check_item_into_dm(ObDfo &dfo,
const ObArray<ObPeerTaskState> &peer_states,
const ObArray<ObDtlChannel *> &dtl_channels)
const ObIArray<ObPeerTaskState> &peer_states,
const ObIArray<ObDtlChannel *> &dtl_channels)
{
int ret = OB_SUCCESS;
ObDetectableId sqc_detectable_id = dfo.get_px_detectable_ids().sqc_detectable_id_;
@ -162,7 +162,7 @@ int ObDetectManagerUtils::sqc_register_into_dm(ObPxSqcHandler *sqc_handler, ObPx
} else {
// 2.register a check item for detecting qc,
// can be failed and not return error code, only drop the ability to detect qc
ObArray<ObPeerTaskState> peer_states;
ObSEArray<ObPeerTaskState, 1> peer_states;
if (OB_SUCCESS != peer_states.push_back(ObPeerTaskState(sqc.get_qc_addr()))) {
LIB_LOG(WARN, "[DM] failed to push_back", K(ret), K(sqc_detectable_id), K(qc_detectable_id));
} else {
@ -222,7 +222,7 @@ int ObDetectManagerUtils::single_dfo_register_check_item_into_dm(const common::O
if (register_dm_info.is_valid()) {
uint64_t node_sequence_id = 0;
common::ObSingleDfoDetectCB *cb = nullptr;
ObArray<ObPeerTaskState> peer_states;
ObSEArray<ObPeerTaskState, 1> peer_states;
if (OB_FAIL(peer_states.push_back(ObPeerTaskState{register_dm_info.addr_}))) {
LIB_LOG(WARN, "[DM] failed to push_back to peer_states", K(ret), K(register_dm_info), K(key));
}
@ -254,7 +254,7 @@ int ObDetectManagerUtils::temp_table_register_check_item_into_dm(const common::O
int ret = OB_SUCCESS;
uint64_t node_sequence_id = 0;
common::ObTempTableDetectCB *cb = nullptr;
ObArray<ObPeerTaskState> peer_states;
ObSEArray<ObPeerTaskState, 1> peer_states;
if (OB_FAIL(peer_states.push_back(ObPeerTaskState{qc_addr}))) {
LIB_LOG(WARN, "[DM] failed to push_back", K(ret), K(qc_detectable_id),
K(qc_addr), K(dtl_int_key));
@ -304,7 +304,7 @@ int ObDetectManagerUtils::p2p_datahub_register_check_item_into_dm(const common::
const common::ObDetectableId &qc_detectable_id = register_dm_info.detectable_id_;
const common::ObAddr &qc_addr = register_dm_info.addr_;
ObArray<ObPeerTaskState> peer_states;
ObSEArray<ObPeerTaskState, 1> peer_states;
if (OB_FAIL(peer_states.push_back(ObPeerTaskState{qc_addr}))) {
LIB_LOG(WARN, "[DM] failed to push_back", K(ret), K(qc_detectable_id),
K(qc_addr), K(p2p_key));

View File

@ -46,8 +46,8 @@ public:
static void qc_unregister_detectable_id_from_dm(const common::ObDetectableId &detectable_id,
bool &register_detectable_id);
static int qc_register_check_item_into_dm(sql::ObDfo &dfo,
const ObArray<common::ObPeerTaskState> &peer_states,
const ObArray<sql::dtl::ObDtlChannel *> &dtl_channels);
const ObIArray<common::ObPeerTaskState> &peer_states,
const ObIArray<sql::dtl::ObDtlChannel *> &dtl_channels);
static void qc_unregister_check_item_from_dm(sql::ObDfo *dfo, ObDetectManager* dm=nullptr);
static void qc_unregister_all_check_items_from_dm(const ObIArray<sql::ObDfo *> &dfos);

View File

@ -1141,8 +1141,8 @@ int ObParallelDfoScheduler::dispatch_sqc(ObExecContext &exec_ctx,
sqc.set_adjoining_root_dfo(true);
}
}
ObArray<ObPeerTaskState> peer_states;
ObArray<dtl::ObDtlChannel *> dtl_channels;
ObSEArray<ObPeerTaskState, 8> peer_states;
ObSEArray<dtl::ObDtlChannel *, 8> dtl_channels;
// 分发 sqc 可能需要重试,
// 分发 sqc 的 rpc 成功,但 sqc 上无法分配最小个数的 worker 线程,`dispatch_sqc`内部进行重试,