fix tablet report logic

This commit is contained in:
obdev 2022-11-03 10:10:17 +00:00 committed by wangzelin.wzl
parent 6dba6ed96f
commit 17bea8074b
17 changed files with 323 additions and 948 deletions

View File

@ -35,7 +35,6 @@ ob_set_subtarget(ob_server common
ob_srv_xlator_primary.cpp
ob_srv_xlator_rootserver.cpp
ob_srv_xlator_storage.cpp
ob_tablet_checksum_updater.cpp
ob_tenant_duty_task.cpp
ob_uniq_task_queue.cpp
)

View File

@ -164,7 +164,7 @@ ObService::ObService(const ObGlobalContext &gctx)
lease_state_mgr_(), heartbeat_process_(gctx, schema_updater_, lease_state_mgr_),
gctx_(gctx), server_trace_task_(), schema_release_task_(),
schema_status_task_(), remote_master_rs_update_task_(gctx), ls_table_updater_(),
tablet_table_updater_(), tablet_checksum_updater_(), meta_table_checker_()
tablet_table_updater_(), meta_table_checker_()
{
}
@ -205,8 +205,6 @@ int ObService::init(common::ObMySQLProxy &sql_proxy,
FLOG_WARN("init tablet table updater failed", KR(ret));
} else if (OB_FAIL(ls_table_updater_.init())) {
FLOG_WARN("init log stream table updater failed", KR(ret));
} else if (OB_FAIL(tablet_checksum_updater_.init(*this))) {
FLOG_WARN("init tablet checksum updater failed", KR(ret));
} else if (OB_FAIL(meta_table_checker_.init(
gctx_.lst_operator_,
gctx_.tablet_operator_,
@ -300,10 +298,6 @@ void ObService::stop()
tablet_table_updater_.stop();
FLOG_INFO("tablet table updater stopped");
FLOG_INFO("begin to stop tablet checksum updater");
tablet_checksum_updater_.stop();
FLOG_INFO("tablet checksum updater stopped");
FLOG_INFO("begin to stop meta table checker");
meta_table_checker_.stop();
FLOG_INFO("meta table checker stopped");
@ -329,10 +323,6 @@ void ObService::wait()
tablet_table_updater_.wait();
FLOG_INFO("wait tablet table updater success");
FLOG_INFO("begin to wait tablet checksum updater");
tablet_checksum_updater_.wait();
FLOG_INFO("wait tablet checksum updater success");
FLOG_INFO("begin to wait meta table checker");
meta_table_checker_.wait();
FLOG_INFO("wait meta table checker success");
@ -430,23 +420,6 @@ int ObService::submit_tablet_update_task(
return ret;
}
int ObService::submit_tablet_checksums_task(
const uint64_t tenant_id,
const ObLSID &ls_id,
const ObTabletID &tablet_id)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret));
} else if (!ls_id.is_valid_with_tenant(tenant_id) || !tablet_id.is_valid_with_tenant(tenant_id)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(ls_id), K(tablet_id));
} else if (OB_FAIL(tablet_checksum_updater_.async_update(tenant_id, ls_id, tablet_id))) {
LOG_WARN("fail to async update tablet checksum", KR(ret), K(tenant_id), K(ls_id), K(tablet_id));
}
return ret;
}
int ObService::submit_async_refresh_schema_task(
const uint64_t tenant_id,
@ -1942,11 +1915,13 @@ int ObService::write_ddl_sstable_commit_log(const ObDDLWriteSSTableCommitLogArg
return ret;
}
int ObService::inner_fill_tablet_replica_(
int ObService::inner_fill_tablet_info_(
const int64_t tenant_id,
const ObTabletID &tablet_id,
storage::ObLS *ls,
ObTabletReplica &tablet_replica)
ObTabletReplica &tablet_replica,
share::ObTabletReplicaChecksumItem &tablet_checksum,
const bool need_checksum)
{
ObLSHandle ls_handle;
ObTabletHandle tablet_handle;
@ -1978,9 +1953,9 @@ int ObService::inner_fill_tablet_replica_(
const ObLSID &ls_id = ls->get_ls_id();
int64_t data_size = 0;
int64_t required_size = 0;
ObArray<int64_t> column_checksums; // param palceholder
ObArray<int64_t> column_checksums;
if (OB_FAIL(tablet_handle.get_obj()->get_tablet_report_info(column_checksums, data_size,
required_size, false/*need_checksum*/))) {
required_size, need_checksum))) {
LOG_WARN("fail to get tablet report info from tablet", KR(ret), K(tenant_id), K(tablet_id));
} else if (OB_FAIL(tablet_replica.init(
tenant_id,
@ -1992,66 +1967,29 @@ int ObService::inner_fill_tablet_replica_(
required_size))) {
LOG_WARN("fail to init a tablet replica", KR(ret), K(tenant_id),
K(tablet_id), K(tablet_replica));
}
}
return ret;
}
int ObService::inner_fill_tablet_replica_checksum_item_(
const int64_t tenant_id,
const ObTabletID &tablet_id,
storage::ObLS *ls,
share::ObTabletReplicaChecksumItem &item)
{
int ret = OB_SUCCESS;
ObTabletHandle tablet_handle;
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("service not inited", KR(ret));
} else if (OB_UNLIKELY(!tablet_id.is_valid())
|| OB_INVALID_TENANT_ID == tenant_id
|| OB_ISNULL(ls)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument or nullptr", KR(ret), K(tablet_id), K(tenant_id));
} else if (OB_ISNULL(ls->get_tablet_svr())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get_tablet_svr is null", KR(ret), K(tenant_id), K(tablet_id));
} else if (OB_FAIL(ls->get_tablet_svr()->get_tablet(
tablet_id,
tablet_handle,
ObTabletCommon::NO_CHECK_GET_TABLET_TIMEOUT_US))) {
if (OB_TABLET_NOT_EXIST != ret) {
LOG_WARN("get tablet failed", KR(ret), K(tenant_id), K(tablet_id));
}
} else if (OB_ISNULL(gctx_.config_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("gctx_.config_ is null", KR(ret), K(tenant_id), K(tablet_id));
} else {
item.tenant_id_ = tenant_id;
item.ls_id_ = ls->get_ls_id();
item.tablet_id_ = tablet_id;
item.server_ = gctx_.self_addr();
item.row_count_ = tablet_handle.get_obj()->get_tablet_meta().report_status_.row_count_;
item.snapshot_version_ = tablet_handle.get_obj()->get_tablet_meta().report_status_.merge_snapshot_version_;
item.data_checksum_ = tablet_handle.get_obj()->get_tablet_meta().report_status_.data_checksum_;
int64_t data_size = 0;
int64_t required_size = 0;
ObArray<int64_t> column_checksums;
if (OB_FAIL(tablet_handle.get_obj()->get_tablet_report_info(column_checksums, data_size, required_size))) {
LOG_WARN("fail to get column checksums from tablet", KR(ret), K(tenant_id), K(tablet_id), K(item));
} else if (OB_FAIL(item.column_meta_.init(column_checksums))) {
} else if (!need_checksum) {
} else if (OB_FAIL(tablet_checksum.column_meta_.init(column_checksums))) {
LOG_WARN("fail to init report column meta with column_checksums", KR(ret), K(column_checksums));
} else {
tablet_checksum.tenant_id_ = tenant_id;
tablet_checksum.ls_id_ = ls->get_ls_id();
tablet_checksum.tablet_id_ = tablet_id;
tablet_checksum.server_ = gctx_.self_addr();
tablet_checksum.snapshot_version_ = snapshot_version;
tablet_checksum.row_count_ = tablet_handle.get_obj()->get_tablet_meta().report_status_.row_count_;
tablet_checksum.data_checksum_ = tablet_handle.get_obj()->get_tablet_meta().report_status_.data_checksum_;
}
}
return ret;
}
int ObService::fill_tablet_replica(
int ObService::fill_tablet_report_info(
const uint64_t tenant_id,
const ObLSID &ls_id,
const ObTabletID &tablet_id,
ObTabletReplica &tablet_replica)
ObTabletReplica &tablet_replica,
share::ObTabletReplicaChecksumItem &tablet_checksum,
const bool need_checksum)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!inited_)) {
@ -2077,62 +2015,18 @@ int ObService::fill_tablet_replica(
LOG_TRACE("log stream not exist in this tenant", KR(ret), K(tenant_id), K(ls_id));
}
} else if (FALSE_IT(ls = ls_handle.get_ls())) {
} else if (OB_FAIL(inner_fill_tablet_replica_(tenant_id,
tablet_id,
ls,
tablet_replica))) {
} else if (OB_FAIL(inner_fill_tablet_info_(tenant_id,
tablet_id,
ls,
tablet_replica,
tablet_checksum,
need_checksum))) {
if (OB_TABLET_NOT_EXIST != ret) {
LOG_WARN("fail to inner fill tenant's tablet replica", KR(ret),
K(tenant_id), K(tablet_id), K(ls), K(tablet_replica));
K(tenant_id), K(tablet_id), K(ls), K(tablet_replica), K(tablet_checksum), K(need_checksum));
} else {
LOG_TRACE("tablet not exist in this log stream", KR(ret),
K(tenant_id), K(tablet_id), K(ls), K(tablet_replica));
}
}
}
}
return ret;
}
int ObService::fill_tablet_replica_checksum_item(
const uint64_t tenant_id,
const share::ObLSID &ls_id,
const ObTabletID &tablet_id,
share::ObTabletReplicaChecksumItem &item)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("service not inited", KR(ret));
} else if (!tablet_id.is_valid() || !ls_id.is_valid() || OB_INVALID_TENANT_ID == tenant_id) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(tablet_id), K(ls_id), K(tenant_id));
} else {
MTL_SWITCH(tenant_id) {
ObTabletHandle tablet_handle;
ObLSHandle ls_handle;
storage::ObLS *ls = nullptr;
ObLSService* ls_svr = nullptr;
if (OB_ISNULL(ls_svr = MTL(ObLSService*))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("MTL ObLSService is null", KR(ret), K(tenant_id));
} else if (OB_FAIL(ls_svr->get_ls(ls_id, ls_handle, ObLSGetMod::OBSERVER_MOD))) {
if (OB_LS_NOT_EXIST != ret) {
LOG_WARN("fail to get log_stream's ls_handle", KR(ret), K(tenant_id), K(ls_id));
} else {
LOG_TRACE("log stream not exist in this tenant", KR(ret), K(tenant_id), K(ls_id));
}
} else if (FALSE_IT(ls = ls_handle.get_ls())) {
} else if (OB_FAIL(inner_fill_tablet_replica_checksum_item_(tenant_id,
tablet_id,
ls,
item))) {
if (OB_TABLET_NOT_EXIST != ret) {
LOG_WARN("fail to inner fill tenant's tablet checksum item", KR(ret),
K(tenant_id), K(tablet_id), K(ls), K(item));
} else {
LOG_TRACE("tablet not exist in this log stream", KR(ret),
K(tenant_id), K(tablet_id), K(ls), K(item));
K(tenant_id), K(tablet_id), K(ls), K(tablet_replica), K(tablet_checksum), K(need_checksum));
}
}
}

View File

@ -18,7 +18,6 @@
#include "share/ob_all_server_tracer.h"
#include "observer/ob_lease_state_mgr.h"
#include "observer/ob_heartbeat.h"
#include "observer/ob_tablet_checksum_updater.h"
#include "observer/ob_server_schema_updater.h"
#include "observer/ob_rpc_processor_simple.h"
#include "observer/ob_uniq_task_queue.h"
@ -92,18 +91,17 @@ public:
// @params[in] ls_id: tablet belongs to which log stream
// @params[in] tablet_id: the tablet to build
// @params[out] tablet_replica: infos about this tablet replica
// @params[out] tablet_checksum: infos about this tablet data/column checksum
// @params[in] need_checksum: whether to fill tablet_checksum
// ATTENTION: If ls not exist, then OB_LS_NOT_EXIST
// If tablet not exist on that ls, then OB_TABLET_NOT_EXIST
int fill_tablet_replica(
int fill_tablet_report_info(
const uint64_t tenant_id,
const share::ObLSID &ls_id,
const ObTabletID &tablet_id,
share::ObTabletReplica &tablet_replica);
int fill_tablet_replica_checksum_item(
const uint64_t tenant_id,
const share::ObLSID &ls_id,
const ObTabletID &tablet_id,
share::ObTabletReplicaChecksumItem &item);
share::ObTabletReplica &tablet_replica,
share::ObTabletReplicaChecksumItem &tablet_checksum,
const bool need_checksum = true);
int detect_master_rs_ls(const obrpc::ObDetectMasterRsArg &arg,
obrpc::ObDetectMasterRsLSResult &result);
@ -120,10 +118,6 @@ public:
const uint64_t tenant_id,
const share::ObLSID &ls_id,
const ObTabletID &tablet_id) override;
virtual int submit_tablet_checksums_task(
const uint64_t tenant_id,
const share::ObLSID &ls_id,
const ObTabletID &tablet_id) override;
////////////////////////////////////////////////////////////////
int check_frozen_version(const obrpc::ObCheckFrozenVersionArg &arg);
@ -227,16 +221,13 @@ public:
int renew_in_zone_hb(const share::ObInZoneHbRequest &arg,
share::ObInZoneHbResponse &result);
private:
int inner_fill_tablet_replica_(
int inner_fill_tablet_info_(
const int64_t tenant_id,
const ObTabletID &tablet_id,
storage::ObLS *ls,
share::ObTabletReplica &tablet_replica);
int inner_fill_tablet_replica_checksum_item_(
const int64_t tenant_id,
const ObTabletID &tablet_id,
storage::ObLS *ls,
share::ObTabletReplicaChecksumItem &item);
share::ObTabletReplica &tablet_replica,
share::ObTabletReplicaChecksumItem &tablet_checksum,
const bool need_checksum);
int register_self();
int check_server_empty(const obrpc::ObCheckServerEmptyArg &arg, const bool wait_log_scan, bool &server_empty);
@ -262,7 +253,6 @@ private:
// report
ObLSTableUpdater ls_table_updater_;
ObTabletTableUpdater tablet_table_updater_;
ObTabletChecksumUpdater tablet_checksum_updater_;
ObServerMetaTableChecker meta_table_checker_;
};

View File

