[FEAT MERGE] support auto dop

This commit is contained in:
obdev
2023-04-28 15:11:52 +00:00
committed by ob-robot
parent 642f1c7d84
commit b41dc0ebdd
106 changed files with 3815 additions and 2844 deletions

View File

@ -23,6 +23,7 @@
#include "sql/engine/px/exchange/ob_px_repart_transmit_op.h"
#include "sql/optimizer/ob_px_resource_analyzer.h"
#include "sql/engine/px/ob_px_scheduler.h"
#include "sql/engine/px/ob_px_coord_op.h"
using namespace oceanbase::common;
using namespace oceanbase::sql;
@ -165,8 +166,58 @@ int ObDfoSchedDepthGenerator::try_set_dfo_block(ObExecContext &exec_ctx, ObDfo &
return ret;
}
int ObDfoWorkerAssignment::calc_admited_worker_count(const ObIArray<ObDfo*> &dfos,
ObExecContext &exec_ctx,
const ObOpSpec &root_op_spec,
int64_t &px_expected,
int64_t &px_minimal,
int64_t &px_admited)
{
int ret = OB_SUCCESS;
px_admited = 0;
const ObTaskExecutorCtx *task_exec_ctx = NULL;
if (OB_ISNULL(task_exec_ctx = GET_TASK_EXECUTOR_CTX(exec_ctx))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("task exec ctx NULL", K(ret));
} else if (OB_FAIL(ObDfoWorkerAssignment::get_dfos_worker_count(dfos, true, px_minimal))) {
LOG_WARN("failed to get dfos worker count", K(ret));
} else {
// px 级, 表示 optimizer 计算的数量,当前 px 理论上需要多少线程
px_expected = static_cast<const ObPxCoordSpec*>(&root_op_spec)->get_expected_worker_count();
// query 级, 表示 optimizer 计算的数量
const int64_t query_expected = task_exec_ctx->get_expected_worker_cnt();
// query 级, 表示调度需要最小数量
const int64_t query_minimal = task_exec_ctx->get_minimal_worker_cnt();
// query 级, 表示 admission 实际分配的数量
const int64_t query_admited = task_exec_ctx->get_admited_worker_cnt();
if (query_expected > 0 && 0 >= query_admited) {
ret = OB_ERR_INSUFFICIENT_PX_WORKER;
LOG_WARN("not enough thread resource", K(ret), K(px_expected), K(query_admited), K(query_expected));
} else if (0 == query_expected) {
// note: 对于单表、dop=1的查询,会走 fast dfo,此时 query_expected = 0
px_admited = 0;
} else if (query_admited >= query_expected) {
px_admited = px_expected;
} else if (OB_UNLIKELY(query_minimal <= 0)) {
// compatible with version before 4.2
px_admited = static_cast<int64_t>((double) query_admited * (double)px_expected / (double) query_expected);
} else if (OB_UNLIKELY(query_admited < query_minimal || query_expected <= query_minimal)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected query admited worker count", K(ret), K(query_minimal), K(query_admited), K(query_expected));
} else {
const int64_t extra_worker = query_admited - query_minimal;
const int64_t extra_expected = px_expected - px_minimal;
px_admited = px_minimal + extra_expected * extra_worker / (query_expected - query_minimal);
}
LOG_TRACE("calc px worker count", K(query_expected), K(query_minimal), K(query_admited),
K(px_expected), K(px_minimal), K(px_admited));
}
return ret;
}
int ObDfoWorkerAssignment::assign_worker(ObDfoMgr &dfo_mgr,
int64_t expected_worker_count,
int64_t minimal_worker_count,
int64_t admited_worker_count)
{
int ret = OB_SUCCESS;
@ -221,18 +272,38 @@ int ObDfoWorkerAssignment::assign_worker(ObDfoMgr &dfo_mgr,
// 实际分配的 worker 数一定不大于 dop,但可能小于 dop 给定值
// admited_worker_count在rpc作为worker的场景下,值为0.
double scale_rate = 1.0;
if (admited_worker_count < 0 || expected_worker_count <= 0) {
bool match_expected = false;
bool compatible_before_420 = false;
if (OB_UNLIKELY(admited_worker_count < 0 || expected_worker_count <= 0 || minimal_worker_count <= 0)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("should have at least one worker",
K(admited_worker_count), K(expected_worker_count), K(ret));
} else if (0 <= admited_worker_count && admited_worker_count < expected_worker_count) {
LOG_WARN("should have at least one worker", K(ret), K(admited_worker_count),
K(expected_worker_count), K(minimal_worker_count));
} else if (admited_worker_count >= expected_worker_count) {
match_expected = true;
} else if (minimal_worker_count <= 0) {
// compatible with version before 4.2
compatible_before_420 = true;
scale_rate = static_cast<double>(admited_worker_count) / static_cast<double>(expected_worker_count);
} else if (0 <= admited_worker_count || minimal_worker_count == admited_worker_count) {
scale_rate = 0.0;
} else if (OB_UNLIKELY(minimal_worker_count > admited_worker_count
|| minimal_worker_count >= expected_worker_count)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected params", K(ret), K(minimal_worker_count), K(admited_worker_count), K(expected_worker_count));
} else {
scale_rate = static_cast<double>(admited_worker_count - minimal_worker_count)
/ static_cast<double>(expected_worker_count - minimal_worker_count);
}
ARRAY_FOREACH_X(dfos, idx, cnt, OB_SUCC(ret)) {
ObDfo *child = dfos.at(idx);
int64_t val = std::max(
1L,
static_cast<int64_t>(static_cast<double>(child->get_dop()) * scale_rate));
int64_t val = 0;
if (match_expected) {
val = child->get_dop();
} else if (compatible_before_420) {
val = std::max(1L, static_cast<int64_t>(static_cast<double>(child->get_dop()) * scale_rate));
} else {
val = 1L + static_cast<int64_t>(std::max(static_cast<double>(child->get_dop() - 1), 0.0) * scale_rate);
}
child->set_assigned_worker_count(val);
if (child->is_single() && val > 1) {
ret = OB_ERR_UNEXPECTED;
@ -245,17 +316,40 @@ int ObDfoWorkerAssignment::assign_worker(ObDfoMgr &dfo_mgr,
// 因为上面取了 max,所以可能实际 assigned 的会超出 admission 数,这时应该报错
int64_t total_assigned = 0;
if (OB_FAIL(ret)) {
} else if (OB_FAIL(get_dfos_worker_count(dfos, false, total_assigned))) {
LOG_WARN("failed to get dfos worker count", K(ret));
} else if (total_assigned > admited_worker_count && admited_worker_count != 0) {
// 意味着某些 dfo 理论上一个线程都分不到
ret = OB_ERR_PARALLEL_SERVERS_TARGET_NOT_ENOUGH;
LOG_WARN("total assigned worker to dfos is more than admited_worker_count",
K(total_assigned),
K(admited_worker_count),
K(minimal_worker_count),
K(expected_worker_count),
K(ret));
}
return ret;
}
int ObDfoWorkerAssignment::get_dfos_worker_count(const ObIArray<ObDfo*> &dfos,
const bool get_minimal,
int64_t &total_assigned)
{
int ret = OB_SUCCESS;
total_assigned = 0;
ARRAY_FOREACH_X(dfos, idx, cnt, OB_SUCC(ret)) {
const ObDfo *child = dfos.at(idx);
const ObDfo *parent = child->parent();
// 计算当前 dfo 和“孩子们”一起调度时消耗的线程数
// 找到 expected worker cnt 值最大的一组
if (OB_ISNULL(parent)) {
if (OB_ISNULL(parent) || OB_ISNULL(child)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("dfo edges expect to have parent", K(*child), K(ret));
LOG_WARN("dfo edges expect to have parent", KPC(parent), KPC(child), K(ret));
} else {
int64_t assigned =
parent->get_assigned_worker_count() + child->get_assigned_worker_count();
int64_t child_assigned = get_minimal ? 1 : child->get_assigned_worker_count();
int64_t parent_assigned = get_minimal ? 1 : parent->get_assigned_worker_count();
int64_t assigned = parent_assigned + child_assigned;
// 局部右深树的场景,depend_sibling 和当前 child dfo 都会被调度
/* Why need extra flag has_depend_sibling_? Why not use NULL != depend_sibling_?
* dfo5 (dop=2)
@ -274,29 +368,20 @@ int ObDfoWorkerAssignment::assign_worker(ObDfoMgr &dfo_mgr,
if (child->has_depend_sibling()) {
while (NULL != child->depend_sibling()) {
child = child->depend_sibling();
if (max_depend_sibling_assigned_worker < child->get_assigned_worker_count()) {
max_depend_sibling_assigned_worker = child->get_assigned_worker_count();
child_assigned = get_minimal ? 1 : child->get_assigned_worker_count();
if (max_depend_sibling_assigned_worker < child_assigned) {
max_depend_sibling_assigned_worker = child_assigned;
}
}
}
assigned += max_depend_sibling_assigned_worker;
if (assigned > total_assigned) {
total_assigned = assigned;
LOG_TRACE("update total assigned", K(idx), K(parent->get_assigned_worker_count()),
K(child->get_assigned_worker_count()), K(max_depend_sibling_assigned_worker), K(total_assigned));
LOG_TRACE("update total assigned", K(idx), K(get_minimal), K(parent_assigned),
K(child_assigned), K(max_depend_sibling_assigned_worker), K(total_assigned));
}
}
}
if (OB_SUCC(ret) && total_assigned > admited_worker_count && admited_worker_count != 0) {
// 意味着某些 dfo 理论上一个线程都分不到
ret = OB_ERR_PARALLEL_SERVERS_TARGET_NOT_ENOUGH;
LOG_USER_ERROR(OB_ERR_PARALLEL_SERVERS_TARGET_NOT_ENOUGH, total_assigned);
LOG_WARN("fail assign worker to dfos",
K(total_assigned),
K(admited_worker_count),
K(expected_worker_count),
K(ret));
}
return ret;
}
@ -316,14 +401,15 @@ void ObDfoMgr::destroy()
int ObDfoMgr::init(ObExecContext &exec_ctx,
const ObOpSpec &root_op_spec,
int64_t expected_worker_count,
int64_t admited_worker_count,
const ObDfoInterruptIdGen &dfo_int_gen,
ObPxCoordInfo &px_coord_info)
{
int ret = OB_SUCCESS;
root_dfo_ = NULL;
ObDfo *rpc_dfo = nullptr;
int64_t px_expected = 0;
int64_t px_minimal = 0;
int64_t px_admited = 0;
if (inited_) {
ret = OB_INIT_TWICE;
LOG_WARN("dfo mgr init twice", K(ret));
@ -336,11 +422,15 @@ int ObDfoMgr::init(ObExecContext &exec_ctx,
LOG_WARN("fail init dfo mgr", K(ret));
} else if (OB_FAIL(ObDfoSchedDepthGenerator::generate_sched_depth(exec_ctx, *this))) {
LOG_WARN("fail init dfo mgr", K(ret));
} else if (OB_FAIL(ObDfoWorkerAssignment::assign_worker(*this,
expected_worker_count,
admited_worker_count))) {
LOG_WARN("fail assign worker to dfos",
K(admited_worker_count), K(expected_worker_count), K(ret));
} else if (OB_FAIL(ObDfoWorkerAssignment::calc_admited_worker_count(get_all_dfos(),
exec_ctx,
root_op_spec,
px_expected,
px_minimal,
px_admited))) {
LOG_WARN("fail to calc admited worler count", K(ret));
} else if (OB_FAIL(ObDfoWorkerAssignment::assign_worker(*this, px_expected, px_minimal, px_admited))) {
LOG_WARN("fail assign worker to dfos", K(ret), K(px_expected), K(px_minimal), K(px_admited));
} else {
inited_ = true;
}