batch cherry-pick 31x bugfix to opensource branch

This commit is contained in:
raywill
2021-07-27 17:27:15 +08:00
committed by wangzelin.wzl
parent 2969037a8b
commit 9c76339e62
27 changed files with 174 additions and 49 deletions

View File

@ -222,7 +222,8 @@ ObExecContext::ObExecContext()
calc_type_(CALC_NORMAL),
fixed_id_(OB_INVALID_ID),
expr_partition_id_(OB_INVALID_ID),
iters_(256, allocator_)
iters_(256, allocator_),
check_status_times_(0)
{}
ObExecContext::~ObExecContext()
@ -748,6 +749,15 @@ int ObExecContext::check_status()
return ret;
}
int ObExecContext::fast_check_status(const int64_t n)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY((check_status_times_++ & n) == n)) {
ret = check_status();
}
return ret;
}
uint64_t ObExecContext::get_min_cluster_version() const
{
return task_executor_ctx_.get_min_cluster_version();

View File

@ -387,6 +387,8 @@ public:
ObRawExprFactory* get_expr_factory();
int check_status();
int fast_check_status(const int64_t n = 0xFF);
void set_outline_params_wrapper(const share::schema::ObOutlineParamsWrapper* params)
{
outline_params_wrapper_ = params;
@ -807,8 +809,8 @@ protected:
int64_t fixed_id_; // fixed part id or fixed subpart ids
// for expr values op use
int64_t expr_partition_id_;
ObSEArray<common::ObNewRowIterator*, 1, common::ObIAllocator&> iters_;
int64_t check_status_times_;
private:
DISALLOW_COPY_AND_ASSIGN(ObExecContext);
};

View File

@ -106,7 +106,7 @@ int ObPxFifoCoordOp::inner_get_next_row()
int64_t timeout_us = 0;
clear_evaluated_flag();
if (FALSE_IT(timeout_us = phy_plan_ctx->get_timeout_timestamp() - get_timestamp())) {
} else if (OB_FAIL(THIS_WORKER.check_status())) {
} else if (OB_FAIL(ctx_.fast_check_status())) {
LOG_WARN("fail check status, maybe px query timeout", K(ret));
// TODO: cleanup
} else if (OB_FAIL(msg_loop_.process_one(timeout_us))) {

View File

@ -208,7 +208,7 @@ int ObPxMSCoordOp::inner_get_next_row()
int64_t nth_channel = OB_INVALID_INDEX_INT64;
clear_evaluated_flag();
if (FALSE_IT(timeout_us = phy_plan_ctx->get_timeout_timestamp() - get_timestamp())) {
} else if (OB_FAIL(THIS_WORKER.check_status())) {
} else if (OB_FAIL(ctx_.fast_check_status())) {
LOG_WARN("fail check status, maybe px query timeout", K(ret));
// TODO: cleanup
} else if (OB_FAIL(msg_loop_.process_one_if(&receive_order_, timeout_us, nth_channel))) {

View File

@ -445,7 +445,7 @@ int ObPxFifoReceive::inner_get_next_row(ObExecContext& ctx, const common::ObNewR
LOG_WARN("get row from channel timeout", K(ret));
} else {
usleep(1 * 1000);
int tmp_ret = THIS_WORKER.check_status();
int tmp_ret = ctx.fast_check_status();
if (OB_SUCCESS != tmp_ret) {
LOG_WARN("wait to receive row interrupted", K(tmp_ret), K(ret));
ret = tmp_ret;

View File

@ -120,8 +120,8 @@ public:
{
if (0 == ts_cnt_ % 1000) {
ts_ = common::ObTimeUtility::current_time();
++ts_cnt_;
}
++ts_cnt_;
return ts_;
}
ObPxTaskChSet& get_ch_set()

View File

@ -402,7 +402,7 @@ int ObPxFifoReceiveOp::inner_get_next_row()
LOG_WARN("get row from channel timeout", K(ret));
} else {
usleep(1 * 1000);
int tmp_ret = THIS_WORKER.check_status();
int tmp_ret = ctx_.fast_check_status();
if (OB_SUCCESS != tmp_ret) {
LOG_WARN("wait to receive row interrupted", K(tmp_ret), K(ret));
ret = tmp_ret;

View File

@ -725,7 +725,7 @@ int ObPxCoord::wait_all_running_dfos_exit(ObExecContext& ctx) const
} else if (all_dfo_terminate) {
wait_msg = false;
LOG_TRACE("all dfo has been terminate", K(ret));
} else if (OB_FAIL(THIS_WORKER.check_status())) {
} else if (OB_FAIL(ctx.fast_check_status())) {
LOG_WARN("fail check status, maybe px query timeout", K(ret));
} else if (OB_FAIL(loop.process_one_if(&control_channels, timeout_us, nth_channel))) {
if (OB_EAGAIN == ret) {

View File

@ -664,7 +664,7 @@ int ObPxCoordOp::wait_all_running_dfos_exit()
} else if (all_dfo_terminate) {
wait_msg = false;
LOG_TRACE("all dfo has been terminate", K(ret));
} else if (OB_FAIL(THIS_WORKER.check_status())) {
} else if (OB_FAIL(ctx_.fast_check_status())) {
LOG_WARN("fail check status, maybe px query timeout", K(ret));
} else if (OB_FAIL(loop.process_one_if(&control_channels, timeout_us, nth_channel))) {
if (OB_EAGAIN == ret) {

View File

@ -134,8 +134,10 @@ int ObPxFifoCoord::inner_get_next_row(ObExecContext& ctx, const common::ObNewRow
if (OB_ISNULL(phy_plan_ctx = GET_PHY_PLAN_CTX(ctx))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("phy plan ctx NULL", K(ret));
} else if (FALSE_IT(timeout_us = phy_plan_ctx->get_timeout_timestamp() - px_ctx->get_timestamp())) {
} else if (OB_FAIL(THIS_WORKER.check_status())) {
} else if ((timeout_us = phy_plan_ctx->get_timeout_timestamp() - px_ctx->get_timestamp()) < 0) {
ret = OB_TIMEOUT;
LOG_WARN("query timeout", K(ret), K(timeout_us), "timeout_ts", THIS_WORKER.get_timeout_ts());
} else if (OB_FAIL(ctx.fast_check_status())) {
LOG_WARN("fail check status, maybe px query timeout", K(ret));
// TODO: cleanup
} else if (OB_FAIL(loop.process_one(timeout_us))) {

View File

@ -183,7 +183,7 @@ int ObPxMergeSortCoord::inner_get_next_row(ObExecContext& ctx, const common::ObN
ret = OB_ERR_UNEXPECTED;
LOG_WARN("phy plan ctx NULL", K(ret));
} else if (FALSE_IT(timeout_us = phy_plan_ctx->get_timeout_timestamp() - px_ctx->get_timestamp())) {
} else if (OB_FAIL(THIS_WORKER.check_status())) {
} else if (OB_FAIL(ctx.fast_check_status())) {
LOG_WARN("fail check status, maybe px query timeout", K(ret));
} else if (OB_FAIL(loop.process_one_if(&px_ctx->receive_order_, timeout_us, nth_channel))) {
if (OB_EAGAIN == ret) {

View File

@ -129,14 +129,7 @@ int ObPxMsgProc::on_sqc_init_msg(ObExecContext& ctx, const ObPxInitSqcResultMsg&
ObDfo* edge = NULL;
ObPxSqcMeta* sqc = NULL;
if (OB_SUCCESS != pkt.rc_) {
ret = pkt.rc_;
update_error_code(coord_info_.first_error_code_, pkt.rc_);
LOG_WARN("fail init sqc", K(pkt), K(ret));
} else if (pkt.task_count_ <= 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("task count returned by sqc invalid. expect 1 or more", K(pkt), K(ret));
} else if (OB_FAIL(coord_info_.dfo_mgr_.find_dfo_edge(pkt.dfo_id_, edge))) {
if (OB_FAIL(coord_info_.dfo_mgr_.find_dfo_edge(pkt.dfo_id_, edge))) {
LOG_WARN("fail find dfo", K(pkt), K(ret));
} else if (OB_ISNULL(edge)) {
ret = OB_ERR_UNEXPECTED;
@ -146,11 +139,24 @@ int ObPxMsgProc::on_sqc_init_msg(ObExecContext& ctx, const ObPxInitSqcResultMsg&
} else if (OB_ISNULL(sqc)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("NULL ptr", KP(sqc), K(ret));
} else if (OB_FAIL(sqc->get_partitions_info().assign(pkt.partitions_info_))) {
LOG_WARN("Failed to assign partitions info", K(ret));
} else {
sqc->set_task_count(pkt.task_count_);
sqc->set_thread_inited(true);
if (OB_SUCCESS != pkt.rc_) {
ret = pkt.rc_;
update_error_code(coord_info_.first_error_code_, pkt.rc_);
LOG_WARN("fail init sqc, please check remote server log for details",
"remote_server",
sqc->get_exec_addr(),
K(pkt),
KP(ret));
} else if (pkt.task_count_ <= 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("task count returned by sqc invalid. expect 1 or more", K(pkt), K(ret));
} else if (OB_FAIL(sqc->get_partitions_info().assign(pkt.partitions_info_))) {
LOG_WARN("Failed to assign partitions info", K(ret));
} else {
sqc->set_task_count(pkt.task_count_);
sqc->set_thread_inited(true);
}
}
if (OB_SUCC(ret)) {

View File

@ -307,20 +307,16 @@ int ObOptimizer::check_pdml_supported_feature(const ObDMLStmt& stmt, const ObSQL
LOG_TRACE("dml has constraint, old engine, disable pdml", K(ret));
is_use_pdml = false;
} else {
// check global unique index, update(row movement)
int global_index_cnt = pdml_stmt.get_all_table_columns().at(0).index_dml_infos_.count();
for (int idx = 0; idx < global_index_cnt && OB_SUCC(ret) && is_use_pdml; idx++) {
const ObIArray<ObColumnRefRawExpr*>& column_exprs =
pdml_stmt.get_all_table_columns().at(0).index_dml_infos_.at(idx).column_exprs_;
bool has_unique_index = false;
LOG_TRACE("check pdml unique index", K(column_exprs));
if (OB_FAIL(check_unique_index(column_exprs, has_unique_index))) {
LOG_WARN("failed to check has unique index", K(ret));
} else if (has_unique_index) {
LOG_TRACE("dml has unique index, disable pdml", K(ret));
is_use_pdml = false;
break;
}
// check enabling parallel with local unique index
// 1. disable parallel insert. because parallel unique check not supported
// 2. disable parallel update. only if the unqiue column is updated.
// for now, we blinedly disable PDML if table has unique local index
uint64_t main_table_tid = pdml_stmt.get_all_table_columns().at(0).index_dml_infos_.at(0).index_tid_;
bool with_unique_local_idx = false;
if (OB_FAIL(schema_guard->check_has_local_unique_index(main_table_tid, with_unique_local_idx))) {
LOG_WARN("fail check if table with local unqiue index", K(main_table_tid), K(ret));
} else if (with_unique_local_idx) {
is_use_pdml = false;
}
}
LOG_TRACE("check use all pdml feature", K(ret), K(is_use_pdml));

View File

@ -22,9 +22,15 @@ namespace sql {
struct IndexDMLInfo {
public:
IndexDMLInfo()
{
reset();
}
: table_id_(common::OB_INVALID_ID),
loc_table_id_(common::OB_INVALID_ID),
index_tid_(common::OB_INVALID_ID),
rowkey_cnt_(0),
part_cnt_(common::OB_INVALID_ID),
all_part_num_(0),
need_filter_null_(false),
distinct_algo_(T_DISTINCT_NONE)
{}
inline void reset()
{
table_id_ = common::OB_INVALID_ID;

View File

@ -62,8 +62,8 @@ const char* ObStmtHint::UNNEST_HINT = "UNNEST";
const char* ObStmtHint::NO_UNNEST_HINT = "NO_UNNEST";
const char* ObStmtHint::PLACE_GROUP_BY_HINT = "PLACE_GROUP_BY";
const char* ObStmtHint::NO_PLACE_GROUP_BY_HINT = "NO_PLACE_GROUP_BY";
const char* ObStmtHint::ENABLE_PARALLEL_DML_HINT = "ENABLE_PARALLEL_DML_HINT";
const char* ObStmtHint::DISABLE_PARALLEL_DML_HINT = "DISABLE_PARALLEL_DML_HINT";
const char* ObStmtHint::ENABLE_PARALLEL_DML_HINT = "ENABLE_PARALLEL_DML";
const char* ObStmtHint::DISABLE_PARALLEL_DML_HINT = "DISABLE_PARALLEL_DML";
const char* ObStmtHint::TRACING_HINT = "TRACING";
const char* ObStmtHint::STAT_HINT = "STAT";
const char* ObStmtHint::PX_JOIN_FILTER_HINT = "PX_JOIN_FILTER";