@ -1,508 +0,0 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SERVER
#include "ob_tablet_checksum_updater.h"
#include "observer/ob_service.h"
#include "share/ob_tablet_replica_checksum_operator.h"
namespace oceanbase
{
using namespace common;
using namespace share;
namespace observer
{
ObTabletChecksumUpdateTask::ObTabletChecksumUpdateTask()
: tenant_id_(OB_INVALID_TENANT_ID),
ls_id_(),
tablet_id_(),
add_timestamp_(OB_INVALID_TIMESTAMP)
{
}
ObTabletChecksumUpdateTask::ObTabletChecksumUpdateTask(
const uint64_t tenant_id,
const share::ObLSID &ls_id,
const common::ObTabletID &tablet_id,
const int64_t add_timestamp)
: tenant_id_(tenant_id),
ls_id_(ls_id),
tablet_id_(tablet_id),
add_timestamp_(add_timestamp)
{
}
ObTabletChecksumUpdateTask::~ObTabletChecksumUpdateTask()
{
reset();
}
void ObTabletChecksumUpdateTask::reset()
{
tenant_id_ = OB_INVALID_TENANT_ID;
ls_id_.reset();
tablet_id_.reset();
add_timestamp_ = OB_INVALID_TIMESTAMP;
}
bool ObTabletChecksumUpdateTask::is_valid() const
{
return OB_INVALID_TENANT_ID != tenant_id_
&& ls_id_.is_valid_with_tenant(tenant_id_)
&& tablet_id_.is_valid_with_tenant(tenant_id_)
&& 0 < add_timestamp_;
}
int ObTabletChecksumUpdateTask::init(
const uint64_t tenant_id,
const ObLSID &ls_id,
const ObTabletID &tablet_id,
const int64_t add_timestamp)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!ls_id.is_valid_with_tenant(tenant_id)
|| !tablet_id.is_valid_with_tenant(tenant_id)
|| 0 >= add_timestamp)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get invalid arguments", KR(ret), K(tenant_id), K(ls_id), K(tablet_id), K(add_timestamp));
} else {
tenant_id_ = tenant_id;
ls_id_ = ls_id;
tablet_id_ = tablet_id;
add_timestamp_ = add_timestamp;
}
return ret;
}
int64_t ObTabletChecksumUpdateTask::hash() const
{
uint64_t hash_val = 0;
hash_val = murmurhash(&tenant_id_, sizeof(tenant_id_), hash_val);
hash_val = murmurhash(&ls_id_, sizeof(ls_id_), hash_val);
hash_val = murmurhash(&tablet_id_, sizeof(tablet_id_), hash_val);
return hash_val;
}
void ObTabletChecksumUpdateTask::check_task_status() const
{
// TODO @danling.fjk ignore this error log temp
// int64_t now = ObTimeUtility::current_time();
// const int64_t safe_interval = TABLET_CHECK_INTERVAL;
// // print an error log if this task is not executed correctly since two mins ago
// if (now - add_timestamp_ > safe_interval) {
// LOG_ERROR("tablet table update task cost too much time to execute",
// K(*this), K(safe_interval), "cost_time", now - add_timestamp_);
// }
}
int ObTabletChecksumUpdateTask::assign(const ObTabletChecksumUpdateTask &other)
{
int ret = OB_SUCCESS;
if (this != &other) {
tenant_id_ = other.tenant_id_;
ls_id_ = other.ls_id_;
tablet_id_ = other.tablet_id_;
add_timestamp_ = other.add_timestamp_;
}
return ret;
}
bool ObTabletChecksumUpdateTask::operator==(const ObTabletChecksumUpdateTask &other) const
{
bool bret = false;
if (this == &other) {
bret = true;
} else {
bret = (tenant_id_ == other.tenant_id_
&& ls_id_ == other.ls_id_
&& tablet_id_ == other.tablet_id_);
}
return bret;
}
bool ObTabletChecksumUpdateTask::operator!=(const ObTabletChecksumUpdateTask &other) const
{
return !(operator==(other));
}
bool ObTabletChecksumUpdateTask::compare_without_version(
const ObTabletChecksumUpdateTask &other) const
{
return (operator==(other));
}
ObTabletChecksumUpdater::ObTabletChecksumUpdater()
: inited_(false),
stopped_(true),
ob_service_(nullptr),
task_queue_()
{
}
ObTabletChecksumUpdater::~ObTabletChecksumUpdater()
{
destroy();
}
int ObTabletChecksumUpdater::init(ObService &ob_service)
{
int ret = OB_SUCCESS;
const int64_t task_queue_size = !lib::is_mini_mode()
? TASK_QUEUE_SIZE
: MINI_MODE_TASK_QUEUE_SIZE;
const int64_t task_thread_cnt = !lib::is_mini_mode()
? UPDATE_TASK_THREAD_CNT
: MINI_MODE_UPDATE_TASK_THREAD_CNT;
if (OB_UNLIKELY(inited_)) {
ret = OB_INIT_TWICE;
LOG_WARN("inited twice", KR(ret));
} else if (OB_FAIL(task_queue_.init(this,
task_thread_cnt, task_queue_size, "TbltCksUp"))) {
LOG_WARN("failed to init tablet checksum updater task queue", KR(ret),
"thread_count", task_thread_cnt,
"queue_size", task_queue_size);
} else {
ob_service_ = &ob_service;
inited_ = true;
stopped_ = false;
LOG_INFO("success to init ObTabletChecksumUpdater");
}
return ret;
}
void ObTabletChecksumUpdater::stop()
{
if (inited_) {
stopped_ = true;
task_queue_.stop();
LOG_INFO("stop ObTabletChecksumUpdater success");
}
}
void ObTabletChecksumUpdater::wait()
{
if (inited_) {
task_queue_.wait();
LOG_INFO("wait ObTabletChecksumUpdater");
}
}
void ObTabletChecksumUpdater::destroy()
{
stop();
wait();
inited_ = false;
stopped_ = true;
ob_service_ = nullptr;
}
int ObTabletChecksumUpdater::async_update(
const uint64_t tenant_id,
const share::ObLSID &ls_id,
const common::ObTabletID &tablet_id)
{
int ret = OB_SUCCESS;
int64_t add_timestamp = ObTimeUtility::current_time();
ObTabletChecksumUpdateTask task;
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("ObTabletChecksumUpdater not inited", KR(ret));
} else if (tablet_id.is_reserved_tablet() || is_virtual_tenant_id(tenant_id)) {
LOG_TRACE("no need to report virtual tenant's tablet and reserved tablet",
KR(ret), K(tablet_id), K(tenant_id));
} else if (OB_INVALID_TENANT_ID == tenant_id
|| !ls_id.is_valid()
|| !ls_id.is_valid_with_tenant(tenant_id)
|| !tablet_id.is_valid()
|| !tablet_id.is_valid_with_tenant(tenant_id)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(ls_id), K(tablet_id));
} else if (OB_FAIL(task.init(tenant_id,
ls_id,
tablet_id,
add_timestamp))) {
LOG_WARN("set update task failed", KR(ret), K(tenant_id), K(ls_id), K(tablet_id),
K(add_timestamp));
} else if (OB_FAIL(add_task_(task))){
LOG_WARN("fail to add task", KR(ret), K(tenant_id), K(ls_id), K(tablet_id),
K(add_timestamp));
}
return ret;
}
int ObTabletChecksumUpdater::add_task_(const ObTabletChecksumUpdateTask &task)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", KR(ret));
} else if (!task.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid task", KR(ret), K(task));
} else if (OB_FAIL(task_queue_.add(task))){
// TODO: deal with barrier-tasks when execute
if (OB_EAGAIN == ret) {
LOG_TRACE("tablet table update task exist", K(task));
ret = OB_SUCCESS;
} else {
LOG_WARN("add tablet table update task failed", KR(ret), K(task));
}
}
return ret;
}
int ObTabletChecksumUpdater::reput_to_queue_(const ObIArray<ObTabletChecksumUpdateTask> &tasks)
{
int ret = OB_SUCCESS;
// try to push task back to queue, ignore ret code
for (int64_t i = 0; i < tasks.count(); i++) {
const ObTabletChecksumUpdateTask &task = tasks.at(i);
if (!task.is_valid()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get invalid task", KR(ret), K(task));
} else if (OB_FAIL(add_task_(task))) {
LOG_ERROR("fail to reput to queue", KR(ret), K(task));
}
}
return ret;
}
int ObTabletChecksumUpdater::process_barrier(
const ObTabletChecksumUpdateTask &task,
bool &stopped)
{
int ret = OB_NOT_SUPPORTED;
UNUSEDx(task, stopped);
return ret;
}
int ObTabletChecksumUpdater::batch_process_tasks(
const common::ObIArray<ObTabletChecksumUpdateTask> &tasks,
bool &stopped)
{
int ret = OB_SUCCESS;
UNUSED(stopped);
int tmp_ret = OB_SUCCESS;
const int64_t start_time = ObTimeUtility::current_time();
ObArray<ObTabletReplicaChecksumItem> update_tablet_items;
UpdateTabletChecksumTaskList update_tasks;
RemoveTabletChecksumTaskList remove_tasks;
ObCurTraceId::init(GCONF.self_addr_);
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("ObTabletChecksumUpdater is not inited", KR(ret));
} else if (OB_FAIL(generate_tasks_(tasks, update_tablet_items, update_tasks, remove_tasks))){
if (OB_TENANT_NOT_IN_SERVER != ret && OB_NOT_RUNNING != ret) {
LOG_ERROR("generate_tasks failed", KR(ret), "tasks count", tasks.count(),
"update_tablet_items", update_tablet_items.count(),
"update_tasks", update_tasks.count(),
"remove_tasks", remove_tasks.count());
} else {
LOG_WARN("Tenant/LogStream has been stopped, skip to process tasks", KR(ret));
}
} else {
tmp_ret = do_batch_update_(start_time, update_tasks, update_tablet_items);
if (OB_SUCCESS != tmp_ret) {
ret = OB_SUCC(ret) ? tmp_ret : ret;
LOG_WARN("do_batch_update_ failed", KR(tmp_ret), K(start_time), K(update_tasks),
K(update_tablet_items));
} else {
ObTaskController::get().allow_next_syslog();
LOG_INFO("REPORT: success to update tablets", KR(tmp_ret), K(update_tablet_items));
}
tmp_ret = do_batch_remove_(start_time, remove_tasks);
if (OB_SUCCESS != tmp_ret) {
ret = OB_SUCC(ret) ? tmp_ret : ret;
LOG_WARN("do_batch_remove_ failed", KR(tmp_ret), K(start_time), K(remove_tasks));
} else {
ObTaskController::get().allow_next_syslog();
LOG_INFO("REPORT: success to remove tablets", KR(tmp_ret), K(remove_tasks));
}
}
return ret;
}
int ObTabletChecksumUpdater::generate_tasks_(
const ObIArray<ObTabletChecksumUpdateTask> &tasks,
ObIArray<ObTabletReplicaChecksumItem> &update_tablet_items,
UpdateTabletChecksumTaskList &update_tasks,
RemoveTabletChecksumTaskList &remove_tasks)
{
int ret = OB_SUCCESS;
int64_t tenant_id = OB_INVALID_TENANT_ID;
int64_t count = UNIQ_TASK_QUEUE_BATCH_EXECUTE_NUM;
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("ObTabletChecksumUpdater is not inited", KR(ret));
} else if (OB_ISNULL(ob_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid argument", KR(ret), KP_(ob_service));
} else if (OB_UNLIKELY(tasks.count() <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("tasks count <= 0", KR(ret), "tasks_count", tasks.count());
} else if (FALSE_IT(tenant_id = tasks.at(0).get_tenant_id())) {
// shall never be here
} else {
ObTabletReplicaChecksumItem item;
FOREACH_CNT_X(task, tasks, OB_SUCC(ret)) {
// split tasks into remove and update
if (OB_ISNULL(task)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid task", KR(ret), K(task));
} else if (tenant_id != task->get_tenant_id()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("tenant_id not the same", KR(ret), K(tenant_id), K(task));
} else {
task->check_task_status();
item.reset();
if (OB_FAIL(ob_service_->fill_tablet_replica_checksum_item(task->get_tenant_id(),
task->get_ls_id(), task->get_tablet_id(), item))) {
if (OB_TABLET_NOT_EXIST == ret || OB_LS_NOT_EXIST == ret) {
LOG_INFO("try update a not exist or invalid tablet, turn to remove tablet table",
KR(ret), "tenant_id", task->get_tenant_id(),
"ls_id", task->get_ls_id(),
"tablet_id", task->get_tablet_id());
ret = OB_SUCCESS;
if (OB_FAIL(remove_tasks.reserve(count))) {
// reserve() is reentrant, do not have to check whether first time
LOG_WARN("fail to reserver remove_tasks", KR(ret), K(count));
} else if (OB_FAIL(remove_tasks.push_back(*task))) {
LOG_WARN("fail to push back remove task", KR(ret), KPC(task));
}
} else {
LOG_WARN("fail to fill tablet item", KR(ret), "tenant_id", task->get_tenant_id(),
"ls_id", task->get_ls_id(), "tablet_id", task->get_tablet_id());
}
} else {
LOG_TRACE("fill tablet checksum item success", K(task), K(item));
if (OB_FAIL(update_tablet_items.reserve(count))) {
// reserve() is reentrant, do not have to check whether first time
LOG_WARN("fail to reserve update_tablet_items", KR(ret), K(count));
} else if (OB_FAIL(update_tasks.reserve(count))) {
// reserve() is reentrant, do not have to check whether first time
LOG_WARN("fail to reserve update_tasks", KR(ret), K(count));
} else if (OB_FAIL(update_tablet_items.push_back(item))) {
LOG_WARN("fail to push back item", KR(ret), K(item));
} else if (OB_FAIL(update_tasks.push_back(*task))) {
LOG_WARN("fail to push back task", KR(ret), KPC(task));
}
}
}
} //FOREACH
if (OB_SUCC(ret)
&& (update_tasks.count() != update_tablet_items.count()
|| update_tasks.count() + remove_tasks.count() != tasks.count())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tablet task count and item count not match", KR(ret),
"tablet_update_tasks count", update_tasks.count(),
"tablet_update_items count", update_tablet_items.count(),
"tablet_remove_tasks count", remove_tasks.count(),
"tasks count", tasks.count());
}
}
return ret;
}
int ObTabletChecksumUpdater::do_batch_update_(
const int64_t start_time,
const ObIArray<ObTabletChecksumUpdateTask> &tasks,
const ObIArray<ObTabletReplicaChecksumItem> &items)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
int64_t tenant_id = OB_INVALID_TENANT_ID;
if (tasks.count() != items.count() || 0 == tasks.count()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("tasks num not match", KR(ret), "task_cnt", tasks.count(), "item_cnt", items.count());
} else if (FALSE_IT(tenant_id = tasks.at(0).get_tenant_id())) {
} else if (OB_FAIL(ObTabletReplicaChecksumOperator::batch_update(tenant_id, items, *GCTX.sql_proxy_))) {
LOG_WARN("do tablet table update failed, try to reput to queue", KR(ret),
"escape time", ObTimeUtility::current_time() - start_time);
(void) throttle_(ret, ObTimeUtility::current_time() - start_time);
if (OB_SUCCESS != (tmp_ret = reput_to_queue_(tasks))) {
LOG_ERROR("fail to reput update task to queue", KR(tmp_ret), K(tasks.count()));
} else {
LOG_INFO("reput update task to queue success", K(tasks.count()));
}
} else {
LOG_INFO("batch process update success", KR(ret), K(items.count()),
"use_time", ObTimeUtility::current_time() - start_time);
}
return ret;
}
int ObTabletChecksumUpdater::do_batch_remove_(
const int64_t start_time,
const ObIArray<ObTabletChecksumUpdateTask> &tasks)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
int64_t tenant_id = OB_INVALID_TENANT_ID;
if (0 == tasks.count()) {
LOG_INFO("no need to remove task", KR(ret), "task_cnt", tasks.count());
} else if (FALSE_IT(tenant_id = tasks.at(0).get_tenant_id())) {
} else {
ObArray<share::ObLSID> ls_ids;
ObArray<common::ObTabletID> tablet_ids;
for (int64_t i = 0; OB_SUCC(ret) && i < tasks.count(); ++i) {
if (OB_FAIL(ls_ids.push_back(tasks.at(i).ls_id_))) {
LOG_WARN("failed to add ls id", KR(ret));
} else if (OB_FAIL(tablet_ids.push_back(tasks.at(i).tablet_id_))) {
LOG_WARN("failed to add tablet id", KR(ret));
}
}
if (OB_SUCC(ret) && OB_FAIL(ObTabletReplicaChecksumOperator::batch_remove(tenant_id, ls_ids, tablet_ids, *GCTX.sql_proxy_))) {
LOG_WARN("do tablet checksum remove failed, try to reput to queue", KR(ret),
"escape time", ObTimeUtility::current_time() - start_time);
(void) throttle_(ret, ObTimeUtility::current_time() - start_time);
if (OB_SUCCESS != (tmp_ret = reput_to_queue_(tasks))) {
LOG_ERROR("fail to reput remove task to queue", KR(tmp_ret), K(tasks.count()));
} else {
LOG_INFO("reput remove task to queue success", K(tasks.count()));
}
} else {
LOG_INFO("batch process remove success", KR(ret), K(tasks.count()),
"use_time", ObTimeUtility::current_time() - start_time);
}
}
return ret;
}
int ObTabletChecksumUpdater::throttle_(
const int return_code,
const int64_t execute_time_us)
{
int ret = OB_SUCCESS;
int64_t sleep_us = 0;
if (OB_SUCCESS != return_code) {
sleep_us = 2l * 1000 * 1000; // 2s
} else if (execute_time_us > 20 * 1000 * 1000) { // 20s
sleep_us = MIN(1L * 1000 * 1000, (execute_time_us - 20 * 1000 * 1000));
LOG_WARN("detected slow update, may be too many concurrent updating", K(sleep_us));
}
const static int64_t sleep_step_us = 20 * 1000; // 20ms
for (; !stopped_ && sleep_us > 0;
sleep_us -= sleep_step_us) {
ob_usleep(static_cast<int32_t>(std::min(sleep_step_us, sleep_us)));
}
return ret;
}
} // observer
} // oceanbase

