[CP] Fix px worker check status always return OB_SUCCESS

This commit is contained in:
qianchanger
2023-01-12 09:38:12 +00:00
committed by ob-robot
parent 8ffeb21dc9
commit fd39c584f3
18 changed files with 270 additions and 43 deletions

View File

@ -117,6 +117,7 @@ protected:
bool &has_set_stop() { return stop_; }
int64_t get_thread_count() const { return n_threads_; }
uint64_t get_thread_idx() const { return thread_idx_; }
void set_thread_idx(int64_t idx) { thread_idx_ = idx; }
private:
virtual void run(int64_t idx);

View File

@ -47,6 +47,7 @@
#include "rpc/obrpc/ob_rpc_packet.h"
#include "lib/container/ob_array.h"
#include "share/rc/ob_tenant_module_init_ctx.h"
#include "sql/engine/px/ob_px_worker.h"
using namespace oceanbase::lib;
using namespace oceanbase::common;
@ -222,6 +223,14 @@ void ObPxPool::set_px_thread_name()
lib::set_thread_name_inner(buf);
}
void ObPxPool::run(int64_t idx)
{
set_thread_idx(idx);
// Create worker for current thread.
ObPxWorker worker;
run1();
}
void ObPxPool::run1()
{
int ret = OB_SUCCESS;

View File

@ -55,6 +55,7 @@ class ObPxPool
: public share::ObThreadPool
{
using RunFuncT = std::function<void ()>;
void run(int64_t idx) final;
void run1() final;
static const int64_t QUEUE_WAIT_TIME = 100 * 1000;

View File

@ -110,6 +110,7 @@ ob_set_subtarget(ob_rootserver freeze
freeze/ob_major_freeze_service.cpp
freeze/ob_major_freeze_rpc_define.cpp
freeze/ob_major_freeze_helper.cpp
freeze/ob_major_freeze_util.cpp
)
ob_set_subtarget(ob_rootserver restore

View File

@ -15,6 +15,7 @@
#include "rootserver/freeze/ob_checksum_validator.h"
#include "rootserver/freeze/ob_freeze_info_manager.h"
#include "rootserver/freeze/ob_zone_merge_manager.h"
#include "rootserver/freeze/ob_major_freeze_util.h"
#include "rootserver/ob_root_utils.h"
#include "lib/mysqlclient/ob_mysql_proxy.h"
#include "lib/mysqlclient/ob_isql_client.h"
@ -301,6 +302,7 @@ bool ObIndexChecksumValidator::need_validate() const
}
int ObIndexChecksumValidator::validate_checksum(
const volatile bool &stop,
const SCN &frozen_scn,
const hash::ObHashMap<ObTabletLSPair, ObTabletCompactionStatus> &tablet_compaction_map,
int64_t &table_count,
@ -308,7 +310,10 @@ int ObIndexChecksumValidator::validate_checksum(
const int64_t expected_epoch)
{
int ret = OB_SUCCESS;
if ((!frozen_scn.is_valid()) || (tablet_compaction_map.empty())) {
if (stop) {
ret = OB_CANCELED;
LOG_WARN("already stop", KR(ret), K_(tenant_id));
} else if ((!frozen_scn.is_valid()) || (tablet_compaction_map.empty())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K_(tenant_id), K(frozen_scn));
} else if (IS_NOT_INIT) {
@ -317,14 +322,15 @@ int ObIndexChecksumValidator::validate_checksum(
} else if (!is_primary_cluster()) {
ret = OB_INNER_STAT_ERROR;
LOG_WARN("can only check index column checksum in primary cluster", KR(ret));
} else if (OB_FAIL(check_all_table_verification_finished(frozen_scn, tablet_compaction_map, table_count,
table_compaction_map, expected_epoch))) {
} else if (OB_FAIL(check_all_table_verification_finished(stop, frozen_scn, tablet_compaction_map,
table_count, table_compaction_map, expected_epoch))) {
LOG_WARN("fail to check all table verification finished", KR(ret), K_(tenant_id), K(frozen_scn));
}
return ret;
}
int ObIndexChecksumValidator::check_all_table_verification_finished(
const volatile bool &stop,
const SCN &frozen_scn,
const hash::ObHashMap<ObTabletLSPair, ObTabletCompactionStatus> &tablet_compaction_map,
int64_t &table_count,
@ -338,13 +344,16 @@ int ObIndexChecksumValidator::check_all_table_verification_finished(
ObSchemaGetterGuard schema_guard;
SMART_VARS_2((ObArray<const ObSimpleTableSchemaV2 *>, table_schemas),
(ObArray<uint64_t>, table_ids)) {
if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_full_schema_guard(tenant_id_, schema_guard))) {
if (stop) {
ret = OB_CANCELED;
LOG_WARN("already stop", KR(ret), K_(tenant_id));
} else if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_full_schema_guard(tenant_id_, schema_guard))) {
LOG_WARN("fail to get tenant schema guard", KR(ret), K_(tenant_id));
} else if (OB_FAIL(schema_guard.get_table_schemas_in_tenant(tenant_id_, table_schemas))) {
LOG_WARN("fail to get tenant table schemas", KR(ret), K_(tenant_id));
} else {
table_count = table_schemas.count();
for (int64_t i = 0; (i < table_schemas.count()) && OB_SUCC(ret); ++i) {
for (int64_t i = 0; (i < table_schemas.count()) && OB_SUCC(ret) && !stop; ++i) {
const ObSimpleTableSchemaV2 *simple_schema = table_schemas.at(i);
if (OB_ISNULL(simple_schema)) {
ret = OB_ERR_UNEXPECTED;
@ -423,6 +432,7 @@ int ObIndexChecksumValidator::check_all_table_verification_finished(
}
#endif
// both tables' all tablets finished compaction, we should validate column checksum.
FREEZE_TIME_GUARD;
if (OB_FAIL(ObTabletReplicaChecksumOperator::check_column_checksum(tenant_id_,
*data_table_schema, *table_schema, frozen_scn, *sql_proxy_, expected_epoch))) {
if (OB_CHECKSUM_ERROR == ret) {
@ -488,7 +498,7 @@ int ObIndexChecksumValidator::check_all_table_verification_finished(
if (OB_SUCC(ret) && (OB_SUCCESS == check_ret)) {
ObArray<uint64_t> removed_table_ids; // record the table_id which will be removed
hash::ObHashMap<uint64_t, ObTableCompactionInfo>::iterator iter = table_compaction_map.begin();
for (;OB_SUCC(ret) && (iter != table_compaction_map.end()); ++iter) {
for (;!stop && OB_SUCC(ret) && (iter != table_compaction_map.end()); ++iter) {
const uint64_t cur_table_id = iter->first;
if (exist_in_table_array(cur_table_id, table_ids)) {
const ObTableCompactionInfo &compaction_info = iter->second;
@ -518,7 +528,6 @@ int ObIndexChecksumValidator::check_all_table_verification_finished(
if (check_ret == OB_CHECKSUM_ERROR) {
ret = check_ret;
}
return ret;
}
@ -591,6 +600,7 @@ int ObIndexChecksumValidator::handle_table_compaction_finished(
} else {
if (table_schema->has_tablet()) {
SMART_VAR(ObArray<ObTabletLSPair>, pairs) {
FREEZE_TIME_GUARD;
if (OB_FAIL(ObTabletReplicaChecksumOperator::get_tablet_ls_pairs(tenant_id_, *table_schema, *sql_proxy_, pairs))) {
LOG_WARN("fail to get tablet_ls pairs", KR(ret), K_(tenant_id), K(table_id));
} else if (pairs.count() < 1) {
@ -649,6 +659,7 @@ int ObIndexChecksumValidator::check_table_compaction_finished(
SMART_VAR(ObArray<ObTabletID>, tablet_ids) {
SMART_VAR(ObArray<ObTabletLSPair>, pairs) {
if (table_schema.has_tablet()) {
FREEZE_TIME_GUARD;
if (OB_FAIL(table_schema.get_tablet_ids(tablet_ids))) {
LOG_WARN("fail to get tablet_ids from table schema", KR(ret), K(table_schema));
} else if (OB_FAIL(ObTabletReplicaChecksumOperator::get_tablet_ls_pairs(tenant_id_, table_id,
@ -689,6 +700,7 @@ int ObIndexChecksumValidator::check_table_compaction_finished(
}
// if current table 'has tablet' & 'finished compaction' & 'not skip verifying', verify tablet replica checksum
if (OB_SUCC(ret) && latest_compaction_info.is_compacted()) {
FREEZE_TIME_GUARD;
if (OB_FAIL(ObTabletReplicaChecksumOperator::check_tablet_replica_checksum(tenant_id_, pairs, frozen_scn,
*sql_proxy_))) {
if (OB_CHECKSUM_ERROR == ret) {

View File

@ -104,7 +104,8 @@ public:
virtual ~ObIndexChecksumValidator() {}
public:
int validate_checksum(const share::SCN &frozen_scn,
int validate_checksum(const volatile bool &stop,
const share::SCN &frozen_scn,
const hash::ObHashMap<share::ObTabletLSPair, share::ObTabletCompactionStatus> &tablet_compaction_map,
int64_t &table_count,
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map,
@ -113,7 +114,8 @@ public:
private:
// valid '<data table, index table>' pair should finish index column checksum verification, other tables just skip verification.
int check_all_table_verification_finished(const share::SCN &frozen_scn,
int check_all_table_verification_finished(const volatile bool &stop,
const share::SCN &frozen_scn,
const hash::ObHashMap<share::ObTabletLSPair, share::ObTabletCompactionStatus> &tablet_compaction_map,
int64_t &table_count,
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map,

View File

@ -243,35 +243,44 @@ int ObMajorFreezeService::check_inner_stat()
void ObMajorFreezeService::stop()
{
LOG_INFO("major_freeze_service start to stop", K_(tenant_id));
ObRecursiveMutexGuard guard(lock_);
SpinRLockGuard r_guard(rw_lock_);
if (OB_NOT_NULL(tenant_major_freeze_)) {
LOG_INFO("tenant_major_freeze_ start to stop", K_(tenant_id));
tenant_major_freeze_->stop();
}
LOG_INFO("major_freeze_service finish to stop", K_(tenant_id));
}
void ObMajorFreezeService::wait()
{
LOG_INFO("major_freeze_service start to wait", K_(tenant_id));
ObRecursiveMutexGuard guard(lock_);
SpinRLockGuard r_guard(rw_lock_);
int ret = OB_SUCCESS;
if (OB_NOT_NULL(tenant_major_freeze_)) {
LOG_INFO("tenant_major_freeze_ start to wait", K_(tenant_id));
if (OB_FAIL(tenant_major_freeze_->wait())) {
LOG_WARN("fail to wait", KR(ret), K_(tenant_id));
}
}
LOG_INFO("major_freeze_service finish to wait", K_(tenant_id));
}
void ObMajorFreezeService::destroy()
{
LOG_INFO("major_freeze_service start to destroy", K_(tenant_id));
ObRecursiveMutexGuard guard(lock_);
SpinRLockGuard r_guard(rw_lock_);
int ret = OB_SUCCESS;
if (OB_NOT_NULL(tenant_major_freeze_)) {
LOG_INFO("tenant_major_freeze_ start to destroy", K_(tenant_id));
if (OB_FAIL(tenant_major_freeze_->destroy())) {
LOG_WARN("fail to destroy", KR(ret), K_(tenant_id));
}
}
LOG_INFO("major_freeze_service finish to destroy", K_(tenant_id));
}
int ObMajorFreezeService::mtl_init(ObMajorFreezeService *&service)

View File

@ -0,0 +1,57 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX RS
#include "rootserver/freeze/ob_major_freeze_util.h"
#include "lib/time/ob_time_utility.h"
#include "share/ob_define.h"
namespace oceanbase
{
namespace rootserver
{
ObFreezeTimeGuard::ObFreezeTimeGuard(
const char *file,
const int64_t line,
const char *func,
const char *mod)
: warn_threshold_us_(FREEZE_WARN_THRESHOLD_US),
start_time_us_(common::ObTimeUtility::fast_current_time()),
file_(file),
line_(line),
func_name_(func),
log_mod_(mod)
{
}
ObFreezeTimeGuard::~ObFreezeTimeGuard()
{
int64_t now_us = common::ObTimeUtility::fast_current_time();
int64_t total_cost_us = now_us - start_time_us_;
if (OB_UNLIKELY(total_cost_us >= warn_threshold_us_)) {
constexpr int buffer_size = 256;
char strbuffer[buffer_size] = { 0 };
int n = snprintf(strbuffer, buffer_size, "cost too much time: %s (%s:%ld), ",
func_name_, strrchr(file_, '/') ? (strrchr(file_, '/') + 1) : file_, line_);
if (n >= buffer_size) {
snprintf(&strbuffer[buffer_size - 6], 6, "..., ");
}
common::OB_PRINT(log_mod_, OB_LOG_LEVEL_DIRECT(WARN), strbuffer,
LOG_KVS(K_(warn_threshold_us), K(total_cost_us), K_(start_time_us), K(now_us)));
}
}
} // end namespace rootserver
} // end namespace oceanbase

View File

@ -0,0 +1,50 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_ROOTSERVER_FREEZE_OB_MAJOR_FREEZE_UTIL_H_
#define OCEANBASE_ROOTSERVER_FREEZE_OB_MAJOR_FREEZE_UTIL_H_
#include <stdint.h>
namespace oceanbase
{
namespace rootserver
{
#define FREEZE_TIME_GUARD \
rootserver::ObFreezeTimeGuard freeze_time_guard(__FILE__, __LINE__, __FUNCTION__, "[RS] ")
class ObFreezeTimeGuard
{
public:
ObFreezeTimeGuard(const char *file,
const int64_t line,
const char *func,
const char *mod);
virtual ~ObFreezeTimeGuard();
private:
const int64_t FREEZE_WARN_THRESHOLD_US = 10 * 1000L * 1000L; // 10s
private:
const int64_t warn_threshold_us_;
const int64_t start_time_us_;
const char * const file_;
const int64_t line_;
const char * const func_name_;
const char * const log_mod_;
};
} // end namespace rootserver
} // end namespace oceanbase
#endif // OCEANBASE_ROOTSERVER_FREEZE_OB_MAJOR_FREEZE_UTIL_H_

View File

@ -14,6 +14,7 @@
#include "rootserver/freeze/ob_major_merge_progress_checker.h"
#include "rootserver/freeze/ob_zone_merge_manager.h"
#include "rootserver/freeze/ob_major_freeze_util.h"
#include "share/schema/ob_schema_getter_guard.h"
#include "share/tablet/ob_tablet_table_operator.h"
#include "share/tablet/ob_tablet_table_iterator.h"
@ -181,8 +182,17 @@ int ObMajorMergeProgressChecker::check_merge_progress(
LOG_WARN("fail to generate tablet table map", K_(tenant_id), KR(ret));
} else {
ObTabletInfo tablet_info;
while (!stop && OB_SUCC(ret) && OB_SUCC(iter.next(tablet_info))) {
if (!tablet_info.is_valid()) {
while (!stop && OB_SUCC(ret)) {
{
FREEZE_TIME_GUARD;
if (OB_FAIL(iter.next(tablet_info))) {
if (OB_ITER_END != ret) {
LOG_WARN("fail to get next tablet_info", KR(ret), K_(tenant_id), K(stop));
}
}
}
if (OB_FAIL(ret)) {
} else if (!tablet_info.is_valid()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("iterate invalid tablet info", KR(ret), K(tablet_info));
} else if (OB_FAIL(check_tablet(tablet_info, tablet_map, all_progress, global_broadcast_scn,
@ -258,10 +268,14 @@ int ObMajorMergeProgressChecker::check_tablet(
ObLSInfo ls_info;
int64_t cluster_id = GCONF.cluster_id;
const ObLSID &ls_id = tablet_info.get_ls_id();
if (OB_FAIL(lst_operator_->get(cluster_id, tenant_id_,
ls_id, share::ObLSTable::DEFAULT_MODE, ls_info))) {
LOG_WARN("fail to get ls info", KR(ret), K_(tenant_id), K(ls_id));
} else if (OB_FAIL(check_majority_integrated(schema_guard, tablet_info, ls_info))) {
{
FREEZE_TIME_GUARD;
if (OB_FAIL(lst_operator_->get(cluster_id, tenant_id_,
ls_id, share::ObLSTable::DEFAULT_MODE, ls_info))) {
LOG_WARN("fail to get ls info", KR(ret), K_(tenant_id), K(ls_id));
}
}
if (FAILEDx(check_majority_integrated(schema_guard, tablet_info, ls_info))) {
LOG_WARN("fail to check majority integrated", KR(ret), K(tablet_info), K(ls_info));
} else if (OB_FAIL(check_tablet_compaction_scn(all_progress, global_broadcast_scn, tablet_info, ls_info))) {
LOG_WARN("fail to check data version", KR(ret), K(tablet_info), K(ls_info));
@ -458,13 +472,18 @@ int ObMajorMergeProgressChecker::get_associated_replica_num(
}
int ObMajorMergeProgressChecker::check_verification(
const volatile bool &stop,
const share::SCN &global_broadcast_scn,
const int64_t expected_epoch)
{
int ret = OB_SUCCESS;
if (!tablet_compaction_map_.empty()) {
if (index_validator_.need_validate() && OB_FAIL(index_validator_.validate_checksum(global_broadcast_scn,
tablet_compaction_map_, table_count_, table_compaction_map_, expected_epoch))) {
if (stop) {
ret = OB_CANCELED;
LOG_WARN("already stop", KR(ret), K_(tenant_id));
} else if (!tablet_compaction_map_.empty()) {
if (index_validator_.need_validate() && OB_FAIL(index_validator_.validate_checksum(stop,
global_broadcast_scn, tablet_compaction_map_, table_count_, table_compaction_map_,
expected_epoch))) {
LOG_WARN("fail to validate checksum of index validator", KR(ret), K(global_broadcast_scn),
K(expected_epoch));
}

View File

@ -62,7 +62,8 @@ public:
const share::SCN &global_broadcast_scn,
share::ObAllZoneMergeProgress &all_progress);
int check_verification(const share::SCN &global_broadcast_scn,
int check_verification(const volatile bool &stop,
const share::SCN &global_broadcast_scn,
const int64_t expected_epoch);
// @exist_uncompacted means not all table finished compaction

View File

@ -20,11 +20,13 @@
#include "rootserver/freeze/ob_freeze_info_manager.h"
#include "rootserver/ob_rs_event_history_table_operator.h"
#include "rootserver/freeze/ob_tenant_all_zone_merge_strategy.h"
#include "rootserver/freeze/ob_major_freeze_util.h"
#include "lib/container/ob_array.h"
#include "lib/container/ob_se_array.h"
#include "lib/container/ob_array_iterator.h"
#include "lib/container/ob_se_array_iterator.h"
#include "lib/allocator/page_arena.h"
#include "lib/profile/ob_trace_id.h"
#include "share/ob_errno.h"
#include "share/config/ob_server_config.h"
#include "share/tablet/ob_tablet_table_iterator.h"
@ -257,6 +259,7 @@ int ObMajorMergeScheduler::do_before_major_merge(const int64_t expected_epoch)
int ret = OB_SUCCESS;
share::SCN global_broadcast_scn;
global_broadcast_scn.set_min();
FREEZE_TIME_GUARD;
if (OB_FAIL(progress_checker_.prepare_handle())) {
LOG_WARN("fail to do prepare handle of progress checker", KR(ret));
@ -279,6 +282,7 @@ int ObMajorMergeScheduler::do_one_round_major_merge(const int64_t expected_epoch
// loop until 'this round major merge finished' or 'epoch changed'
while (!stop_ && !is_paused()) {
update_last_run_timestamp();
ObCurTraceId::init(GCONF.self_addr_);
ObZoneArray to_merge_zone;
// Place is_last_merge_complete() to the head of this while loop.
// So as to break this loop at once, when the last merge is complete.
@ -372,7 +376,7 @@ int ObMajorMergeScheduler::schedule_zones_to_merge(
// set zone merging flag
if (OB_SUCC(ret)) {
HEAP_VAR(ObZoneMergeInfo, tmp_info) {
FOREACH_CNT_X(zone, to_merge, OB_SUCCESS == ret) {
FOREACH_CNT_X(zone, to_merge, (OB_SUCCESS == ret) && !stop_) {
tmp_info.reset();
tmp_info.tenant_id_ = tenant_id_;
tmp_info.zone_ = *zone;
@ -411,7 +415,7 @@ int ObMajorMergeScheduler::start_zones_merge(const ObZoneArray &to_merge, const
} else if (OB_FAIL(zone_merge_mgr_->get_global_broadcast_scn(global_broadcast_scn))) {
LOG_WARN("fail to get_global_broadcast_scn", KR(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && (i < to_merge.count()); ++i) {
for (int64_t i = 0; !stop_ && OB_SUCC(ret) && (i < to_merge.count()); ++i) {
HEAP_VAR(ObZoneMergeInfo, tmp_info) {
tmp_info.tenant_id_ = tenant_id_;
tmp_info.zone_ = to_merge.at(i);
@ -458,7 +462,7 @@ int ObMajorMergeScheduler::update_merge_status(const int64_t expected_epoch)
"global_broadcast_scn", global_broadcast_scn.get_val_for_inner_table_field(),
"service_addr", GCONF.self_addr_);
}
} else if (OB_FAIL(progress_checker_.check_verification(global_broadcast_scn, expected_epoch))) {
} else if (OB_FAIL(progress_checker_.check_verification(stop_, global_broadcast_scn, expected_epoch))) {
LOG_WARN("fail to check verification", KR(ret), K_(tenant_id), K(global_broadcast_scn));
int64_t time_interval = 10L * 60 * 1000 * 1000; // record every 10 minutes
if (TC_REACH_TIME_INTERVAL(time_interval)) {
@ -496,7 +500,7 @@ int ObMajorMergeScheduler::handle_all_zone_merge(
} else {
// 1. check all zone finished compaction
HEAP_VAR(ObZoneMergeInfo, info) {
FOREACH_X(progress, all_progress, OB_SUCC(ret)) {
FOREACH_X(progress, all_progress, OB_SUCC(ret) && !stop_) {
const ObZone &zone = progress->zone_;
bool merged = false;
info.reset();
@ -703,6 +707,7 @@ int ObMajorMergeScheduler::try_update_epoch_and_reload()
}
LOG_WARN("fail to update freeze_service_epoch", KR(ret), K(ori_epoch), K(latest_epoch));
} else {
FREEZE_TIME_GUARD;
if (OB_FAIL(set_epoch(latest_epoch))) {
LOG_WARN("fail to set epoch", KR(ret), K(latest_epoch));
} else if (OB_FAIL(zone_merge_mgr_->reload())) {
@ -735,6 +740,7 @@ int ObMajorMergeScheduler::do_update_freeze_service_epoch(
{
int ret = OB_SUCCESS;
int64_t affected_rows = 0;
FREEZE_TIME_GUARD;
if (OB_FAIL(ObServiceEpochProxy::update_service_epoch(*sql_proxy_, tenant_id_,
ObServiceEpochProxy::FREEZE_SERVICE_EPOCH, latest_epoch, affected_rows))) {
LOG_WARN("fail to update freeze_service_epoch", KR(ret), K(latest_epoch), K_(tenant_id));
@ -750,6 +756,7 @@ int ObMajorMergeScheduler::update_all_tablets_report_scn(
const uint64_t global_braodcast_scn_val)
{
int ret = OB_SUCCESS;
FREEZE_TIME_GUARD;
if (OB_FAIL(ObTabletMetaTableCompactionOperator::batch_update_report_scn(
tenant_id_,
global_braodcast_scn_val,
@ -793,10 +800,13 @@ void ObMajorMergeScheduler::check_merge_interval_time(const bool is_merging)
ObServerTableOperator st_operator;
if (OB_FAIL(st_operator.init(sql_proxy_))) {
LOG_WARN("fail to init server table operator", K(ret), K_(tenant_id));
} else if (OB_FAIL(st_operator.get_start_service_time(GCONF.self_addr_, start_service_time))) {
LOG_WARN("fail to get start service time", KR(ret), K_(tenant_id));
} else {
total_service_time = now - start_service_time;
FREEZE_TIME_GUARD;
if (OB_FAIL(st_operator.get_start_service_time(GCONF.self_addr_, start_service_time))) {
LOG_WARN("fail to get start service time", KR(ret), K_(tenant_id));
} else {
total_service_time = now - start_service_time;
}
}
}
// In order to avoid LOG_ERROR when the tenant miss daily merge due to the cluster restarted.

View File

@ -85,16 +85,22 @@ int ObTenantMajorFreeze::start()
void ObTenantMajorFreeze::stop()
{
LOG_INFO("daily_launcher start to stop", K_(tenant_id));
daily_launcher_.stop();
LOG_INFO("freeze_info_detector start to stop", K_(tenant_id));
freeze_info_detector_.stop();
LOG_INFO("merge_scheduler start to stop", K_(tenant_id));
merge_scheduler_.stop();
}
int ObTenantMajorFreeze::wait()
{
int ret = OB_SUCCESS;
LOG_INFO("daily_launcher start to wait", K_(tenant_id));
daily_launcher_.wait();
LOG_INFO("freeze_info_detector start to wait", K_(tenant_id));
freeze_info_detector_.wait();
LOG_INFO("merge_scheduler start to wait", K_(tenant_id));
merge_scheduler_.wait();
return ret;
}
@ -102,12 +108,21 @@ int ObTenantMajorFreeze::wait()
int ObTenantMajorFreeze::destroy()
{
int ret = OB_SUCCESS;
LOG_INFO("daily_launcher start to destroy", K_(tenant_id));
if (OB_FAIL(daily_launcher_.destroy())) {
LOG_WARN("fail to destroy daily_launcher", KR(ret), K_(tenant_id));
} else if (OB_FAIL(freeze_info_detector_.destroy())) {
LOG_WARN("fail to destroy freeze_info_detector", KR(ret), K_(tenant_id));
} else if (OB_FAIL(merge_scheduler_.destroy())) {
LOG_WARN("fail to destroy merge_scheduler", KR(ret), K_(tenant_id));
}
if (OB_SUCC(ret)) {
LOG_INFO("freeze_info_detector start to destroy", K_(tenant_id));
if (OB_FAIL(freeze_info_detector_.destroy())) {
LOG_WARN("fail to destroy freeze_info_detector", KR(ret), K_(tenant_id));
}
}
if (OB_SUCC(ret)) {
LOG_INFO("merge_scheduler start to destroy", K_(tenant_id));
if (OB_FAIL(merge_scheduler_.destroy())) {
LOG_WARN("fail to destroy merge_scheduler", KR(ret), K_(tenant_id));
}
}
return ret;
}

View File

@ -23,6 +23,7 @@
#include "lib/mysqlclient/ob_mysql_proxy.h"
#include "lib/utility/ob_macro_utils.h"
#include "rootserver/ob_rs_event_history_table_operator.h"
#include "rootserver/freeze/ob_major_freeze_util.h"
namespace oceanbase
{
@ -310,6 +311,7 @@ int ObZoneMergeManagerBase::start_zone_merge(
ObMySQLTransaction trans;
const int64_t cur_time = ObTimeUtility::current_time();
const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id_);
FREEZE_TIME_GUARD;
if (OB_FAIL(check_valid(zone, idx))) {
LOG_WARN("fail to check valid", KR(ret), K(zone), K_(tenant_id));
@ -344,6 +346,7 @@ int ObZoneMergeManagerBase::start_zone_merge(
tmp_info.broadcast_scn_.set_scn(global_merge_info_.global_broadcast_scn(), need_update);
tmp_info.frozen_scn_.set_scn(global_merge_info_.frozen_scn(), need_update);
FREEZE_TIME_GUARD;
if (OB_FAIL(ObZoneMergeTableOperator::update_partial_zone_merge_info(trans, tenant_id_, tmp_info))) {
LOG_WARN("fail to update partial zone merge info", KR(ret), K_(tenant_id), K(tmp_info));
}
@ -373,6 +376,7 @@ int ObZoneMergeManagerBase::finish_zone_merge(
ObMySQLTransaction trans;
const int64_t cur_time = ObTimeUtility::current_time();
const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id_);
FREEZE_TIME_GUARD;
if (OB_FAIL(check_valid(zone, idx))) {
LOG_WARN("fail to check valid", KR(ret), K(zone), K_(tenant_id));
@ -408,6 +412,7 @@ int ObZoneMergeManagerBase::finish_zone_merge(
tmp_info.all_merged_scn_.set_scn(new_all_merged_scn, true);
}
FREEZE_TIME_GUARD;
if (OB_FAIL(ObZoneMergeTableOperator::update_partial_zone_merge_info(trans, tenant_id_, tmp_info))) {
LOG_WARN("fail to update partial zone merge info", KR(ret), K_(tenant_id), K(tmp_info));
}
@ -466,6 +471,7 @@ int ObZoneMergeManagerBase::set_merge_error(const int64_t error_type, const int6
is_merge_error = 0;
}
FREEZE_TIME_GUARD;
if (OB_FAIL(check_inner_stat())) {
LOG_WARN("fail to check inner stat", KR(ret), K_(tenant_id));
} else if (OB_FAIL(trans.start(proxy_, meta_tenant_id))) {
@ -480,6 +486,7 @@ int ObZoneMergeManagerBase::set_merge_error(const int64_t error_type, const int6
tmp_global_info.is_merge_error_.set_val(is_merge_error, true);
tmp_global_info.error_type_.set_val(error_type, true);
FREEZE_TIME_GUARD;
if (OB_FAIL(ObGlobalMergeTableOperator::update_partial_global_merge_info(trans, tenant_id_,
tmp_global_info))) {
LOG_WARN("fail to update partial global merge info", KR(ret), K(tmp_global_info));
@ -512,6 +519,7 @@ int ObZoneMergeManagerBase::set_zone_merging(
int64_t idx = OB_INVALID_INDEX;
ObMySQLTransaction trans;
const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id_);
FREEZE_TIME_GUARD;
if (OB_FAIL(check_valid(zone, idx))) {
LOG_WARN("fail to check valid", KR(ret), K(zone), K_(tenant_id));
} else if (OB_FAIL(trans.start(proxy_, meta_tenant_id))) {
@ -526,6 +534,7 @@ int ObZoneMergeManagerBase::set_zone_merging(
} else if (is_merging != zone_merge_infos_[idx].is_merging_.get_value()) {
tmp_info.is_merging_.set_val(is_merging, true);
FREEZE_TIME_GUARD;
if (OB_FAIL(ObZoneMergeTableOperator::update_partial_zone_merge_info(trans, tenant_id_, tmp_info))) {
LOG_WARN("fail to update partial zone merge info", KR(ret), K_(tenant_id), K(tmp_info));
}
@ -668,6 +677,7 @@ int ObZoneMergeManagerBase::generate_next_global_broadcast_scn(
SCN &next_scn)
{
int ret = OB_SUCCESS;
FREEZE_TIME_GUARD;
ObMySQLTransaction trans;
const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id_);
if (OB_FAIL(check_inner_stat())) {
@ -713,6 +723,7 @@ int ObZoneMergeManagerBase::generate_next_global_broadcast_scn(
LOG_INFO("next global_broadcast_scn", K_(tenant_id), K(next_scn), K(tmp_global_info));
tmp_global_info.merge_status_.set_val(ObZoneMergeInfo::MERGE_STATUS_MERGING, true);
FREEZE_TIME_GUARD;
if (OB_FAIL(ObGlobalMergeTableOperator::update_partial_global_merge_info(trans, tenant_id_,
tmp_global_info))) {
LOG_WARN("fail to update partial global merge info", KR(ret), K(tmp_global_info));
@ -761,6 +772,7 @@ int ObZoneMergeManagerBase::try_update_global_last_merged_scn(const int64_t expe
}
if (OB_SUCC(ret) && need_update) {
FREEZE_TIME_GUARD;
if (OB_FAIL(trans.start(proxy_, meta_tenant_id))) {
LOG_WARN("fail to start transaction", KR(ret), K_(tenant_id), K(meta_tenant_id));
} else if (OB_FAIL(check_freeze_service_epoch(trans, expected_epoch))) {
@ -776,6 +788,7 @@ int ObZoneMergeManagerBase::try_update_global_last_merged_scn(const int64_t expe
tmp_global_info.last_merged_scn_.set_scn(global_merge_info_.global_broadcast_scn(), true);
tmp_global_info.merge_status_.set_val(ObZoneMergeInfo::MERGE_STATUS_IDLE, true);
FREEZE_TIME_GUARD;
if (OB_FAIL(ObGlobalMergeTableOperator::update_partial_global_merge_info(trans, tenant_id_,
tmp_global_info))) {
LOG_WARN("fail to update partial global merge info", KR(ret), K(tmp_global_info));

View File

@ -471,3 +471,27 @@ ObPxLocalWorkerFactory::~ObPxLocalWorkerFactory()
{
destroy();
}
//////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////
int ObPxWorker::check_status()
{
int ret = OB_SUCCESS;
if (nullptr != session_) {
session_->is_terminate(ret);
}
if (OB_SUCC(ret)) {
if (is_timeout()) {
ret = OB_TIMEOUT;
} else if (IS_INTERRUPTED()) {
ObInterruptCode &ic = GET_INTERRUPT_CODE();
ret = ic.code_;
LOG_WARN("px execution was interrupted", K(ic), K(ret));
}
}
return ret;
}

View File

@ -226,6 +226,12 @@ public:
}
};
class ObPxWorker : public lib::Worker
{
public:
virtual int check_status() override;
};
}
}
#endif /* __OB_SQL_ENGINE_PX_WORKER_RUNNABLE_H__ */

View File

@ -539,7 +539,6 @@ int ObIndexTreeMultiPrefetcher::multi_prefetch()
int64_t tenant_id = MTL_ID();
ObMicroIndexInfo &cur_index_info = read_handle.index_block_info_;
ObMicroBlockDataHandle &next_handle = read_handle.get_read_handle();
ObMicroBlockDataHandle tmp_handle;
if (OB_UNLIKELY(!cur_index_info.is_valid() ||
nullptr == read_handle.micro_handle_ ||
&next_handle == read_handle.micro_handle_ ||
@ -550,7 +549,7 @@ int ObIndexTreeMultiPrefetcher::multi_prefetch()
tenant_id,
cur_index_info,
cur_index_info.is_data_block(),
tmp_handle))) {
next_handle))) {
//not in cache yet, stop this rowkey prefetching if it's not the rowkey to be feteched
ret = OB_SUCCESS;
if (is_rowkey_to_fetched) {
@ -560,13 +559,9 @@ int ObIndexTreeMultiPrefetcher::multi_prefetch()
} else {
stop_prefetch = true;
}
} else {
// already in block cache, get block data
read_handle.micro_handle_->reset();
*read_handle.micro_handle_ = tmp_handle;
if (OB_FAIL(read_handle.micro_handle_->get_cached_index_block_data(*index_read_info_, index_block_))) {
LOG_WARN("Fail to get cached index block data", K(ret), KPC(read_handle.micro_handle_));
}
} else if (FALSE_IT(read_handle.set_cur_micro_handle(next_handle))) {
} else if (OB_FAIL(read_handle.micro_handle_->get_cached_index_block_data(*index_read_info_, index_block_))) {
LOG_WARN("Fail to get cached index block data", K(ret), KPC(read_handle.micro_handle_));
}
if (OB_SUCC(ret) && !stop_prefetch) {
if (OB_FAIL(drill_down(cur_index_info.get_macro_id(), read_handle, cur_index_info.is_leaf_block(), is_rowkey_to_fetched))) {

View File

@ -221,13 +221,13 @@ public:
cur_prefetch_end_ = false;
index_block_info_.reset();
micro_handle_idx_ = 0;
for (int64_t i = 0; i < DEFAULT_GET_MICRO_DATA_HANDLE_CNT; ++i) {
for (int64_t i = 0; i < DEFAULT_MULTIGET_MICRO_DATA_HANDLE_CNT; ++i) {
micro_handles_[i].reset();
}
}
OB_INLINE ObMicroBlockDataHandle& get_read_handle()
{
return micro_handles_[micro_handle_idx_ % DEFAULT_GET_MICRO_DATA_HANDLE_CNT];
return micro_handles_[micro_handle_idx_ % DEFAULT_MULTIGET_MICRO_DATA_HANDLE_CNT];
}
OB_INLINE void set_cur_micro_handle(ObMicroBlockDataHandle &handle)
{
@ -236,11 +236,13 @@ public:
}
INHERIT_TO_STRING_KV("ObSSTableReadHandle", ObSSTableReadHandle, KPC_(rowkey),
K_(cur_level), K_(cur_prefetch_end), K_(index_block_info), K_(micro_handle_idx), K_(micro_handles));
// TODO(yht146439) change to 2
static const int64_t DEFAULT_MULTIGET_MICRO_DATA_HANDLE_CNT = 3;
int16_t cur_level_;
bool cur_prefetch_end_;
ObMicroIndexInfo index_block_info_;
int64_t micro_handle_idx_;
ObMicroBlockDataHandle micro_handles_[DEFAULT_GET_MICRO_DATA_HANDLE_CNT];
ObMicroBlockDataHandle micro_handles_[DEFAULT_MULTIGET_MICRO_DATA_HANDLE_CNT];
};
typedef ObReallocatedFixedArray<ObSSTableReadHandleExt> ReadHandleExtArray;
ObIndexTreeMultiPrefetcher() :