fix dtl interm result leak

obdev 2023-04-24 04:18:38 +00:00 committed by ob-robot
parent 0e5a7281f9
commit 16b713759f
23 changed files with 235 additions and 21 deletions
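The cleanup added by this commit follows one pattern in every hunk below: DTL interm results are keyed by (channel_id_, batch_id_) and written for consecutive batch ids starting from 0, so cleanup walks batch ids upward and treats the first OB_HASH_NOT_EXIST as "nothing later exists for this channel". A condensed sketch of that pattern (editor's illustration, not part of the diff; it only reuses the manager and key names visible in the hunks):

int erase_channel_interm_results(dtl::ObDTLIntermResultKey &key, const int64_t batch_limit)
{
  int ret = OB_SUCCESS;
  // batch ids are stored densely from 0, so the first miss ends the scan for this channel
  for (int64_t batch_id = 0; batch_id < batch_limit && OB_SUCC(ret); batch_id++) {
    key.batch_id_ = batch_id;
    if (OB_FAIL(dtl::ObDTLIntermResultManager::getInstance().erase_interm_result_info(key))) {
      if (OB_HASH_NOT_EXIST == ret) {
        ret = OB_SUCCESS;
        break;
      } else {
        LOG_WARN("fail to erase interm result", K(ret), K(key));
      }
    }
  }
  return ret;
}

The receive operator applies this starting from the current px batch id, and the new ObPxCleanDtlIntermResP RPC processor applies it to every channel of every task of the DFOs that were never scheduled.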

View File

@ -85,9 +85,9 @@ static int easy_decode_uint64(char *buf, const int64_t data_len, int64_t *pos, u
*val |= (((uint64_t)((*(buf + (*pos)++))) & 0xff)) << 24;
*val |= (((uint64_t)((*(buf + (*pos)++))) & 0xff)) << 16;
*val |= (((uint64_t)((*(buf + (*pos)++))) & 0xff)) << 8;
*val |= (((uint64_t)((*(buf + (*pos)++))) & 0xff));
}
*val |= (((uint64_t)((*(buf + (*pos)++))) & 0xff));
}
return ret;
}
@ -143,9 +143,9 @@ static int easy_decode_negotiation_msg(easy_negotiation_msg_t *ne_msg, char *rec
int64_t pos = 0;
if (NULL == ne_msg || NULL == recv_buf || NULL == decode_len) {
easy_error_log("easy_decode_negotiation_msg, invalid param!");
return EASY_ERROR;
}
easy_error_log("easy_decode_negotiation_msg, invalid param!");
return EASY_ERROR;
}
ret = easy_decode_uint64(recv_buf, recv_buf_len, &pos, &(ne_msg->msg_header.header_magic));
if (ret != EASY_OK) {
@ -297,6 +297,6 @@ int easy_send_negotiate_message(easy_connection_t *c)
}
void easy_consume_negotiation_msg(int fd, easy_io_t *eio)
{
net_consume_negotiation_msg(fd, eio->magic);
}
{
net_consume_negotiation_msg(fd, eio->magic);
}

View File

@ -518,6 +518,8 @@ PCODE_DEF(OB_DAS_ASYNC_ERASE_RESULT, 0x529) //erase das result with async rpc
PCODE_DEF(OB_RECOMPILE_ALL_VIEWS_BATCH, 0x52A) //reset status for view during upgrade
PCODE_DEF(OB_DAS_ASYNC_ACCESS, 0x52B) //das async rpc
PCODE_DEF(OB_TRY_ADD_DEP_INFOS_FOR_SYNONYM_BATCH, 0x52C) //add dependency for synonym during upgrade
PCODE_DEF(OB_CLEAN_DTL_INTERM_RESULT, 0x52D) //clean dtl interm result
PCODE_DEF(OB_SQL_PCODE_END, 0x54F) // as a guardian
// for test schema

View File

@ -162,6 +162,7 @@ void oceanbase::observer::init_srv_xlator_for_others(ObSrvRpcXlator *xlator) {
RPC_PROCESSOR(ObInitTaskP, gctx_);
RPC_PROCESSOR(ObInitFastSqcP, gctx_);
RPC_PROCESSOR(ObPxTenantTargetMonitorP, gctx_);
RPC_PROCESSOR(ObPxCleanDtlIntermResP, gctx_);
// SQL Estimate
RPC_PROCESSOR(ObEstimatePartitionRowsP, gctx_);

View File

@ -4289,7 +4289,7 @@ int ObStaticEngineCG::generate_join_spec(ObLogJoin &op, ObJoinSpec &spec)
ObNestedLoopJoinSpec &nlj = static_cast<ObNestedLoopJoinSpec &>(spec);
if (op.enable_px_batch_rescan()) {
nlj.enable_px_batch_rescan_ = true;
nlj.group_size_ = ObNestedLoopJoinOp::PX_RESCAN_BATCH_ROW_COUNT;
nlj.group_size_ = PX_RESCAN_BATCH_ROW_COUNT;
} else {
nlj.enable_px_batch_rescan_ = false;
}

View File

@ -51,7 +51,7 @@ public:
// no need to serialize
DTL_CHAN_STATE state_;
TO_STRING_KV(KP_(chid), K_(type), K_(peer), K_(role), K_(tenant_id), K(state_));
TO_STRING_KV(K_(chid), K_(type), K_(peer), K_(role), K_(tenant_id), K(state_));
};
class ObDtlChSet
@ -70,6 +70,7 @@ public:
int64_t count() const { return ch_info_set_.count(); }
int assign(const ObDtlChSet &other);
void reset() { ch_info_set_.reset(); }
common::ObIArray<dtl::ObDtlChannelInfo> &get_ch_info_set() { return ch_info_set_; }
TO_STRING_KV(K_(exec_addr), K_(ch_info_set));
protected:
common::ObAddr exec_addr_;

View File

@ -122,8 +122,6 @@ public:
int fill_cur_row_rescan_param();
int calc_other_conds(bool &is_match);
public:
static const int64_t PX_RESCAN_BATCH_ROW_COUNT = 8192;
private:
// state operation and transfer function type.
typedef int (ObNestedLoopJoinOp::*state_operation_func_type)();

View File

@ -90,6 +90,10 @@ private:
int fetch_rows(const int64_t row_cnt);
int setup_loop_proc() override;
virtual void clean_dfos_dtl_interm_result() override
{
msg_proc_.clean_dtl_interm_result(ctx_);
}
private:
ObPxFifoCoordOpEventListener listener_;
ObSerialDfoScheduler serial_scheduler_;

View File

@ -145,6 +145,10 @@ private:
int init_store_rows(int64_t n_ways);
int setup_readers();
void destroy_readers();
virtual void clean_dfos_dtl_interm_result() override
{
msg_proc_.clean_dtl_interm_result(ctx_);
}
private:
ObPxMSCoordOpEventListener listener_;
ObSerialDfoScheduler serial_scheduler_;

View File

@ -128,6 +128,10 @@ private:
int setup_readers();
void destroy_readers();
int next_row(ObReceiveRowReader &reader, bool &wait_next_msg);
virtual void clean_dfos_dtl_interm_result() override
{
msg_proc_.clean_dtl_interm_result(ctx_);
}
private:
ObPxOrderedCoordOpEventListener listener_;
ObSerialDfoScheduler serial_scheduler_;

View File

@ -545,10 +545,19 @@ int ObPxReceiveOp::erase_dtl_interm_result()
LOG_WARN("fail get channel info", K(ret));
} else {
key.channel_id_ = ci.chid_;
key.batch_id_ = ctx_.get_px_batch_id();
if (OB_FAIL(ObDTLIntermResultManager::getInstance().erase_interm_result_info(key))) {
LOG_TRACE("fail to release recieve internal result", K(ret));
for (int64_t batch_id = ctx_.get_px_batch_id();
batch_id < PX_RESCAN_BATCH_ROW_COUNT && OB_SUCC(ret); batch_id++) {
key.batch_id_ = batch_id;
if (OB_FAIL(ObDTLIntermResultManager::getInstance().erase_interm_result_info(key))) {
if (OB_HASH_NOT_EXIST == ret) {
ret = OB_SUCCESS;
break;
} else {
LOG_WARN("fail to release recieve internal result", K(ret), K(key));
}
}
}
LOG_TRACE("receive erase dtl interm res", K(i), K(get_spec().get_id()), K(ci), K(ctx_.get_px_batch_id()));
}
}
return ret;

View File

@ -77,6 +77,8 @@ OB_SERIALIZE_MEMBER(ObSqcTableLocationKey,
tablet_id_,
is_dml_,
is_loc_uncertain_);
OB_SERIALIZE_MEMBER(ObPxCleanDtlIntermResInfo, ch_total_info_, sqc_id_, task_count_);
OB_SERIALIZE_MEMBER(ObPxCleanDtlIntermResArgs, info_, batch_size_);
int ObPxSqcMeta::assign(const ObPxSqcMeta &other)
{

View File

@ -273,9 +273,8 @@ public:
K_(qc_ch_info), K_(sqc_ch_info),
K_(task_count), K_(max_task_count), K_(min_task_count),
K_(thread_inited), K_(thread_finish), K_(px_int_id),
K_(is_fulltree), K_(is_rpc_worker), K_(transmit_use_interm_result),
K_(recieve_use_interm_result), K(temp_table_ctx_), K_(server_not_alive),
K_(adjoining_root_dfo), K_(is_single_tsc_leaf_dfo));
K_(transmit_use_interm_result),
K_(recieve_use_interm_result), K_(serial_receive_channels));
private:
uint64_t execution_id_;
uint64_t qc_id_;
@ -579,7 +578,9 @@ public:
K_(px_bloom_filter_mode),
K_(px_bf_id),
K_(pkey_table_loc_id),
K_(tsc_op_cnt));
K_(tsc_op_cnt),
K_(transmit_ch_sets),
K_(receive_ch_sets_map));
private:
DISALLOW_COPY_AND_ASSIGN(ObDfo);
@ -717,6 +718,47 @@ public:
ObSqcSerializeCache ser_cache_;
};
struct ObPxCleanDtlIntermResInfo
{
OB_UNIS_VERSION(1);
public:
ObPxCleanDtlIntermResInfo() : ch_total_info_(), sqc_id_(common::OB_INVALID_ID), task_count_(0) {}
ObPxCleanDtlIntermResInfo(dtl::ObDtlChTotalInfo &ch_info, int64_t sqc_id, int64_t task_count) :
ch_total_info_(ch_info), sqc_id_(sqc_id), task_count_(task_count)
{}
~ObPxCleanDtlIntermResInfo() { }
void reset()
{
ch_total_info_.reset();
sqc_id_ = common::OB_INVALID_ID;
task_count_ = 0;
}
TO_STRING_KV(K_(ch_total_info), K_(sqc_id), K_(task_count));
public:
dtl::ObDtlChTotalInfo ch_total_info_;
int64_t sqc_id_;
int64_t task_count_;
};
class ObPxCleanDtlIntermResArgs
{
OB_UNIS_VERSION(1);
public:
ObPxCleanDtlIntermResArgs() : info_(), batch_size_(0) {}
~ObPxCleanDtlIntermResArgs() { }
void reset()
{
info_.reset();
batch_size_ = 0;
}
TO_STRING_KV(K_(info), K_(batch_size));
public:
ObSEArray<ObPxCleanDtlIntermResInfo, 8> info_;
uint64_t batch_size_;
};
class ObPxTask
{
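As a usage note for the two structs defined above: the serial scheduler builds one ObPxCleanDtlIntermResArgs per SQC server address, appending one ObPxCleanDtlIntermResInfo per not-yet-scheduled parent DFO, and batch_size_ carries the rescan parameter count so the receiver knows how many batch ids to erase. A minimal sketch (editor's illustration, not part of the diff; rescan_param_count, ch_total_info, sqc_id, task_count, sqc_addr and tenant_id stand in for values the scheduler gathers):

ObPxCleanDtlIntermResArgs arg;
arg.batch_size_ = rescan_param_count;  // hypothetical: px batch rescan parameter count
if (OB_FAIL(arg.info_.push_back(
    ObPxCleanDtlIntermResInfo(ch_total_info, sqc_id, task_count)))) {
  LOG_WARN("push back clean info failed", K(ret));
} else if (OB_FAIL(rpc_proxy.to(sqc_addr).by(tenant_id).clean_dtl_interm_result(arg, NULL))) {
  LOG_WARN("send clean dtl interm result rpc failed", K(ret));
}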

View File

@ -627,6 +627,84 @@ int ObSerialDfoScheduler::do_schedule_dfo(ObExecContext &ctx, ObDfo &dfo) const
}
void ObSerialDfoScheduler::clean_dtl_interm_result(ObExecContext &exec_ctx)
{
int ret = OB_SUCCESS;
const ObIArray<ObDfo *> &all_dfos = coord_info_.dfo_mgr_.get_all_dfos();
ObDfo *last_dfo = all_dfos.at(all_dfos.count() - 1);
if (OB_NOT_NULL(last_dfo) && last_dfo->is_scheduled() && OB_NOT_NULL(last_dfo->parent())
&& last_dfo->parent()->is_root_dfo()) {
// all dfo scheduled, do nothing.
LOG_TRACE("all dfo scheduled.");
} else {
const ObDfo *root = coord_info_.dfo_mgr_.get_root_dfo();
int64_t batch_size = coord_info_.get_rescan_param_count();
ObSEArray<ObDfo*, 8> dfos;
common::hash::ObHashMap<ObAddr, ObPxCleanDtlIntermResArgs *> map;
ObIAllocator &allocator = exec_ctx.get_allocator();
for (int64_t i = 0; i < all_dfos.count(); i++) {
ObDfo *dfo = all_dfos.at(i);
ObDfo *parent = NULL;
if (OB_NOT_NULL(dfo) && dfo->is_scheduled() && NULL != (parent = dfo->parent())
&& !parent->is_root_dfo() && !parent->is_scheduled()) {
// if current dfo is scheduled but parent dfo is not scheduled.
for (int64_t j = 0; j < parent->get_sqcs_count(); j++) {
ObPxSqcMeta &sqc = parent->get_sqcs().at(j);
int64_t msg_idx = 0;
for (; msg_idx < sqc.get_serial_receive_channels().count(); msg_idx++) {
if (sqc.get_serial_receive_channels().at(msg_idx).get_child_dfo_id() == dfo->get_dfo_id()) {
break;
}
}
if (OB_LIKELY(msg_idx < sqc.get_serial_receive_channels().count())) {
ObPxCleanDtlIntermResArgs *arg = NULL;
if (!map.created() && OB_FAIL(map.create(8, "CleanDtlRes"))) {
LOG_WARN("create map failed", K(ret));
} else if (OB_FAIL(map.get_refactored(sqc.get_exec_addr(), arg))) {
if (OB_HASH_NOT_EXIST == ret) {
void *buf = NULL;
if (OB_ISNULL(buf = allocator.alloc(sizeof(ObPxCleanDtlIntermResArgs)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("alloc failed", K(ret));
} else {
arg = new(buf) ObPxCleanDtlIntermResArgs();
arg->batch_size_ = coord_info_.get_rescan_param_count();
if (OB_FAIL(map.set_refactored(sqc.get_exec_addr(), arg))) {
LOG_WARN("set refactored failed", K(ret));
}
}
} else {
LOG_WARN("get refactored failed", K(ret));
}
}
if (OB_SUCC(ret) && OB_NOT_NULL(arg)) {
ObPxReceiveDataChannelMsg &msg = sqc.get_serial_receive_channels().at(msg_idx);
if (OB_FAIL(arg->info_.push_back(ObPxCleanDtlIntermResInfo(msg.get_ch_total_info(),
sqc.get_sqc_id(), sqc.get_task_count())))) {
LOG_WARN("push back failed", K(ret));
}
}
}
}
}
}
// ignore allocate, set_refactored and push_back failure.
// send rpc to addrs inserted into the map successfully.
if (OB_UNLIKELY(!map.empty())) {
LOG_TRACE("clean dtl res map", K(map.size()));
ObSQLSessionInfo *session = exec_ctx.get_my_session();
uint64_t tenant_id = OB_NOT_NULL(session) ? session->get_effective_tenant_id() : OB_SYS_TENANT_ID;
auto iter = map.begin();
for (; iter != map.end(); iter++) {
if (OB_FAIL(coord_info_.rpc_proxy_.to(iter->first).by(tenant_id).clean_dtl_interm_result(*iter->second, NULL))) {
LOG_WARN("send clean dtl interm result rpc failed", K(ret), K(iter->first), KPC(iter->second));
}
LOG_TRACE("clean dtl res map", K(iter->first), K(*(iter->second)));
}
}
}
}
// ------------- divider -----------
// Start the SQC threads of a DFO

View File

@ -41,6 +41,7 @@ public:
ObDfo &child,
ObDfo &parent,
bool is_parallel_scheduler = true) const;
virtual void clean_dtl_interm_result(ObExecContext &ctx) = 0;
int build_data_xchg_ch(ObExecContext &ctx, ObDfo &child, ObDfo &parent) const;
int build_data_mn_xchg_ch(ObExecContext &ctx, ObDfo &child, ObDfo &parent) const;
virtual int init_all_dfo_channel(ObExecContext &ctx) const;
@ -69,6 +70,8 @@ public:
virtual int init_all_dfo_channel(ObExecContext &ctx) const;
virtual int dispatch_dtl_data_channel_info(ObExecContext &ctx, ObDfo &child, ObDfo &parent) const;
virtual int try_schedule_next_dfo(ObExecContext &ctx) const;
virtual void clean_dtl_interm_result(ObExecContext &ctx) override;
private:
int build_transmit_recieve_channel(ObExecContext &ctx, ObDfo *dfo) const;
int init_dfo_channel(ObExecContext &ctx, ObDfo *child, ObDfo *parent) const;
@ -91,6 +94,7 @@ public:
{}
virtual int dispatch_dtl_data_channel_info(ObExecContext &ctx, ObDfo &child, ObDfo &parent) const;
virtual int try_schedule_next_dfo(ObExecContext &ctx) const;
virtual void clean_dtl_interm_result(ObExecContext &ctx) override { UNUSED(ctx); }
private:
int dispatch_transmit_channel_info(ObExecContext &ctx, ObDfo &child, ObDfo &parent) const;
int dispatch_receive_channel_info(ObExecContext &ctx, ObDfo &child, ObDfo &parent) const;

View File

@ -567,6 +567,7 @@ int ObPxCoordOp::inner_close()
LOG_WARN("release dtl channel failed", K(release_channel_ret));
}
ctx_.del_extra_check(server_alive_checker_);
clean_dfos_dtl_interm_result();
LOG_TRACE("byebye. exit QC Coord");
return ret;
}

View File

@ -135,6 +135,8 @@ protected:
int init_batch_info();
int batch_rescan();
int erase_dtl_interm_result();
// send rpc to clean dtl interm result of not scheduled dfos.
virtual void clean_dfos_dtl_interm_result() = 0;
protected:
common::ObArenaAllocator allocator_;
common::ObArenaAllocator row_allocator_;

View File

@ -650,3 +650,40 @@ int ObPxTenantTargetMonitorP::process()
}
return ret;
}
int ObPxCleanDtlIntermResP::process()
{
int ret = OB_SUCCESS;
dtl::ObDTLIntermResultKey key;
int64_t batch_size = 0 == arg_.batch_size_ ? 1 : arg_.batch_size_;
for (int64_t i = 0; i < arg_.info_.count(); i++) {
ObPxCleanDtlIntermResInfo &info = arg_.info_.at(i);
for (int64_t task_id = 0; task_id < info.task_count_; task_id++) {
ObPxTaskChSet ch_set;
if (OB_FAIL(ObDtlChannelUtil::get_receive_dtl_channel_set(info.sqc_id_, task_id,
info.ch_total_info_, ch_set))) {
LOG_WARN("get receive dtl channel set failed", K(ret));
} else {
LOG_TRACE("ObPxCleanDtlIntermResP process", K(i), K(arg_.batch_size_), K(info), K(task_id), K(ch_set));
for (int64_t ch_idx = 0; ch_idx < ch_set.count(); ch_idx++) {
key.channel_id_ = ch_set.get_ch_info_set().at(ch_idx).chid_;
for (int64_t batch_id = 0; batch_id < batch_size && OB_SUCC(ret); batch_id++) {
key.batch_id_= batch_id;
if (OB_FAIL(dtl::ObDTLIntermResultManager::getInstance().erase_interm_result_info(key))) {
if (OB_HASH_NOT_EXIST == ret) {
// interm result is written from batch_id = 0 to batch_size,
// if some errors happen when batch_id = i, no interm result of batch_id > i will be written.
// so if erase failed, just break and continue to erase interm result of next channel.
ret = OB_SUCCESS;
break;
} else {
LOG_WARN("fail to release recieve internal result", K(ret), K(ret));
}
}
}
}
}
}
}
return ret;
}

View File

@ -193,6 +193,18 @@ private:
const observer::ObGlobalContext &global_ctx_;
};
class ObPxCleanDtlIntermResP
: public obrpc::ObRpcProcessor<obrpc::ObPxRpcProxy::ObRpc<obrpc::OB_CLEAN_DTL_INTERM_RESULT> >
{
public:
ObPxCleanDtlIntermResP(const observer::ObGlobalContext &gctx)
{}
virtual ~ObPxCleanDtlIntermResP() = default;
virtual int init() final { return OB_SUCCESS; }
virtual void destroy() final {}
virtual int process() final;
};
} // sql
} // oceanbase

View File

@ -38,6 +38,7 @@ public:
RPC_AP(PR5 fast_init_sqc, OB_PX_FAST_INIT_SQC, (sql::ObPxRpcInitSqcArgs), sql::ObPxRpcInitSqcResponse);
// px resource monitoring
RPC_S(PR5 fetch_statistics, OB_PX_TARGET_REQUEST, (sql::ObPxRpcFetchStatArgs), sql::ObPxRpcFetchStatResponse);
RPC_AP(PR5 clean_dtl_interm_result, OB_CLEAN_DTL_INTERM_RESULT, (sql::ObPxCleanDtlIntermResArgs));
};
} // obrpc

View File

@ -128,6 +128,16 @@ int ObPxMsgProc::startup_msg_loop(ObExecContext &ctx)
return ret;
}
void ObPxMsgProc::clean_dtl_interm_result(ObExecContext &ctx)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(scheduler_)) {
LOG_WARN("dfo scheduler is null");
} else {
scheduler_->clean_dtl_interm_result(ctx);
}
}
// 1. Use the pkt info to locate the corresponding dfo and sqc, and mark the current sqc's thread allocation as finished
// 2. Check whether all sqcs under this dfo have finished thread allocation
//    If so, mark the dfo as thread_inited and go to step 3; otherwise stop processing here

View File

@ -178,6 +178,7 @@ public:
int on_piece_msg(ObExecContext &ctx, const ObInitChannelPieceMsg &pkt);
int on_piece_msg(ObExecContext &ctx, const ObReportingWFPieceMsg &pkt);
int on_piece_msg(ObExecContext &ctx, const ObOptStatsGatherPieceMsg &pkt);
void clean_dtl_interm_result(ObExecContext &ctx);
// end DATAHUB msg processing
private:
int do_cleanup_dfo(ObDfo &dfo);

View File

@ -29,6 +29,7 @@ namespace oceanbase
{
namespace sql
{
const int64_t PX_RESCAN_BATCH_ROW_COUNT = 8192;
class ObIExtraStatusCheck;
enum ObBcastOptimization {
BC_TO_WORKER,

View File

@ -15,6 +15,7 @@
#include "sql/engine/ob_operator.h"
#include "sql/engine/basic/ob_chunk_datum_store.h"
#include "sql/engine/px/ob_px_util.h"
namespace oceanbase
{
@ -215,7 +216,6 @@ public:
public:
ObBatchRescanCtl &get_batch_rescan_ctl() { return batch_rescan_ctl_; }
static const int64_t PX_RESCAN_BATCH_ROW_COUNT = 8192;
int handle_next_batch_with_px_rescan(const int64_t op_max_batch_size);
int handle_next_batch_with_group_rescan(const int64_t op_max_batch_size);
private: