From 9c76339e62463666d7a37ab2337b0e2b9897e95a Mon Sep 17 00:00:00 2001 From: raywill Date: Tue, 27 Jul 2021 17:27:15 +0800 Subject: [PATCH] batch cherry-pick 31x bugfix to opensource branch --- .../virtual_table/ob_show_processlist.cpp | 15 +++++++++ .../virtual_table/ob_show_processlist.h | 8 +++-- .../ob_virtual_sql_plan_monitor.h | 2 +- .../ob_inner_table_schema.10001_10050.cpp | 19 ++++++++++++ .../ob_inner_table_schema.15001_15050.cpp | 15 +++++++++ .../inner_table/ob_inner_table_schema_def.py | 1 + src/share/schema/ob_schema_getter_guard.cpp | 31 +++++++++++++++++++ src/share/schema/ob_schema_getter_guard.h | 1 + src/share/schema/ob_schema_struct.h | 7 +++++ src/share/schema/ob_table_schema.h | 7 +++++ .../ob_system_variable_factory.cpp | 7 +++-- .../ob_system_variable_factory.h | 2 +- src/sql/engine/ob_exec_context.cpp | 12 ++++++- src/sql/engine/ob_exec_context.h | 4 ++- .../px/exchange/ob_px_fifo_coord_op.cpp | 2 +- .../engine/px/exchange/ob_px_ms_coord_op.cpp | 2 +- src/sql/engine/px/exchange/ob_px_receive.cpp | 2 +- src/sql/engine/px/exchange/ob_px_receive.h | 2 +- .../engine/px/exchange/ob_px_receive_op.cpp | 2 +- src/sql/engine/px/ob_px_coord.cpp | 2 +- src/sql/engine/px/ob_px_coord_op.cpp | 2 +- src/sql/engine/px/ob_px_fifo_coord.cpp | 6 ++-- src/sql/engine/px/ob_px_merge_sort_coord.cpp | 2 +- src/sql/engine/px/ob_px_scheduler.cpp | 30 +++++++++++------- src/sql/optimizer/ob_optimizer.cpp | 24 ++++++-------- src/sql/resolver/dml/ob_del_upd_stmt.h | 12 +++++-- src/sql/resolver/dml/ob_sql_hint.cpp | 4 +-- 27 files changed, 174 insertions(+), 49 deletions(-) diff --git a/src/observer/virtual_table/ob_show_processlist.cpp b/src/observer/virtual_table/ob_show_processlist.cpp index 47bbac784..c3b3186c3 100644 --- a/src/observer/virtual_table/ob_show_processlist.cpp +++ b/src/observer/virtual_table/ob_show_processlist.cpp @@ -277,6 +277,20 @@ bool ObShowProcesslist::FillScanner::operator()(sql::ObSQLSessionMgr::Key key, O } break; } + case TRACE_ID: { + if (obmysql::OB_MYSQL_COM_QUERY == sess_info->get_mysql_cmd() || + obmysql::OB_MYSQL_COM_STMT_EXECUTE == sess_info->get_mysql_cmd() || + obmysql::OB_MYSQL_COM_STMT_PREPARE == sess_info->get_mysql_cmd()) { + int len = sess_info->get_last_trace_id().to_string(trace_id_, sizeof(trace_id_)); + cur_row_->cells_[cell_idx].set_varchar(trace_id_, len); + cur_row_->cells_[cell_idx].set_collation_type(default_collation); + } else { + // when cmd=Sleep, we don't want to display its last query trace id + // as it is weird, not the meaning for 'processlist' + cur_row_->cells_[cell_idx].set_null(); + } + break; + } default: { ret = OB_ERR_UNEXPECTED; SERVER_LOG(WARN, "invalid column id", K(ret), K(cell_idx), K(i), K(output_column_ids_), K(col_id)); @@ -301,6 +315,7 @@ void ObShowProcesslist::FillScanner::reset() scanner_ = NULL; cur_row_ = NULL; my_session_ = NULL; + trace_id_[0] = '\0'; output_column_ids_.reset(); } diff --git a/src/observer/virtual_table/ob_show_processlist.h b/src/observer/virtual_table/ob_show_processlist.h index 94c283095..a828b44fc 100644 --- a/src/observer/virtual_table/ob_show_processlist.h +++ b/src/observer/virtual_table/ob_show_processlist.h @@ -56,12 +56,15 @@ private: USER_HOST, TRANS_ID, THREAD_ID, - SSL_CIPHER + SSL_CIPHER, + TRACE_ID }; class FillScanner { public: FillScanner() : allocator_(NULL), scanner_(NULL), cur_row_(NULL), my_session_(NULL), output_column_ids_() - {} + { + trace_id_[0] = '\0'; + } virtual ~FillScanner() {} bool operator()(sql::ObSQLSessionMgr::Key key, sql::ObSQLSessionInfo* sess_info); @@ -80,6 +83,7 @@ private: sql::ObSQLSessionInfo* my_session_; share::schema::ObSchemaGetterGuard* schema_guard_; ObSEArray output_column_ids_; + char trace_id_[common::OB_MAX_TRACE_ID_BUFFER_SIZE]; DISALLOW_COPY_AND_ASSIGN(FillScanner); }; sql::ObSQLSessionMgr* session_mgr_; diff --git a/src/observer/virtual_table/ob_virtual_sql_plan_monitor.h b/src/observer/virtual_table/ob_virtual_sql_plan_monitor.h index 674085202..39c167e7e 100644 --- a/src/observer/virtual_table/ob_virtual_sql_plan_monitor.h +++ b/src/observer/virtual_table/ob_virtual_sql_plan_monitor.h @@ -115,7 +115,7 @@ private: common::ObString ipstr_; int32_t port_; char server_ip_[common::MAX_IP_ADDR_LENGTH + 2]; - char trace_id_[128]; + char trace_id_[common::OB_MAX_TRACE_ID_BUFFER_SIZE]; bool is_first_get_; bool is_use_index_; common::ObSEArray tenant_id_array_; diff --git a/src/share/inner_table/ob_inner_table_schema.10001_10050.cpp b/src/share/inner_table/ob_inner_table_schema.10001_10050.cpp index 3a979e4b0..7f30e4c9c 100644 --- a/src/share/inner_table/ob_inner_table_schema.10001_10050.cpp +++ b/src/share/inner_table/ob_inner_table_schema.10001_10050.cpp @@ -1655,6 +1655,25 @@ int ObInnerTableSchema::all_virtual_processlist_schema(ObTableSchema &table_sche true, //is_nullable false); //is_autoincrement } + + if (OB_SUCC(ret)) { + ObObj trace_id_default; + trace_id_default.set_varchar(ObString::make_string("")); + ADD_COLUMN_SCHEMA_T("trace_id", // column_name + ++column_id, // column_id + 0, // rowkey_id + 0, // index_id + 0, // part_key_pos + ObVarcharType, // column_type + CS_TYPE_INVALID, // column_collation_type + OB_MAX_TRACE_ID_BUFFER_SIZE, // column_length + -1, // column_precision + -1, // column_scale + true, // is_nullable + false, // is_autoincrement + trace_id_default, + trace_id_default); // default_value + } if (OB_SUCC(ret)) { table_schema.get_part_option().set_part_func_type(PARTITION_FUNC_TYPE_HASH); if (OB_FAIL(table_schema.get_part_option().set_part_expr("hash (addr_to_partition_id(svr_ip, svr_port))"))) { diff --git a/src/share/inner_table/ob_inner_table_schema.15001_15050.cpp b/src/share/inner_table/ob_inner_table_schema.15001_15050.cpp index 053716692..a53fab6ca 100644 --- a/src/share/inner_table/ob_inner_table_schema.15001_15050.cpp +++ b/src/share/inner_table/ob_inner_table_schema.15001_15050.cpp @@ -13449,6 +13449,21 @@ int ObInnerTableSchema::all_virtual_processlist_ora_schema(ObTableSchema &table_ true, //is_nullable false); //is_autoincrement } + + if (OB_SUCC(ret)) { + ADD_COLUMN_SCHEMA("TRACE_ID", // column_name + ++column_id, // column_id + 0, // rowkey_id + 0, // index_id + 0, // part_key_pos + ObVarcharType, // column_type + CS_TYPE_UTF8MB4_BIN, // column_collation_type + OB_MAX_TRACE_ID_BUFFER_SIZE, // column_length + 2, // column_precision + -1, // column_scale + true, // is_nullable + false); // is_autoincrement + } if (OB_SUCC(ret)) { table_schema.get_part_option().set_part_func_type(PARTITION_FUNC_TYPE_HASH); if (OB_FAIL(table_schema.get_part_option().set_part_expr("hash (SVR_IP, SVR_PORT)"))) { diff --git a/src/share/inner_table/ob_inner_table_schema_def.py b/src/share/inner_table/ob_inner_table_schema_def.py index 396408edd..7e55cf822 100644 --- a/src/share/inner_table/ob_inner_table_schema_def.py +++ b/src/share/inner_table/ob_inner_table_schema_def.py @@ -4680,6 +4680,7 @@ def_table_schema( ('trans_id', 'uint'), ('thread_id', 'uint'), ('ssl_cipher', 'varchar:OB_MAX_COMMAND_LENGTH', 'true'), + ('trace_id', 'varchar:OB_MAX_TRACE_ID_BUFFER_SIZE', 'true', ''), ], partition_columns = ['svr_ip', 'svr_port'], ) diff --git a/src/share/schema/ob_schema_getter_guard.cpp b/src/share/schema/ob_schema_getter_guard.cpp index 255e0fe31..f4a7a303a 100644 --- a/src/share/schema/ob_schema_getter_guard.cpp +++ b/src/share/schema/ob_schema_getter_guard.cpp @@ -250,6 +250,37 @@ int ObSchemaGetterGuard::get_can_read_index_array(uint64_t table_id, uint64_t* i return ret; } +int ObSchemaGetterGuard::check_has_local_unique_index(uint64_t table_id, bool& has_local_unique_index) +{ + int ret = OB_SUCCESS; + const ObTableSchema* table_schema = NULL; + ObSEArray simple_index_infos; + const ObSimpleTableSchemaV2* index_schema = NULL; + has_local_unique_index = false; + if (OB_FAIL(get_table_schema(table_id, table_schema))) { + LOG_WARN("failed to get table schema", K(ret), K(table_id)); + } else if (OB_ISNULL(table_schema)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("cannot get table schema for table ", K(table_id)); + } else if (OB_FAIL(table_schema->get_simple_index_infos_without_delay_deleted_tid(simple_index_infos))) { + LOG_WARN("get simple_index_infos without delay_deleted_tid failed", K(ret)); + } + for (int64_t i = 0; OB_SUCC(ret) && i < simple_index_infos.count(); ++i) { + if (OB_FAIL(get_table_schema(simple_index_infos.at(i).table_id_, index_schema))) { + LOG_WARN("failed to get table schema", K(ret), K(simple_index_infos.at(i).table_id_)); + } else if (OB_ISNULL(index_schema)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("cannot get index table schema for table ", K(simple_index_infos.at(i).table_id_)); + } else if (OB_UNLIKELY(index_schema->is_final_invalid_index())) { + // invalid index status, need ingore + } else if (index_schema->is_local_unique_index_table()) { + has_local_unique_index = true; + break; + } + } + return ret; +} + int ObSchemaGetterGuard::get_tenant_id(const ObString& tenant_name, uint64_t& tenant_id) { int ret = OB_SUCCESS; diff --git a/src/share/schema/ob_schema_getter_guard.h b/src/share/schema/ob_schema_getter_guard.h index 5b71eab17..16a70a5b1 100644 --- a/src/share/schema/ob_schema_getter_guard.h +++ b/src/share/schema/ob_schema_getter_guard.h @@ -185,6 +185,7 @@ public: */ int get_can_read_index_array(uint64_t table_id, uint64_t* index_tid_array, int64_t& size, bool with_mv, bool with_global_index = true, bool with_domain_index = true); + int check_has_local_unique_index(uint64_t table_id, bool& has_local_unique_index); bool is_tenant_schema_valid(const int64_t tenant_id) const; /* diff --git a/src/share/schema/ob_schema_struct.h b/src/share/schema/ob_schema_struct.h index f4b4bb92a..d55c8e5f6 100644 --- a/src/share/schema/ob_schema_struct.h +++ b/src/share/schema/ob_schema_struct.h @@ -206,8 +206,15 @@ enum ObIndexType { INDEX_TYPE_UNIQUE_GLOBAL = 4, INDEX_TYPE_PRIMARY = 5, INDEX_TYPE_DOMAIN_CTXCAT = 6, + /* create table t1(c1 int primary key, c2 int); + * create index i1 on t1(c2) + * i1 is a global index. + * But we regard i1 as a local index for better access performance. + * Since it is non-partitioned, it's safe to do so. + */ INDEX_TYPE_NORMAL_GLOBAL_LOCAL_STORAGE = 7, INDEX_TYPE_UNIQUE_GLOBAL_LOCAL_STORAGE = 8, + INDEX_TYPE_MAX = 9, }; diff --git a/src/share/schema/ob_table_schema.h b/src/share/schema/ob_table_schema.h index b10aad122..b19069b6c 100644 --- a/src/share/schema/ob_table_schema.h +++ b/src/share/schema/ob_table_schema.h @@ -613,6 +613,7 @@ public: inline bool is_global_local_index_table() const; inline bool is_global_normal_index_table() const; inline bool is_global_unique_index_table() const; + inline bool is_local_unique_index_table() const; inline bool is_domain_index() const; inline static bool is_domain_index(ObIndexType index_type); inline bool is_index_local_storage() const; @@ -1745,6 +1746,12 @@ inline bool ObSimpleTableSchemaV2::is_global_unique_index_table() const return INDEX_TYPE_UNIQUE_GLOBAL == index_type_; } +inline bool ObSimpleTableSchemaV2::is_local_unique_index_table() const +{ + return INDEX_TYPE_UNIQUE_LOCAL == index_type_ + || INDEX_TYPE_UNIQUE_GLOBAL_LOCAL_STORAGE == index_type_; +} + inline bool ObSimpleTableSchemaV2::is_global_local_index_table() const { return INDEX_TYPE_NORMAL_GLOBAL_LOCAL_STORAGE == index_type_ || INDEX_TYPE_UNIQUE_GLOBAL_LOCAL_STORAGE == index_type_; diff --git a/src/share/system_variable/ob_system_variable_factory.cpp b/src/share/system_variable/ob_system_variable_factory.cpp index 5341350fc..f1398b356 100644 --- a/src/share/system_variable/ob_system_variable_factory.cpp +++ b/src/share/system_variable/ob_system_variable_factory.cpp @@ -19,6 +19,9 @@ using namespace oceanbase::common; namespace oceanbase { namespace share { + +const int32_t ObSysVarFactory::OB_MAX_SYS_VAR_ID; + const char* ObSysVarBinlogRowImage::BINLOG_ROW_IMAGE_NAMES[] = {"MINIMAL", "NOBLOB", "FULL", 0}; const char* ObSysVarQueryCacheType::QUERY_CACHE_TYPE_NAMES[] = {"OFF", "ON", "DEMAND", 0}; const char* ObSysVarObReadConsistency::OB_READ_CONSISTENCY_NAMES[] = {"", "FROZEN", "WEAK", "STRONG", 0}; @@ -695,9 +698,9 @@ int ObSysVarFactory::calc_sys_var_store_idx(ObSysVarClassType sys_var_id, int64_ if (ObSysVarsToIdxMap::has_invalid_sys_var_id()) { ret = OB_INVALID_ARGUMENT; LOG_ERROR("has invalid sys var id", K(ret), K(ObSysVarsToIdxMap::has_invalid_sys_var_id())); - } else if (OB_UNLIKELY(var_id < 0)) { + } else if (OB_UNLIKELY(var_id < 0 || var_id >= ObSysVarFactory::OB_MAX_SYS_VAR_ID)) { ret = OB_INVALID_ARGUMENT; - LOG_ERROR("invalid sys var id", K(ret), K(var_id)); + LOG_ERROR("invalid sys var id", K(ret), K(var_id), K(ObSysVarFactory::OB_MAX_SYS_VAR_ID)); } else { real_idx = ObSysVarsToIdxMap::get_store_idx(var_id); if (real_idx < 0) { diff --git a/src/share/system_variable/ob_system_variable_factory.h b/src/share/system_variable/ob_system_variable_factory.h index bfd582721..d055818be 100644 --- a/src/share/system_variable/ob_system_variable_factory.h +++ b/src/share/system_variable/ob_system_variable_factory.h @@ -2670,4 +2670,4 @@ private: } // namespace share } // namespace oceanbase -#endif // OCEANBASE_SHARE_SYSTEM_VARIABLE_OB_SYSTEM_VARIABLE_FACTORY_ \ No newline at end of file +#endif // OCEANBASE_SHARE_SYSTEM_VARIABLE_OB_SYSTEM_VARIABLE_FACTORY_ diff --git a/src/sql/engine/ob_exec_context.cpp b/src/sql/engine/ob_exec_context.cpp index 84dedad67..f89ccb7d9 100644 --- a/src/sql/engine/ob_exec_context.cpp +++ b/src/sql/engine/ob_exec_context.cpp @@ -222,7 +222,8 @@ ObExecContext::ObExecContext() calc_type_(CALC_NORMAL), fixed_id_(OB_INVALID_ID), expr_partition_id_(OB_INVALID_ID), - iters_(256, allocator_) + iters_(256, allocator_), + check_status_times_(0) {} ObExecContext::~ObExecContext() @@ -748,6 +749,15 @@ int ObExecContext::check_status() return ret; } +int ObExecContext::fast_check_status(const int64_t n) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY((check_status_times_++ & n) == n)) { + ret = check_status(); + } + return ret; +} + uint64_t ObExecContext::get_min_cluster_version() const { return task_executor_ctx_.get_min_cluster_version(); diff --git a/src/sql/engine/ob_exec_context.h b/src/sql/engine/ob_exec_context.h index 05f74ef07..2059af974 100644 --- a/src/sql/engine/ob_exec_context.h +++ b/src/sql/engine/ob_exec_context.h @@ -387,6 +387,8 @@ public: ObRawExprFactory* get_expr_factory(); int check_status(); + int fast_check_status(const int64_t n = 0xFF); + void set_outline_params_wrapper(const share::schema::ObOutlineParamsWrapper* params) { outline_params_wrapper_ = params; @@ -807,8 +809,8 @@ protected: int64_t fixed_id_; // fixed part id or fixed subpart ids // for expr values op use int64_t expr_partition_id_; - ObSEArray iters_; + int64_t check_status_times_; private: DISALLOW_COPY_AND_ASSIGN(ObExecContext); }; diff --git a/src/sql/engine/px/exchange/ob_px_fifo_coord_op.cpp b/src/sql/engine/px/exchange/ob_px_fifo_coord_op.cpp index d73fb48fc..d82c89b6d 100644 --- a/src/sql/engine/px/exchange/ob_px_fifo_coord_op.cpp +++ b/src/sql/engine/px/exchange/ob_px_fifo_coord_op.cpp @@ -106,7 +106,7 @@ int ObPxFifoCoordOp::inner_get_next_row() int64_t timeout_us = 0; clear_evaluated_flag(); if (FALSE_IT(timeout_us = phy_plan_ctx->get_timeout_timestamp() - get_timestamp())) { - } else if (OB_FAIL(THIS_WORKER.check_status())) { + } else if (OB_FAIL(ctx_.fast_check_status())) { LOG_WARN("fail check status, maybe px query timeout", K(ret)); // TODO: cleanup } else if (OB_FAIL(msg_loop_.process_one(timeout_us))) { diff --git a/src/sql/engine/px/exchange/ob_px_ms_coord_op.cpp b/src/sql/engine/px/exchange/ob_px_ms_coord_op.cpp index f0cc8144f..7229b9f44 100644 --- a/src/sql/engine/px/exchange/ob_px_ms_coord_op.cpp +++ b/src/sql/engine/px/exchange/ob_px_ms_coord_op.cpp @@ -208,7 +208,7 @@ int ObPxMSCoordOp::inner_get_next_row() int64_t nth_channel = OB_INVALID_INDEX_INT64; clear_evaluated_flag(); if (FALSE_IT(timeout_us = phy_plan_ctx->get_timeout_timestamp() - get_timestamp())) { - } else if (OB_FAIL(THIS_WORKER.check_status())) { + } else if (OB_FAIL(ctx_.fast_check_status())) { LOG_WARN("fail check status, maybe px query timeout", K(ret)); // TODO: cleanup } else if (OB_FAIL(msg_loop_.process_one_if(&receive_order_, timeout_us, nth_channel))) { diff --git a/src/sql/engine/px/exchange/ob_px_receive.cpp b/src/sql/engine/px/exchange/ob_px_receive.cpp index 1f4433892..5cac69041 100644 --- a/src/sql/engine/px/exchange/ob_px_receive.cpp +++ b/src/sql/engine/px/exchange/ob_px_receive.cpp @@ -445,7 +445,7 @@ int ObPxFifoReceive::inner_get_next_row(ObExecContext& ctx, const common::ObNewR LOG_WARN("get row from channel timeout", K(ret)); } else { usleep(1 * 1000); - int tmp_ret = THIS_WORKER.check_status(); + int tmp_ret = ctx.fast_check_status(); if (OB_SUCCESS != tmp_ret) { LOG_WARN("wait to receive row interrupted", K(tmp_ret), K(ret)); ret = tmp_ret; diff --git a/src/sql/engine/px/exchange/ob_px_receive.h b/src/sql/engine/px/exchange/ob_px_receive.h index af163678a..38cfd772b 100644 --- a/src/sql/engine/px/exchange/ob_px_receive.h +++ b/src/sql/engine/px/exchange/ob_px_receive.h @@ -120,8 +120,8 @@ public: { if (0 == ts_cnt_ % 1000) { ts_ = common::ObTimeUtility::current_time(); - ++ts_cnt_; } + ++ts_cnt_; return ts_; } ObPxTaskChSet& get_ch_set() diff --git a/src/sql/engine/px/exchange/ob_px_receive_op.cpp b/src/sql/engine/px/exchange/ob_px_receive_op.cpp index 9f69ad600..f6d82c597 100644 --- a/src/sql/engine/px/exchange/ob_px_receive_op.cpp +++ b/src/sql/engine/px/exchange/ob_px_receive_op.cpp @@ -402,7 +402,7 @@ int ObPxFifoReceiveOp::inner_get_next_row() LOG_WARN("get row from channel timeout", K(ret)); } else { usleep(1 * 1000); - int tmp_ret = THIS_WORKER.check_status(); + int tmp_ret = ctx_.fast_check_status(); if (OB_SUCCESS != tmp_ret) { LOG_WARN("wait to receive row interrupted", K(tmp_ret), K(ret)); ret = tmp_ret; diff --git a/src/sql/engine/px/ob_px_coord.cpp b/src/sql/engine/px/ob_px_coord.cpp index b5a9b022e..e826de92e 100644 --- a/src/sql/engine/px/ob_px_coord.cpp +++ b/src/sql/engine/px/ob_px_coord.cpp @@ -725,7 +725,7 @@ int ObPxCoord::wait_all_running_dfos_exit(ObExecContext& ctx) const } else if (all_dfo_terminate) { wait_msg = false; LOG_TRACE("all dfo has been terminate", K(ret)); - } else if (OB_FAIL(THIS_WORKER.check_status())) { + } else if (OB_FAIL(ctx.fast_check_status())) { LOG_WARN("fail check status, maybe px query timeout", K(ret)); } else if (OB_FAIL(loop.process_one_if(&control_channels, timeout_us, nth_channel))) { if (OB_EAGAIN == ret) { diff --git a/src/sql/engine/px/ob_px_coord_op.cpp b/src/sql/engine/px/ob_px_coord_op.cpp index 969927bb2..e0c302437 100644 --- a/src/sql/engine/px/ob_px_coord_op.cpp +++ b/src/sql/engine/px/ob_px_coord_op.cpp @@ -664,7 +664,7 @@ int ObPxCoordOp::wait_all_running_dfos_exit() } else if (all_dfo_terminate) { wait_msg = false; LOG_TRACE("all dfo has been terminate", K(ret)); - } else if (OB_FAIL(THIS_WORKER.check_status())) { + } else if (OB_FAIL(ctx_.fast_check_status())) { LOG_WARN("fail check status, maybe px query timeout", K(ret)); } else if (OB_FAIL(loop.process_one_if(&control_channels, timeout_us, nth_channel))) { if (OB_EAGAIN == ret) { diff --git a/src/sql/engine/px/ob_px_fifo_coord.cpp b/src/sql/engine/px/ob_px_fifo_coord.cpp index de493e346..8bf59abb0 100644 --- a/src/sql/engine/px/ob_px_fifo_coord.cpp +++ b/src/sql/engine/px/ob_px_fifo_coord.cpp @@ -134,8 +134,10 @@ int ObPxFifoCoord::inner_get_next_row(ObExecContext& ctx, const common::ObNewRow if (OB_ISNULL(phy_plan_ctx = GET_PHY_PLAN_CTX(ctx))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("phy plan ctx NULL", K(ret)); - } else if (FALSE_IT(timeout_us = phy_plan_ctx->get_timeout_timestamp() - px_ctx->get_timestamp())) { - } else if (OB_FAIL(THIS_WORKER.check_status())) { + } else if ((timeout_us = phy_plan_ctx->get_timeout_timestamp() - px_ctx->get_timestamp()) < 0) { + ret = OB_TIMEOUT; + LOG_WARN("query timeout", K(ret), K(timeout_us), "timeout_ts", THIS_WORKER.get_timeout_ts()); + } else if (OB_FAIL(ctx.fast_check_status())) { LOG_WARN("fail check status, maybe px query timeout", K(ret)); // TODO: cleanup } else if (OB_FAIL(loop.process_one(timeout_us))) { diff --git a/src/sql/engine/px/ob_px_merge_sort_coord.cpp b/src/sql/engine/px/ob_px_merge_sort_coord.cpp index 33c29685b..e2eb515f1 100644 --- a/src/sql/engine/px/ob_px_merge_sort_coord.cpp +++ b/src/sql/engine/px/ob_px_merge_sort_coord.cpp @@ -183,7 +183,7 @@ int ObPxMergeSortCoord::inner_get_next_row(ObExecContext& ctx, const common::ObN ret = OB_ERR_UNEXPECTED; LOG_WARN("phy plan ctx NULL", K(ret)); } else if (FALSE_IT(timeout_us = phy_plan_ctx->get_timeout_timestamp() - px_ctx->get_timestamp())) { - } else if (OB_FAIL(THIS_WORKER.check_status())) { + } else if (OB_FAIL(ctx.fast_check_status())) { LOG_WARN("fail check status, maybe px query timeout", K(ret)); } else if (OB_FAIL(loop.process_one_if(&px_ctx->receive_order_, timeout_us, nth_channel))) { if (OB_EAGAIN == ret) { diff --git a/src/sql/engine/px/ob_px_scheduler.cpp b/src/sql/engine/px/ob_px_scheduler.cpp index 86f2c1d74..74159925b 100644 --- a/src/sql/engine/px/ob_px_scheduler.cpp +++ b/src/sql/engine/px/ob_px_scheduler.cpp @@ -129,14 +129,7 @@ int ObPxMsgProc::on_sqc_init_msg(ObExecContext& ctx, const ObPxInitSqcResultMsg& ObDfo* edge = NULL; ObPxSqcMeta* sqc = NULL; - if (OB_SUCCESS != pkt.rc_) { - ret = pkt.rc_; - update_error_code(coord_info_.first_error_code_, pkt.rc_); - LOG_WARN("fail init sqc", K(pkt), K(ret)); - } else if (pkt.task_count_ <= 0) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("task count returned by sqc invalid. expect 1 or more", K(pkt), K(ret)); - } else if (OB_FAIL(coord_info_.dfo_mgr_.find_dfo_edge(pkt.dfo_id_, edge))) { + if (OB_FAIL(coord_info_.dfo_mgr_.find_dfo_edge(pkt.dfo_id_, edge))) { LOG_WARN("fail find dfo", K(pkt), K(ret)); } else if (OB_ISNULL(edge)) { ret = OB_ERR_UNEXPECTED; @@ -146,11 +139,24 @@ int ObPxMsgProc::on_sqc_init_msg(ObExecContext& ctx, const ObPxInitSqcResultMsg& } else if (OB_ISNULL(sqc)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("NULL ptr", KP(sqc), K(ret)); - } else if (OB_FAIL(sqc->get_partitions_info().assign(pkt.partitions_info_))) { - LOG_WARN("Failed to assign partitions info", K(ret)); } else { - sqc->set_task_count(pkt.task_count_); - sqc->set_thread_inited(true); + if (OB_SUCCESS != pkt.rc_) { + ret = pkt.rc_; + update_error_code(coord_info_.first_error_code_, pkt.rc_); + LOG_WARN("fail init sqc, please check remote server log for details", + "remote_server", + sqc->get_exec_addr(), + K(pkt), + KP(ret)); + } else if (pkt.task_count_ <= 0) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("task count returned by sqc invalid. expect 1 or more", K(pkt), K(ret)); + } else if (OB_FAIL(sqc->get_partitions_info().assign(pkt.partitions_info_))) { + LOG_WARN("Failed to assign partitions info", K(ret)); + } else { + sqc->set_task_count(pkt.task_count_); + sqc->set_thread_inited(true); + } } if (OB_SUCC(ret)) { diff --git a/src/sql/optimizer/ob_optimizer.cpp b/src/sql/optimizer/ob_optimizer.cpp index d56bb3e0e..56180ff3d 100644 --- a/src/sql/optimizer/ob_optimizer.cpp +++ b/src/sql/optimizer/ob_optimizer.cpp @@ -307,20 +307,16 @@ int ObOptimizer::check_pdml_supported_feature(const ObDMLStmt& stmt, const ObSQL LOG_TRACE("dml has constraint, old engine, disable pdml", K(ret)); is_use_pdml = false; } else { - // check global unique index, update(row movement) - int global_index_cnt = pdml_stmt.get_all_table_columns().at(0).index_dml_infos_.count(); - for (int idx = 0; idx < global_index_cnt && OB_SUCC(ret) && is_use_pdml; idx++) { - const ObIArray& column_exprs = - pdml_stmt.get_all_table_columns().at(0).index_dml_infos_.at(idx).column_exprs_; - bool has_unique_index = false; - LOG_TRACE("check pdml unique index", K(column_exprs)); - if (OB_FAIL(check_unique_index(column_exprs, has_unique_index))) { - LOG_WARN("failed to check has unique index", K(ret)); - } else if (has_unique_index) { - LOG_TRACE("dml has unique index, disable pdml", K(ret)); - is_use_pdml = false; - break; - } + // check enabling parallel with local unique index + // 1. disable parallel insert. because parallel unique check not supported + // 2. disable parallel update. only if the unqiue column is updated. + // for now, we blinedly disable PDML if table has unique local index + uint64_t main_table_tid = pdml_stmt.get_all_table_columns().at(0).index_dml_infos_.at(0).index_tid_; + bool with_unique_local_idx = false; + if (OB_FAIL(schema_guard->check_has_local_unique_index(main_table_tid, with_unique_local_idx))) { + LOG_WARN("fail check if table with local unqiue index", K(main_table_tid), K(ret)); + } else if (with_unique_local_idx) { + is_use_pdml = false; } } LOG_TRACE("check use all pdml feature", K(ret), K(is_use_pdml)); diff --git a/src/sql/resolver/dml/ob_del_upd_stmt.h b/src/sql/resolver/dml/ob_del_upd_stmt.h index e11e6b907..6606acced 100644 --- a/src/sql/resolver/dml/ob_del_upd_stmt.h +++ b/src/sql/resolver/dml/ob_del_upd_stmt.h @@ -22,9 +22,15 @@ namespace sql { struct IndexDMLInfo { public: IndexDMLInfo() - { - reset(); - } + : table_id_(common::OB_INVALID_ID), + loc_table_id_(common::OB_INVALID_ID), + index_tid_(common::OB_INVALID_ID), + rowkey_cnt_(0), + part_cnt_(common::OB_INVALID_ID), + all_part_num_(0), + need_filter_null_(false), + distinct_algo_(T_DISTINCT_NONE) + {} inline void reset() { table_id_ = common::OB_INVALID_ID; diff --git a/src/sql/resolver/dml/ob_sql_hint.cpp b/src/sql/resolver/dml/ob_sql_hint.cpp index 8fef16249..7935f7b72 100644 --- a/src/sql/resolver/dml/ob_sql_hint.cpp +++ b/src/sql/resolver/dml/ob_sql_hint.cpp @@ -62,8 +62,8 @@ const char* ObStmtHint::UNNEST_HINT = "UNNEST"; const char* ObStmtHint::NO_UNNEST_HINT = "NO_UNNEST"; const char* ObStmtHint::PLACE_GROUP_BY_HINT = "PLACE_GROUP_BY"; const char* ObStmtHint::NO_PLACE_GROUP_BY_HINT = "NO_PLACE_GROUP_BY"; -const char* ObStmtHint::ENABLE_PARALLEL_DML_HINT = "ENABLE_PARALLEL_DML_HINT"; -const char* ObStmtHint::DISABLE_PARALLEL_DML_HINT = "DISABLE_PARALLEL_DML_HINT"; +const char* ObStmtHint::ENABLE_PARALLEL_DML_HINT = "ENABLE_PARALLEL_DML"; +const char* ObStmtHint::DISABLE_PARALLEL_DML_HINT = "DISABLE_PARALLEL_DML"; const char* ObStmtHint::TRACING_HINT = "TRACING"; const char* ObStmtHint::STAT_HINT = "STAT"; const char* ObStmtHint::PX_JOIN_FILTER_HINT = "PX_JOIN_FILTER";