View File

@ -1,141 +0,0 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_OBSERVER_OB_TABLET_CHECKSUM_UPDATER_H_
#define OCEANBASE_OBSERVER_OB_TABLET_CHECKSUM_UPDATER_H_
#include "observer/ob_uniq_task_queue.h"
#include "common/ob_tablet_id.h"
#include "share/ob_ls_id.h"
namespace oceanbase
{
namespace common
{
class ObAddr;
class ObTabletID;
}
namespace share
{
class ObLSID;
struct ObTabletReplicaChecksumItem;
}
namespace observer
{
class ObService;
class ObTabletChecksumUpdater;
class ObTabletChecksumUpdateTask : public ObIUniqTaskQueueTask<ObTabletChecksumUpdateTask>
{
public:
friend class ObTabletChecksumUpdater;
ObTabletChecksumUpdateTask();
ObTabletChecksumUpdateTask(
const uint64_t tenant_id,
const share::ObLSID &ls_id,
const common::ObTabletID &tablet_id,
const int64_t add_timestamp);
virtual ~ObTabletChecksumUpdateTask();
int init(
const uint64_t tenant_id,
const share::ObLSID &ls_id,
const common::ObTabletID &tablet_id,
const int64_t add_timestamp);
void reset();
bool is_valid() const;
int64_t hash() const;
void check_task_status() const;
int assign(const ObTabletChecksumUpdateTask &other);
virtual bool operator==(const ObTabletChecksumUpdateTask &other) const;
virtual bool operator!=(const ObTabletChecksumUpdateTask &other) const;
inline int64_t get_tenant_id() const { return tenant_id_; }
inline const share::ObLSID &get_ls_id() const { return ls_id_; }
inline const common::ObTabletID &get_tablet_id() const { return tablet_id_; }
inline int64_t get_add_timestamp() const { return add_timestamp_; }
bool compare_without_version(const ObTabletChecksumUpdateTask& other) const;
uint64_t get_group_id() const { return tenant_id_; }
bool need_process_alone() const { return false; }
virtual bool need_assign_when_equal() const { return false; }
inline int assign_when_equal(const ObTabletChecksumUpdateTask &other)
{
UNUSED(other);
return common::OB_NOT_SUPPORTED;
}
bool is_barrier() const { return false; }
TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(tablet_id), K_(add_timestamp));
private:
const int64_t TABLET_CHECK_INTERVAL = 120l * 1000 * 1000; //2 minutes
int64_t tenant_id_;
share::ObLSID ls_id_;
common::ObTabletID tablet_id_;
int64_t add_timestamp_;
};
// TODO impl Checker to clean the abandoned task
typedef ObUniqTaskQueue<ObTabletChecksumUpdateTask, ObTabletChecksumUpdater> ObTabletChecksumTaskQueue;
typedef ObArray<ObTabletChecksumUpdateTask> UpdateTabletChecksumTaskList;
typedef ObArray<ObTabletChecksumUpdateTask> RemoveTabletChecksumTaskList;
// TODO impl MTL Module
class ObTabletChecksumUpdater
{
public:
ObTabletChecksumUpdater();
virtual ~ObTabletChecksumUpdater();
int init(ObService &ob_service);
inline bool is_inited() const { return inited_; }
void stop();
void wait();
void destroy();
int async_update(
const uint64_t tenant_id,
const share::ObLSID &ls_id,
const common::ObTabletID &tablet_id);
int batch_process_tasks(
const common::ObIArray<ObTabletChecksumUpdateTask> &tasks,
bool &stopped);
int process_barrier(const ObTabletChecksumUpdateTask &task, bool &stopped);
private:
int add_task_(const ObTabletChecksumUpdateTask &task);
int reput_to_queue_(const ObIArray<ObTabletChecksumUpdateTask> &tasks);
int generate_tasks_(
const ObIArray<ObTabletChecksumUpdateTask> &tasks,
ObIArray<share::ObTabletReplicaChecksumItem> &update_tablet_items,
UpdateTabletChecksumTaskList &update_tasks,
RemoveTabletChecksumTaskList &remove_tasks);
int do_batch_update_(
const int64_t start_time,
const ObIArray<ObTabletChecksumUpdateTask> &tasks,
const ObIArray<share::ObTabletReplicaChecksumItem> &items);
int do_batch_remove_(
const int64_t start_time,
const ObIArray<ObTabletChecksumUpdateTask> &tasks);
int throttle_(const int return_code, const int64_t execute_time_us);
private:
const int64_t MINI_MODE_UPDATE_TASK_THREAD_CNT = 1;
const int64_t UPDATE_TASK_THREAD_CNT = 7;
const int64_t MINI_MODE_TASK_QUEUE_SIZE = 20 * 10000;
const int64_t TASK_QUEUE_SIZE = 100 * 10000;
bool inited_;
bool stopped_;
ObService *ob_service_;
ObTabletChecksumTaskQueue task_queue_;
DISALLOW_COPY_AND_ASSIGN(ObTabletChecksumUpdater);
};
} // observer
} // oceanbase
#endif // OCEANBASE_OBSERVER_OB_TABLET_CHECKSUM_UPDATER_H_

