fix hash local allocate sqcs and build channel map bug

This commit is contained in:
sdc 2024-04-10 08:45:28 +00:00 committed by ob-robot
parent 7d11a2a10d
commit df4583a145
6 changed files with 134 additions and 14 deletions

View File

@ -27,7 +27,7 @@ OB_SERIALIZE_MEMBER(ObDtlChSet, exec_addr_, ch_info_set_);
OB_SERIALIZE_MEMBER(ObDtlTask, jobid_, taskid_, cis_, chans_cnt_);
OB_SERIALIZE_MEMBER(ObDtlExecServer, total_task_cnt_, exec_addrs_, prefix_task_counts_);
OB_SERIALIZE_MEMBER(ObDtlChTotalInfo, start_channel_id_, transmit_exec_server_,
receive_exec_server_, channel_count_, tenant_id_);
receive_exec_server_, channel_count_, tenant_id_, is_local_shuffle_);
int ObDtlChSet::add_channel_info(const ObDtlChannelInfo &info)
@ -98,6 +98,7 @@ int ObDtlChTotalInfo::assign(const ObDtlChTotalInfo &other)
tenant_id_ = other.tenant_id_;
OZ(transmit_exec_server_.assign(other.transmit_exec_server_));
OZ(receive_exec_server_.assign(other.receive_exec_server_));
is_local_shuffle_ = other.is_local_shuffle_;
return ret;
}

View File

@ -115,7 +115,7 @@ class ObDtlChTotalInfo
public:
ObDtlChTotalInfo()
: start_channel_id_(0), transmit_exec_server_(), receive_exec_server_(),
channel_count_(0), tenant_id_(common::OB_INVALID_ID)
channel_count_(0), tenant_id_(common::OB_INVALID_ID), is_local_shuffle_(false)
{}
int assign(const ObDtlChTotalInfo &other);
void reset()
@ -135,13 +135,15 @@ public:
K_(transmit_exec_server),
K_(receive_exec_server),
K_(channel_count),
K_(tenant_id));
K_(tenant_id),
K_(is_local_shuffle));
public:
int64_t start_channel_id_;
ObDtlExecServer transmit_exec_server_;
ObDtlExecServer receive_exec_server_;
int64_t channel_count_; // 理论上要等于 tranmit_total_task_cnt_ * receive_total_task_cnt_
uint64_t tenant_id_;
bool is_local_shuffle_;
};
class ObDtlTask

View File

@ -295,7 +295,7 @@ int ObDfo::fill_channel_info_by_sqc(
ch_servers.total_task_cnt_ = 0;
OZ(ch_servers.prefix_task_counts_.push_back(ch_servers.total_task_cnt_));
OZ(ch_servers.add_exec_addr(sqc.get_exec_addr()));
ch_servers.total_task_cnt_ = 1;
ch_servers.total_task_cnt_ = sqc.get_task_count();
return ret;
}

View File

@ -705,7 +705,6 @@ void ObSerialDfoScheduler::clean_dtl_interm_result(ObExecContext &exec_ctx)
int ObParallelDfoScheduler::do_schedule_dfo(ObExecContext &exec_ctx, ObDfo &dfo) const
{
int ret = OB_SUCCESS;
ObArray<ObPxSqcMeta *> sqcs;
if (OB_FAIL(dfo.get_sqcs(sqcs))) {
LOG_WARN("fail get qc-sqc channel for QC", K(ret));
@ -1469,6 +1468,13 @@ int ObParallelDfoScheduler::schedule_pair(ObExecContext &exec_ctx,
child, parent))) {
LOG_WARN("fail alloc addr by data distribution", K(parent), K(child), K(ret));
}
} else if (child.is_slave_mapping()) {
if (OB_UNLIKELY(ObPQDistributeMethod::HASH != child.get_dist_method())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected dist method for slave mapping", K(ret), K(parent), K(child));
} else if (OB_FAIL(ObPXServerAddrUtil::alloc_by_child_distribution(child, parent))) {
LOG_WARN("alloc by child distribution failed", K(ret));
}
} else if (OB_FAIL(ObPXServerAddrUtil::alloc_by_random_distribution(exec_ctx, child, parent))) {
LOG_WARN("fail alloc addr by data distribution", K(parent), K(child), K(ret));
}

View File

@ -3049,21 +3049,31 @@ int ObSlaveMapUtil::build_mn_channel_per_sqcs(
if (OB_ISNULL(dfo_ch_total_infos)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("transmit or receive mn channel info is null", KP(dfo_ch_total_infos));
} else if (OB_UNLIKELY(child.get_sqcs_count() != parent.get_sqcs_count())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sqc count not match in slave mapping plan", K(ret), K(parent), K(child));
} else {
OZ(dfo_ch_total_infos->prepare_allocate(sqc_count));
if (OB_SUCC(ret)) {
for (int64_t i = 0; i < sqc_count && OB_SUCC(ret); ++i) {
ObDtlChTotalInfo &transmit_ch_info = dfo_ch_total_infos->at(i);
OZ(ObDfo::fill_channel_info_by_sqc(transmit_ch_info.transmit_exec_server_, child.get_sqcs()));
OZ(ObDfo::fill_channel_info_by_sqc(transmit_ch_info.receive_exec_server_, parent.get_sqcs()));
transmit_ch_info.channel_count_ = transmit_ch_info.transmit_exec_server_.total_task_cnt_
* transmit_ch_info.receive_exec_server_.total_task_cnt_;
transmit_ch_info.start_channel_id_ = ObDtlChannel::generate_id(transmit_ch_info.channel_count_)
- transmit_ch_info.channel_count_ + 1;
transmit_ch_info.tenant_id_ = tenant_id;
transmit_ch_info.is_local_shuffle_ = true;
if (OB_UNLIKELY(parent.get_sqcs().at(i).get_exec_addr() != child.get_sqcs().at(i).get_exec_addr())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("addr not match", K(ret));
} else {
OZ(ObDfo::fill_channel_info_by_sqc(transmit_ch_info.transmit_exec_server_, child.get_sqcs().at(i)));
OZ(ObDfo::fill_channel_info_by_sqc(transmit_ch_info.receive_exec_server_, parent.get_sqcs().at(i)));
transmit_ch_info.channel_count_ = transmit_ch_info.transmit_exec_server_.total_task_cnt_
* transmit_ch_info.receive_exec_server_.total_task_cnt_;
transmit_ch_info.start_channel_id_ = ObDtlChannel::generate_id(transmit_ch_info.channel_count_)
- transmit_ch_info.channel_count_ + 1;
transmit_ch_info.tenant_id_ = tenant_id;
}
}
}
}
LOG_DEBUG("build mn channel per sqcs", K(parent), K(child), KPC(dfo_ch_total_infos));
return ret;
}
@ -3618,7 +3628,7 @@ int ObSlaveMapUtil::get_pkey_table_locations(int64_t table_location_key,
return ret;
}
int ObDtlChannelUtil::get_receive_dtl_channel_set(
int ObDtlChannelUtil::get_mn_receive_dtl_channel_set(
const int64_t sqc_id,
const int64_t task_id,
ObDtlChTotalInfo &ch_total_info,
@ -3647,6 +3657,7 @@ int ObDtlChannelUtil::get_receive_dtl_channel_set(
LOG_WARN("fail reserve memory for channels", K(ret),
"channels", ch_total_info.transmit_exec_server_.total_task_cnt_);
}
// 遍历transmit的所有server,逐个构建当前这个receive task和它们的channel
for (int64_t i = 0; i < prefix_task_counts.count() && OB_SUCC(ret); ++i) {
int64_t prefix_task_count = 0;
if (i + 1 == prefix_task_counts.count()) {
@ -3656,6 +3667,8 @@ int ObDtlChannelUtil::get_receive_dtl_channel_set(
}
ObAddr &dst_addr = ch_total_info.transmit_exec_server_.exec_addrs_.at(i);
bool is_local = dst_addr == GCONF.self_addr_;
// [pre_prefix_task_count, prefix_task_count)表示transmit的第i个sqc中的transmit tasks,
// 在所有sqcs的transmit tasks中的编号
for (int64_t j = pre_prefix_task_count; j < prefix_task_count && OB_SUCC(ret); ++j) {
ObDtlChannelInfo ch_info;
chid = base_chid + receive_task_cnt * j;
@ -3674,10 +3687,45 @@ int ObDtlChannelUtil::get_receive_dtl_channel_set(
K(ch_total_info.transmit_exec_server_.total_task_cnt_), K(sqc_id), K(task_id));
}
}
LOG_DEBUG("get mn receive dtl channel set", K(sqc_id), K(task_id), K(ch_total_info), K(ch_set));
return ret;
}
int ObDtlChannelUtil::get_transmit_dtl_channel_set(
int ObDtlChannelUtil::get_sm_receive_dtl_channel_set(
const int64_t sqc_id,
const int64_t task_id,
ObDtlChTotalInfo &ch_total_info,
ObDtlChSet &ch_set)
{
int ret = OB_SUCCESS;
UNUSED(sqc_id);
int64_t receive_task_cnt = ch_total_info.receive_exec_server_.total_task_cnt_;
int64_t transmit_task_cnt = ch_total_info.transmit_exec_server_.total_task_cnt_;
if (OB_UNLIKELY(1 != ch_total_info.receive_exec_server_.exec_addrs_.count()
|| 1 != ch_total_info.transmit_exec_server_.exec_addrs_.count()
|| ch_total_info.receive_exec_server_.exec_addrs_.at(0) !=
ch_total_info.transmit_exec_server_.exec_addrs_.at(0)
|| ch_total_info.receive_exec_server_.exec_addrs_.at(0) != GCONF.self_addr_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected exec addrs count", K(ret), K(ch_total_info), K(GCONF.self_addr_));
} else if (OB_FAIL(ch_set.reserve(transmit_task_cnt))) {
LOG_WARN("fail reserve memory for channels", K(ret), K(transmit_task_cnt));
} else {
ObAddr &dst_addr = ch_total_info.transmit_exec_server_.exec_addrs_.at(0);
bool is_local = true;
int64_t chid = 0;
for (int64_t i = 0; i < transmit_task_cnt && OB_SUCC(ret); ++i) {
ObDtlChannelInfo ch_info;
chid = ch_total_info.start_channel_id_ + task_id + receive_task_cnt * i;
ObDtlChannelGroup::make_receive_channel(ch_total_info.tenant_id_, dst_addr, chid, ch_info, is_local);
OZ(ch_set.add_channel_info(ch_info));
}
}
LOG_DEBUG("get sm receive dtl channel set", K(sqc_id), K(task_id), K(ch_total_info), K(ch_set));
return ret;
}
int ObDtlChannelUtil::get_mn_transmit_dtl_channel_set(
const int64_t sqc_id,
const int64_t task_id,
ObDtlChTotalInfo &ch_total_info,
@ -3726,6 +3774,41 @@ int ObDtlChannelUtil::get_transmit_dtl_channel_set(
K(ch_total_info.transmit_exec_server_.total_task_cnt_));
}
}
LOG_DEBUG("get transmit dtl channel set", K(sqc_id), K(task_id), K(ch_total_info), K(ch_set));
return ret;
}
int ObDtlChannelUtil::get_sm_transmit_dtl_channel_set(
const int64_t sqc_id,
const int64_t task_id,
ObDtlChTotalInfo &ch_total_info,
ObDtlChSet &ch_set)
{
int ret = OB_SUCCESS;
UNUSED(sqc_id);
int64_t receive_task_cnt = ch_total_info.receive_exec_server_.total_task_cnt_;
int64_t transmit_task_cnt = ch_total_info.transmit_exec_server_.total_task_cnt_;
if (OB_UNLIKELY(1 != ch_total_info.receive_exec_server_.exec_addrs_.count()
|| 1 != ch_total_info.transmit_exec_server_.exec_addrs_.count()
|| ch_total_info.receive_exec_server_.exec_addrs_.at(0) !=
ch_total_info.transmit_exec_server_.exec_addrs_.at(0)
|| ch_total_info.receive_exec_server_.exec_addrs_.at(0) != GCONF.self_addr_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected exec addrs count", K(ret), K(ch_total_info), K(GCONF.self_addr_));
} else if (OB_FAIL(ch_set.reserve(receive_task_cnt))) {
LOG_WARN("fail reserve memory for channels", K(ret), K(receive_task_cnt));
} else {
ObAddr &dst_addr = ch_total_info.receive_exec_server_.exec_addrs_.at(0);
bool is_local = true;
int64_t chid = 0;
for (int64_t i = 0; i < transmit_task_cnt && OB_SUCC(ret); ++i) {
ObDtlChannelInfo ch_info;
chid = ch_total_info.start_channel_id_ + receive_task_cnt * task_id + i;
ObDtlChannelGroup::make_transmit_channel(ch_total_info.tenant_id_, dst_addr, chid, ch_info, is_local);
OZ(ch_set.add_channel_info(ch_info));
}
}
LOG_DEBUG("get sm receive dtl channel set", K(sqc_id), K(task_id), K(ch_total_info), K(ch_set));
return ret;
}

View File

@ -515,11 +515,39 @@ public:
DTLChannelPredFunc pred,
dtl::ObDtlFlowControl *dfc = nullptr);
static int get_receive_dtl_channel_set(
const int64_t sqc_id,
const int64_t task_id,
dtl::ObDtlChTotalInfo &ch_total_info,
dtl::ObDtlChSet &ch_set) {
return ch_total_info.is_local_shuffle_ ?
get_sm_receive_dtl_channel_set(sqc_id, task_id, ch_total_info, ch_set) :
get_mn_receive_dtl_channel_set(sqc_id, task_id, ch_total_info, ch_set);
}
static int get_mn_receive_dtl_channel_set(
const int64_t sqc_id,
const int64_t task_id,
dtl::ObDtlChTotalInfo &ch_total_info,
dtl::ObDtlChSet &ch_set);
static int get_sm_receive_dtl_channel_set(
const int64_t sqc_id,
const int64_t task_id,
dtl::ObDtlChTotalInfo &ch_total_info,
dtl::ObDtlChSet &ch_set);
static int get_transmit_dtl_channel_set(
const int64_t sqc_id,
const int64_t task_id,
dtl::ObDtlChTotalInfo &ch_total_info,
dtl::ObDtlChSet &ch_set) {
return ch_total_info.is_local_shuffle_ ?
get_sm_transmit_dtl_channel_set(sqc_id, task_id, ch_total_info, ch_set) :
get_mn_transmit_dtl_channel_set(sqc_id, task_id, ch_total_info, ch_set);
}
static int get_mn_transmit_dtl_channel_set(
const int64_t sqc_id,
const int64_t task_id,
dtl::ObDtlChTotalInfo &ch_total_info,
dtl::ObDtlChSet &ch_set);
static int get_sm_transmit_dtl_channel_set(
const int64_t sqc_id,
const int64_t task_id,
dtl::ObDtlChTotalInfo &ch_total_info,