View File

@ -37,10 +37,6 @@ public:
const uint64_t tenant_id,
const share::ObLSID &ls_id,
const common::ObTabletID &tablet_id) = 0;
virtual int submit_tablet_checksums_task(
const uint64_t tenant_id,
const share::ObLSID &ls_id,
const common::ObTabletID &tablet_id) = 0;
};
} // end namespace observer

View File

@ -17,6 +17,9 @@
#include "share/tablet/ob_tablet_info.h" // for ObTabletInfo
#include "share/tablet/ob_tablet_table_operator.h" // for ObTabletOperator
#include "observer/ob_service.h" // for is_mini_mode
#include "share/ob_tablet_replica_checksum_operator.h" // for ObTabletReplicaChecksumItem
#include "lib/mysqlclient/ob_mysql_transaction.h" // ObMySQLTransaction
#include "lib/mysqlclient/ob_mysql_proxy.h"
namespace oceanbase
{
@ -287,6 +290,7 @@ int ObTabletTableUpdater::generate_tasks_(
const ObIArray<ObTabletTableUpdateTask> &batch_tasks,
ObArray<ObTabletReplica> &update_tablet_replicas,
ObArray<ObTabletReplica> &remove_tablet_replicas,
ObArray<ObTabletReplicaChecksumItem> &update_tablet_checksums,
UpdateTaskList &update_tablet_tasks,
RemoveTaskList &remove_tablet_tasks)
{
@ -307,6 +311,7 @@ int ObTabletTableUpdater::generate_tasks_(
// shall never be here
} else {
ObTabletReplica replica;
ObTabletReplicaChecksumItem checksum_item;
FOREACH_CNT_X(task, batch_tasks, OB_SUCC(ret)) {
// split tasks into remove and update
if (OB_ISNULL(task)) {
@ -318,10 +323,12 @@ int ObTabletTableUpdater::generate_tasks_(
} else {
task->check_task_status();
replica.reset();
if (OB_FAIL(ob_service_->fill_tablet_replica(task->get_tenant_id(),
task->get_ls_id(),
task->get_tablet_id(),
replica))) {
checksum_item.reset();
if (OB_FAIL(ob_service_->fill_tablet_report_info(task->get_tenant_id(),
task->get_ls_id(),
task->get_tablet_id(),
replica,
checksum_item))) {
if (OB_TABLET_NOT_EXIST == ret || OB_LS_NOT_EXIST == ret) {
ret = OB_SUCCESS;
// fill primary keys of the replica for removing
@ -353,11 +360,16 @@ int ObTabletTableUpdater::generate_tasks_(
if (OB_FAIL(update_tablet_replicas.reserve(count))) {
// reserve() is reentrant, do not have to check whether first time
LOG_WARN("fail to reserve update_tablet_replicas", KR(ret), K(count));
} else if (OB_FAIL(update_tablet_checksums.reserve(count))) {
// reserve() is reentrant, do not have to check whether first time
LOG_WARN("fail to reserve update_tablet_checksums", KR(ret), K(count));
} else if (OB_FAIL(update_tablet_tasks.reserve(count))) {
// reserve() is reentrant, do not have to check whether first time
LOG_WARN("fail to reserve update_tablet_tasks", KR(ret), K(count));
} else if (OB_FAIL(update_tablet_replicas.push_back(replica))) {
LOG_WARN("fail to push back replica", KR(ret), K(replica));
} else if (OB_FAIL(update_tablet_checksums.push_back(checksum_item))) {
LOG_WARN("fail to push back checksum item", KR(ret), K(checksum_item));
} else if (OB_FAIL(update_tablet_tasks.push_back(*task))) {
LOG_WARN("fail to push back task", KR(ret), KPC(task));
}
@ -367,12 +379,14 @@ int ObTabletTableUpdater::generate_tasks_(
if (OB_SUCC(ret)
&& (update_tablet_tasks.count() != update_tablet_replicas.count()
|| update_tablet_tasks.count() != update_tablet_checksums.count()
|| update_tablet_tasks.count() + remove_tablet_tasks.count() !=
batch_tasks.count())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tablet task count and replica count not match", KR(ret),
"tablet_update_tasks count", update_tablet_tasks.count(),
"tablet_update_replicas count", update_tablet_replicas.count(),
"tablet_update_checksums count", update_tablet_checksums.count(),
"tablet_remove_tasks count", remove_tablet_tasks.count(),
"batch_tasks count", batch_tasks.count());
}
@ -390,6 +404,7 @@ int ObTabletTableUpdater::batch_process_tasks(
const int64_t start_time = ObTimeUtility::current_time();
ObArray<ObTabletReplica> update_tablet_replicas;
ObArray<ObTabletReplica> remove_tablet_replicas;
ObArray<ObTabletReplicaChecksumItem> update_tablet_checksums;
UpdateTaskList update_tablet_tasks;
RemoveTaskList remove_tablet_tasks;
ObCurTraceId::init(GCONF.self_addr_);
@ -424,19 +439,21 @@ int ObTabletTableUpdater::batch_process_tasks(
batch_tasks,
update_tablet_replicas,
remove_tablet_replicas,
update_tablet_checksums,
update_tablet_tasks,
remove_tablet_tasks))) {
LOG_ERROR("generate_tasks failed", KR(ret), "batch_tasks count", batch_tasks.count(),
"update_tablet_replicas", update_tablet_replicas.count(),
"remove_tablet_replicas", remove_tablet_replicas.count(),
"update_tablet_checksums", update_tablet_checksums.count(),
"update_tablet_tasks", update_tablet_tasks.count(),
"remove_tablet_tasks", remove_tablet_tasks.count());
} else {
tmp_ret = do_batch_update_(start_time, update_tablet_tasks, update_tablet_replicas);
tmp_ret = do_batch_update_(start_time, update_tablet_tasks, update_tablet_replicas, update_tablet_checksums);
if (OB_SUCCESS != tmp_ret) {
ret = OB_SUCC(ret) ? tmp_ret : ret;
LOG_WARN("do_batch_update_ failed", KR(tmp_ret), K(start_time), K(update_tablet_tasks),
K(update_tablet_replicas));
K(update_tablet_replicas), K(update_tablet_checksums));
} else {
ObTaskController::get().allow_next_syslog();
LOG_INFO("REPORT: success to update tablets", KR(tmp_ret), K(update_tablet_replicas));
@ -472,18 +489,37 @@ int ObTabletTableUpdater::do_batch_remove_(
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid tasks count", KR(ret), K(tasks_count));
} else if (FALSE_IT(tenant_id = tasks.at(0).get_tenant_id())) {
} else if (OB_FAIL(tablet_operator_->batch_remove(tenant_id, replicas))) {
LOG_WARN("do tablet table remove failed, try to reput to queue", KR(ret),
"escape time", ObTimeUtility::current_time() - start_time);
(void) throttle_(ret, ObTimeUtility::current_time() - start_time);
if (OB_SUCCESS != (tmp_ret = reput_to_queue_(tasks))) {
LOG_ERROR("fail to reput remove task to queue", KR(tmp_ret), K(tasks_count));
} else {
LOG_INFO("reput remove task to queue success", K(tasks_count));
}
} else {
LOG_INFO("batch process remove success", KR(ret), K(tasks_count),
"use_time", ObTimeUtility::current_time() - start_time);
common::ObMySQLTransaction trans;
const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id);
if (OB_FAIL(trans.start(GCTX.sql_proxy_, meta_tenant_id))) {
LOG_WARN("fail to start transaction", KR(ret), K(tenant_id), K(meta_tenant_id));
} else if (OB_FAIL(tablet_operator_->batch_remove(trans, tenant_id, replicas))) {
LOG_WARN("do tablet table remove failed, try to reput to queue", KR(ret),
"escape time", ObTimeUtility::current_time() - start_time);
} else if (OB_FAIL(ObTabletReplicaChecksumOperator::batch_remove_with_trans(trans, tenant_id, replicas))) {
LOG_WARN("do tablet table checksum remove failed, try to reput to queue", KR(ret),
"escape time", ObTimeUtility::current_time() - start_time);
}
if (trans.is_started()) {
int trans_ret = trans.end(OB_SUCCESS == ret);
if (OB_SUCCESS != trans_ret) {
LOG_WARN("fail to end transaction", KR(trans_ret));
ret = ((OB_SUCCESS == ret) ? trans_ret : ret);
}
}
if (OB_FAIL(ret)) {
(void) throttle_(ret, ObTimeUtility::current_time() - start_time);
if (OB_SUCCESS != (tmp_ret = reput_to_queue_(tasks))) {
LOG_ERROR("fail to reput remove task to queue", KR(tmp_ret), K(tasks_count));
} else {
LOG_INFO("reput remove task to queue success", K(tasks_count));
}
} else {
LOG_INFO("batch process remove success", KR(ret), K(tasks_count),
"use_time", ObTimeUtility::current_time() - start_time);
}
}
return ret;
}
@ -491,30 +527,51 @@ int ObTabletTableUpdater::do_batch_remove_(
int ObTabletTableUpdater::do_batch_update_(
const int64_t start_time,
const ObIArray<ObTabletTableUpdateTask> &tasks,
const ObIArray<ObTabletReplica> &replicas)
const ObIArray<ObTabletReplica> &replicas,
const ObIArray<ObTabletReplicaChecksumItem> &checksums)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
int64_t tenant_id = OB_INVALID_TENANT_ID;
if (tasks.count() != replicas.count()
|| tasks.count() != checksums.count()
|| OB_ISNULL(tablet_operator_)
|| 0 == tasks.count()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("tasks num not match or invalid tablet_operator",
KR(ret), "task_cnt", tasks.count(), "replica_cnt", replicas.count());
LOG_WARN("tasks num not match or invalid tablet_operator", KR(ret), "task_cnt", tasks.count(),
"replica_cnt", replicas.count(), "checksum_cnt", checksums.count());
} else if (FALSE_IT(tenant_id = tasks.at(0).get_tenant_id())) {
} else if (OB_FAIL(tablet_operator_->batch_update(tenant_id, replicas))) {
LOG_WARN("do tablet table update failed, try to reput to queue", KR(ret),
"escape time", ObTimeUtility::current_time() - start_time);
(void) throttle_(ret, ObTimeUtility::current_time() - start_time);
if (OB_SUCCESS != (tmp_ret = reput_to_queue_(tasks))) {
LOG_ERROR("fail to reput update task to queue", KR(tmp_ret), K(tasks.count()));
} else {
LOG_INFO("reput update task to queue success", K(tasks.count()));
}
} else {
LOG_INFO("batch process update success", KR(ret), K(replicas.count()),
common::ObMySQLTransaction trans;
const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id);
if (OB_FAIL(trans.start(GCTX.sql_proxy_, meta_tenant_id))) {
LOG_WARN("fail to start transaction", KR(ret), K(tenant_id), K(meta_tenant_id));
} else if (OB_FAIL(tablet_operator_->batch_update(trans, tenant_id, replicas))) {
LOG_WARN("do tablet table update failed, try to reput to queue", KR(ret),
"escape time", ObTimeUtility::current_time() - start_time);
} else if (OB_FAIL(ObTabletReplicaChecksumOperator::batch_update_with_trans(trans, tenant_id, checksums))) {
LOG_WARN("do tablet table checksum update failed, try to reput to queue", KR(ret),
"escape time", ObTimeUtility::current_time() - start_time);
}
if (trans.is_started()) {
int trans_ret = trans.end(OB_SUCCESS == ret);
if (OB_SUCCESS != trans_ret) {
LOG_WARN("fail to end transaction", KR(trans_ret));
ret = ((OB_SUCCESS == ret) ? trans_ret : ret);
}
}
if (OB_FAIL(ret)) {
(void) throttle_(ret, ObTimeUtility::current_time() - start_time);
if (OB_SUCCESS != (tmp_ret = reput_to_queue_(tasks))) {
LOG_ERROR("fail to reput update task to queue", KR(tmp_ret), K(tasks.count()));
} else {
LOG_INFO("reput update task to queue success", K(tasks.count()));
}
} else {
LOG_INFO("batch process update success", KR(ret), K(replicas.count()),
"use_time", ObTimeUtility::current_time() - start_time);
}
}
return ret;
}

View File

@ -29,6 +29,7 @@ namespace share
class ObLSID;
class ObTabletReplica;
class ObTabletTableOperator;
struct ObTabletReplicaChecksumItem;
}
namespace observer
{
@ -143,12 +144,14 @@ private:
// @parma [in] batch_tasks, input tasks
// @parma [out] update_tablet_replicas, generated update replicas
// @parma [out] remove_tablet_replicas, generated remove replicas
// @parma [out] update_tablet_checksums, generated update tablet checksums
// @parma [out] update_tablet_tasks, generated update tasks
// @parma [out] remove_tablet_tasks, generated remove tasks
int generate_tasks_(
const ObIArray<ObTabletTableUpdateTask> &batch_tasks,
ObArray<share::ObTabletReplica> &update_tablet_replicas,
ObArray<share::ObTabletReplica> &remove_tablet_replicas,
ObArray<share::ObTabletReplicaChecksumItem> &update_tablet_checksums,
UpdateTaskList &update_tablet_tasks,
RemoveTaskList &remove_tablet_tasks);
@ -156,10 +159,12 @@ private:
// @parma [in] start_time, the time to start this execution
// @parma [in] tasks, batch of tasks to execute
// @parma [in] replicas, related replica to each task
// @parma [in] checksums, related checksum to each task
int do_batch_update_(
const int64_t start_time,
const ObIArray<ObTabletTableUpdateTask> &tasks,
const ObIArray<share::ObTabletReplica> &replicas);
const ObIArray<share::ObTabletReplica> &replicas,
const ObIArray<share::ObTabletReplicaChecksumItem> &checksums);
// do_batch_remove - the real action to remove a batch of tasks
// @parma [in] start_time, the time to start this execution

View File

@ -22,6 +22,7 @@
#include "share/tablet/ob_tablet_table_iterator.h" // ObTenantTabletTableIterator
#include "storage/tx_storage/ob_ls_service.h" // ObLSService, ObLSIterator
#include "storage/tx_storage/ob_ls_handle.h" // ObLSHandle
#include "share/ob_tablet_replica_checksum_operator.h" // ObTabletReplicaChecksumItem
namespace oceanbase
{
@ -621,6 +622,8 @@ int ObTenantMetaChecker::check_report_replicas_(
ObTabletID tablet_id;
ObTabletReplica local_replica; // replica from local
ObTabletReplica table_replica; // replica from meta table
share::ObTabletReplicaChecksumItem tablet_checksum; // TODO(@donglou.zl) check tablet_replica_checksum
const bool need_checksum = false;
const ObLSID &ls_id = ls->get_ls_id();
while (OB_SUCC(ret)) {
if (OB_FAIL(tablet_iter.get_next_tablet(tablet_handle))) {
@ -650,11 +653,13 @@ int ObTenantMetaChecker::check_report_replicas_(
LOG_WARN("get replica from hashmap failed",
KR(ret), K_(tenant_id), K(ls_id), K(tablet_id));
}
} else if (OB_FAIL(GCTX.ob_service_->fill_tablet_replica(
} else if (OB_FAIL(GCTX.ob_service_->fill_tablet_report_info(
tenant_id_,
ls_id,
tablet_id,
local_replica))) {
local_replica,
tablet_checksum,
need_checksum))) {
LOG_WARN("fail to fill tablet replica", KR(ret), K_(tenant_id), K(ls_id), K(tablet_id));
} else if (table_replica.is_equal_for_report(local_replica)) {
continue;

View File

@ -20,6 +20,8 @@
#include "lib/mysqlclient/ob_mysql_proxy.h"
#include "share/inner_table/ob_inner_table_schema_constants.h"
#include "share/schema/ob_column_schema.h"
#include "share/tablet/ob_tablet_info.h"
#include "share/config/ob_server_config.h"
namespace oceanbase
{
@ -346,14 +348,77 @@ ObTabletReplicaChecksumItem &ObTabletReplicaChecksumItem::operator=(const ObTabl
/****************************** ObTabletReplicaChecksumOperator ******************************/
int ObTabletReplicaChecksumOperator::batch_remove(
int ObTabletReplicaChecksumOperator::batch_remove_with_trans(
ObMySQLTransaction &trans,
const uint64_t tenant_id,
const ObIArray<ObLSID> &ls_ids,
const ObIArray<ObTabletID> &tablet_ids,
ObISQLClient &sql_proxy)
const common::ObIArray<share::ObTabletReplica> &tablet_replicas)
{
int ret = OB_NOT_IMPLEMENT;
UNUSEDx(tenant_id, ls_ids, tablet_ids, sql_proxy);
int ret = OB_SUCCESS;
const int64_t replicas_count = tablet_replicas.count();
if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || replicas_count <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(tenant_id), "tablet_replica cnt", replicas_count);
} else {
int64_t start_idx = 0;
int64_t end_idx = min(MAX_BATCH_COUNT, replicas_count);
while (OB_SUCC(ret) && (start_idx < end_idx)) {
if (OB_FAIL(inner_batch_remove_by_sql_(tenant_id, tablet_replicas, start_idx, end_idx, trans))) {
LOG_WARN("fail to inner batch remove", KR(ret), K(tenant_id), K(start_idx), K(end_idx));
} else {
start_idx = end_idx;
end_idx = min(start_idx + MAX_BATCH_COUNT, replicas_count);
}
}
}
return ret;
}
int ObTabletReplicaChecksumOperator::inner_batch_remove_by_sql_(
const uint64_t tenant_id,
const common::ObIArray<share::ObTabletReplica> &tablet_replicas,
const int64_t start_idx,
const int64_t end_idx,
ObMySQLTransaction &trans)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id)
|| (tablet_replicas.count() <= 0)
|| (start_idx < 0)
|| (start_idx > end_idx)
|| (end_idx > tablet_replicas.count()))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(start_idx), K(end_idx),
"tablet_replica cnt", tablet_replicas.count());
} else {
ObSqlString sql;
int64_t affected_rows = 0;
if (OB_FAIL(sql.assign_fmt("DELETE FROM %s WHERE tenant_id = '%lu' AND (tablet_id, svr_ip, svr_port, ls_id) IN(",
OB_ALL_TABLET_REPLICA_CHECKSUM_TNAME, tenant_id))) {
LOG_WARN("fail to assign sql", KR(ret), K(tenant_id));
} else {
char ip[OB_MAX_SERVER_ADDR_SIZE] = "";
for (int64_t idx = start_idx; OB_SUCC(ret) && (idx < end_idx); ++idx) {
if (OB_UNLIKELY(!tablet_replicas.at(idx).get_server().ip_to_string(ip, sizeof(ip)))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("convert server ip to string failed", KR(ret), "server", tablet_replicas.at(idx).get_server());
} else if (OB_FAIL(sql.append_fmt("('%lu', '%s', %d, %ld)%s",
tablet_replicas.at(idx).get_tablet_id().id(),
ip,
tablet_replicas.at(idx).get_server().get_port(),
tablet_replicas.at(idx).get_ls_id().id(),
((idx == end_idx - 1) ? ")" : ", ")))) {
LOG_WARN("fail to assign sql", KR(ret), K(tenant_id), K(idx), K(start_idx), K(end_idx));
}
}
const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id);
if (FAILEDx(trans.write(meta_tenant_id, sql.ptr(), affected_rows))) {
LOG_WARN("fail to execute sql", KR(ret), K(meta_tenant_id), K(sql));
} else {
LOG_INFO("will batch delete tablet replica checksum", K(affected_rows));
}
}
}
return ret;
}
@ -687,58 +752,38 @@ int ObTabletReplicaChecksumOperator::construct_tablet_replica_checksum_item_(
return ret;
}
int ObTabletReplicaChecksumOperator::batch_update(
int ObTabletReplicaChecksumOperator::batch_update_with_trans(
common::ObMySQLTransaction &trans,
const uint64_t tenant_id,
const ObIArray<ObTabletReplicaChecksumItem> &items,
ObISQLClient &sql_proxy)
const common::ObIArray<ObTabletReplicaChecksumItem> &items)
{
return batch_insert_or_update_(tenant_id, items, sql_proxy, true);
return batch_insert_or_update_with_trans_(tenant_id, items, trans, true);
}
int ObTabletReplicaChecksumOperator::batch_insert(
int ObTabletReplicaChecksumOperator::batch_insert_or_update_with_trans_(
const uint64_t tenant_id,
const ObIArray<ObTabletReplicaChecksumItem> &items,
ObISQLClient &sql_proxy)
{
return batch_insert_or_update_(tenant_id, items, sql_proxy, false);
}
int ObTabletReplicaChecksumOperator::batch_insert_or_update_(
const uint64_t tenant_id,
const ObIArray<ObTabletReplicaChecksumItem> &items,
ObISQLClient &sql_proxy,
common::ObMySQLTransaction &trans,
const bool is_update)
{
int ret = OB_SUCCESS;
ObMySQLTransaction trans;
if (OB_UNLIKELY((OB_INVALID_TENANT_ID == tenant_id) || (items.count() <= 0))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(tenant_id), "items count", items.count());
} else {
const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id);
if (OB_FAIL(trans.start(&sql_proxy, meta_tenant_id))) {
LOG_WARN("fail to start transaction", KR(ret), K(tenant_id), K(meta_tenant_id));
} else {
int64_t start_idx = 0;
int64_t end_idx = min(MAX_BATCH_COUNT, items.count());
while (OB_SUCC(ret) && (start_idx < end_idx)) {
if (OB_FAIL(inner_batch_insert_or_update_by_sql_(tenant_id, items, start_idx,
end_idx, trans, is_update))) {
LOG_WARN("fail to inner batch insert", KR(ret), K(tenant_id), K(start_idx), K(is_update));
} else {
start_idx = end_idx;
end_idx = min(start_idx + MAX_BATCH_COUNT, items.count());
}
int64_t start_idx = 0;
int64_t end_idx = min(MAX_BATCH_COUNT, items.count());
while (OB_SUCC(ret) && (start_idx < end_idx)) {
if (OB_FAIL(inner_batch_insert_or_update_by_sql_(tenant_id, items, start_idx,
end_idx, trans, is_update))) {
LOG_WARN("fail to inner batch insert", KR(ret), K(tenant_id), K(start_idx), K(is_update));
} else {
start_idx = end_idx;
end_idx = min(start_idx + MAX_BATCH_COUNT, items.count());
}
}
}
if (trans.is_started()) {
int trans_ret = trans.end(OB_SUCCESS == ret);
if (OB_SUCCESS != trans_ret) {
LOG_WARN("fail to end transaction", KR(trans_ret));
ret = ((OB_SUCCESS == ret) ? trans_ret : ret);
}
}
return ret;
}

View File

@ -30,6 +30,7 @@ namespace common
class ObISQLClient;
class ObAddr;
class ObTabletID;
class ObMySQLTransaction;
namespace sqlclient
{
class ObMySQLResult;
@ -37,6 +38,7 @@ class ObMySQLResult;
}
namespace share
{
class ObTabletReplica;
struct ObTabletReplicaReportColumnMeta
{
@ -117,19 +119,14 @@ public:
const common::ObSqlString &sql,
common::ObISQLClient &sql_proxy,
common::ObIArray<ObTabletReplicaChecksumItem> &items);
static int batch_insert(
static int batch_update_with_trans(
common::ObMySQLTransaction &trans,
const uint64_t tenant_id,
const common::ObIArray<ObTabletReplicaChecksumItem> &items,
common::ObISQLClient &sql_proxy);
static int batch_update(
const common::ObIArray<ObTabletReplicaChecksumItem> &item);
static int batch_remove_with_trans(
common::ObMySQLTransaction &trans,
const uint64_t tenant_id,
const common::ObIArray<ObTabletReplicaChecksumItem> &items,
common::ObISQLClient &sql_proxy);
static int batch_remove(
const uint64_t tenant_id,
const common::ObIArray<share::ObLSID> &ls_ids,
const common::ObIArray<common::ObTabletID> &tablet_ids,
common::ObISQLClient &sql_proxy);
const common::ObIArray<share::ObTabletReplica> &tablet_replicas);
static int check_column_checksum(
const uint64_t tenant_id,
@ -153,10 +150,10 @@ public:
common::ObString &column_meta_hex_str);
private:
static int batch_insert_or_update_(
static int batch_insert_or_update_with_trans_(
const uint64_t tenant_id,
const common::ObIArray<ObTabletReplicaChecksumItem> &items,
common::ObISQLClient &sql_proxy,
common::ObMySQLTransaction &trans,
const bool is_update);
static int inner_batch_insert_or_update_by_sql_(
@ -167,6 +164,13 @@ private:
common::ObISQLClient &sql_client,
const bool is_update);
static int inner_batch_remove_by_sql_(
const uint64_t tenant_id,
const common::ObIArray<share::ObTabletReplica> &tablet_replicas,
const int64_t start_idx,
const int64_t end_idx,
common::ObMySQLTransaction &trans);
static int inner_batch_get_by_sql_(
const uint64_t tenant_id,
const common::ObSqlString &sql,

View File

@ -396,35 +396,49 @@ int ObTabletTableOperator::batch_update(
const ObIArray<ObTabletReplica> &replicas)
{
int ret = OB_SUCCESS;
common::ObMySQLTransaction trans;
if (OB_UNLIKELY(!inited_) || OB_ISNULL(sql_proxy_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret));
} else if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || replicas.count() <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(tenant_id), "replicas count", replicas.count());
} else {
common::ObMySQLTransaction trans;
const uint64_t sql_tenant_id = gen_meta_tenant_id(tenant_id);
if (OB_FAIL(trans.start(sql_proxy_, sql_tenant_id))) {
LOG_WARN("start transaction failed", KR(ret), K(sql_tenant_id));
} else {
int64_t start_idx = 0;
int64_t end_idx = min(MAX_BATCH_COUNT, replicas.count());
while (OB_SUCC(ret) && (start_idx < end_idx)) {
if (OB_FAIL(inner_batch_update_by_sql_(tenant_id, replicas, start_idx, end_idx, trans))) {
LOG_WARN("fail to inner batch update", KR(ret), K(tenant_id), K(replicas), K(start_idx));
} else {
start_idx = end_idx;
end_idx = min(start_idx + MAX_BATCH_COUNT, replicas.count());
}
} else if (OB_FAIL(batch_update(trans, tenant_id, replicas))) {
LOG_WARN("fail to batch update", KR(ret), K(tenant_id));
}
if (trans.is_started()) {
int trans_ret = trans.end(OB_SUCCESS == ret);
if (OB_SUCCESS != trans_ret) {
LOG_WARN("end transaction failed", KR(trans_ret));
ret = OB_SUCCESS == ret ? trans_ret : ret;
}
}
}
if (trans.is_started()) {
int trans_ret = trans.end(OB_SUCCESS == ret);
if (OB_SUCCESS != trans_ret) {
LOG_WARN("end transaction failed", KR(trans_ret));
ret = OB_SUCCESS == ret ? trans_ret : ret;
return ret;
}
int ObTabletTableOperator::batch_update(
ObISQLClient &sql_client,
const uint64_t tenant_id,
const ObIArray<ObTabletReplica> &replicas)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || replicas.count() <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(tenant_id), "replicas count", replicas.count());
} else {
int64_t start_idx = 0;
int64_t end_idx = min(MAX_BATCH_COUNT, replicas.count());
while (OB_SUCC(ret) && (start_idx < end_idx)) {
if (OB_FAIL(inner_batch_update_by_sql_(tenant_id, replicas, start_idx, end_idx, sql_client))) {
LOG_WARN("fail to inner batch update", KR(ret), K(tenant_id), K(replicas), K(start_idx));
} else {
start_idx = end_idx;
end_idx = min(start_idx + MAX_BATCH_COUNT, replicas.count());
}
}
}
return ret;
@ -549,40 +563,52 @@ int ObTabletTableOperator::batch_remove(
const ObIArray<ObTabletReplica> &replicas)
{
int ret = OB_SUCCESS;
common::ObMySQLTransaction trans;
if (OB_UNLIKELY(!inited_) || OB_ISNULL(sql_proxy_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret));
} else if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || replicas.count() <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(tenant_id), "replicas count", replicas.count());
} else {
common::ObMySQLTransaction trans;
const uint64_t sql_tenant_id = gen_meta_tenant_id(tenant_id);
if (OB_FAIL(trans.start(sql_proxy_, sql_tenant_id))) {
LOG_WARN("start transaction failed", KR(ret), K(sql_tenant_id));
} else {
int64_t start_idx = 0;
int64_t end_idx = min(MAX_BATCH_COUNT, replicas.count());
while (OB_SUCC(ret) && (start_idx < end_idx)) {
if (OB_FAIL(inner_batch_remove_by_sql_(tenant_id, replicas, start_idx, end_idx, trans))) {
LOG_WARN("fail to inner batch remove", KR(ret), K(tenant_id), K(replicas), K(start_idx));
} else {
start_idx = end_idx;
end_idx = min(start_idx + MAX_BATCH_COUNT, replicas.count());
}
}
} else if (OB_FAIL(batch_remove(trans, tenant_id, replicas))) {
LOG_WARN("fail to batch remove", KR(ret));
}
}
if (trans.is_started()) {
int trans_ret = trans.end(OB_SUCCESS == ret);
if (OB_SUCCESS != trans_ret) {
LOG_WARN("end transaction failed", KR(trans_ret));
ret = OB_SUCCESS == ret ? trans_ret : ret;
if (trans.is_started()) {
int trans_ret = trans.end(OB_SUCCESS == ret);
if (OB_SUCCESS != trans_ret) {
LOG_WARN("end transaction failed", KR(trans_ret));
ret = OB_SUCCESS == ret ? trans_ret : ret;
}
}
}
return ret;
}
int ObTabletTableOperator::batch_remove(
ObISQLClient &sql_client,
const uint64_t tenant_id,
const ObIArray<ObTabletReplica> &replicas)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || replicas.count() <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(tenant_id), "replicas count", replicas.count());
} else {
int64_t start_idx = 0;
int64_t end_idx = min(MAX_BATCH_COUNT, replicas.count());
while (OB_SUCC(ret) && (start_idx < end_idx)) {
if (OB_FAIL(inner_batch_remove_by_sql_(tenant_id, replicas, start_idx, end_idx, sql_client))) {
LOG_WARN("fail to inner batch remove", KR(ret), K(tenant_id), K(replicas), K(start_idx));
} else {
start_idx = end_idx;
end_idx = min(start_idx + MAX_BATCH_COUNT, replicas.count());
}
}
}
return ret;
}
int ObTabletTableOperator::inner_batch_remove_by_sql_(
const uint64_t tenant_id,

View File

@ -102,6 +102,12 @@ public:
int batch_update(
const uint64_t tenant_id,
const ObIArray<ObTabletReplica> &replicas);
// batch update replicas into __all_tablet_meta_table
// differ from above batch_update(), it will use @sql_client to commit, not inner sql_proxy_.
int batch_update(
common::ObISQLClient &sql_client,
const uint64_t tenant_id,
const ObIArray<ObTabletReplica> &replicas);
// batch remove replicas from __all_tablet_meta_table
//
// @param [in] tenant_id, target tenant_id
@ -110,6 +116,12 @@ public:
int batch_remove(
const uint64_t tenant_id,
const ObIArray<ObTabletReplica> &replicas);
// batch remove replicas from __all_tablet_meta_table
// differ from above batch_remove(), it will use @sql_client to commit, not inner sql_proxy_.
int batch_remove(
common::ObISQLClient &sql_client,
const uint64_t tenant_id,
const ObIArray<ObTabletReplica> &replicas);
// remove residual tablet in __all_tablet_meta_table for ObServerMetaTableChecker
//
// @param [in] tenant_id, tenant for query

View File

@ -881,9 +881,7 @@ int ObTabletMergeFinishTask::process()
}
if (OB_SUCC(ret) && ctx.param_.is_major_merge() && NULL != ctx.param_.report_) {
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(ctx.param_.report_->submit_tablet_checksums_task(MTL_ID(), ctx.param_.ls_id_, tablet_id))) {
LOG_WARN("failed to submit tablet checksums task to report", K(tmp_ret), K(MTL_ID()), K(ctx.param_.ls_id_), K(tablet_id));
} else if (OB_TMP_FAIL(ctx.param_.report_->submit_tablet_update_task(MTL_ID(), ctx.param_.ls_id_, tablet_id))) {
if (OB_TMP_FAIL(ctx.param_.report_->submit_tablet_update_task(MTL_ID(), ctx.param_.ls_id_, tablet_id))) {
LOG_WARN("failed to submit tablet update task to report", K(tmp_ret), K(MTL_ID()), K(ctx.param_.ls_id_), K(tablet_id));
} else if (OB_TMP_FAIL(ctx.ls_handle_.get_ls()->get_tablet_svr()->update_tablet_report_status(tablet_id))) {
LOG_WARN("failed to update tablet report status", K(tmp_ret), K(MTL_ID()), K(tablet_id));

View File

@ -658,9 +658,7 @@ int ObTenantTabletScheduler::schedule_tablet_major_merge(
}
if (OB_SUCC(ret) && tablet_merge_finish && tablet.get_tablet_meta().report_status_.need_report()) {
if (OB_TMP_FAIL(GCTX.ob_service_->submit_tablet_checksums_task(MTL_ID(), ls_id, tablet_id))) {
LOG_WARN("failed to submit tablet checksums task to report", K(tmp_ret), K(MTL_ID()), K(tablet_id));
} else if (OB_TMP_FAIL(GCTX.ob_service_->submit_tablet_update_task(MTL_ID(), ls_id, tablet_id))) {
if (OB_TMP_FAIL(GCTX.ob_service_->submit_tablet_update_task(MTL_ID(), ls_id, tablet_id))) {
LOG_WARN("failed to submit tablet update task to report", K(tmp_ret), K(MTL_ID()), K(tablet_id));
} else if (OB_TMP_FAIL(ls.get_tablet_svr()->update_tablet_report_status(tablet_id))) {
LOG_WARN("failed to update tablet report status", K(tmp_ret), K(MTL_ID()), K(tablet_id));

View File

@ -381,8 +381,6 @@ int ObDDLTableMergeTask::process()
merge_param_.ddl_task_id_,
sstable->get_meta().get_col_checksum()))) {
LOG_WARN("report ddl column checksum failed", K(ret), K(merge_param_));
} else if (OB_FAIL(GCTX.ob_service_->submit_tablet_checksums_task(tenant_id, merge_param_.ls_id_, merge_param_.tablet_id_))) {
LOG_WARN("fail to submit tablet checksums task", K(ret), K(tenant_id), K(merge_param_));
} else if (OB_FAIL(GCTX.ob_service_->submit_tablet_update_task(tenant_id, merge_param_.ls_id_, merge_param_.tablet_id_))) {
LOG_WARN("fail to submit tablet update task", K(ret), K(tenant_id), K(merge_param_));
}

View File

@ -26,9 +26,6 @@ public:
MOCK_METHOD3(submit_tablet_update_task, int(const uint64_t tenant_id,
const share::ObLSID &ls_id,
const common::ObTabletID &tablet_id));
MOCK_METHOD3(submit_tablet_checksums_task, int(const uint64_t tenant_id,
const share::ObLSID &ls_id,
const common::ObTabletID &tablet_id));
};
}