[FEAT MERGE] add tenant param _no_logging

This commit is contained in:
AnimationFan 2024-11-20 12:48:15 +00:00 committed by ob-robot
parent 4db5ec53a2
commit 6ffbf2f1a6
43 changed files with 686 additions and 69 deletions

View File

@ -452,6 +452,7 @@ int ObTableLoadInstance::start_redef_table(
ddl_param.snapshot_version_ = start_res.snapshot_version_;
ddl_param.data_version_ = start_res.data_format_version_;
ddl_param.cluster_version_ = GET_MIN_CLUSTER_VERSION();
ddl_param.is_no_logging_ = start_res.is_no_logging_;
LOG_INFO("start redef table succeed", K(ddl_param));
}
return ret;

View File

@ -52,7 +52,8 @@ int ObTableLoadRedefTable::start(const ObTableLoadRedefTableStartArg &arg,
res.snapshot_version_,
status,
res.dest_table_id_,
res.schema_version_))) {
res.schema_version_,
res.is_no_logging_))) {
LOG_WARN("fail to get ddl task info", KR(ret), K(arg));
}
}
@ -85,6 +86,7 @@ int ObTableLoadRedefTable::start(const ObTableLoadRedefTableStartArg &arg,
res.dest_table_id_ = create_table_res.dest_table_id_;
res.task_id_ = create_table_res.task_id_;
res.schema_version_ = create_table_res.schema_version_;
res.is_no_logging_ = create_table_res.is_no_logging_;
LOG_INFO("succeed to create hidden table", K(arg), K(res));
}
THIS_WORKER.set_timeout_ts(origin_timeout_ts);

View File

@ -63,7 +63,7 @@ struct ObTableLoadRedefTableStartRes
{
public:
ObTableLoadRedefTableStartRes()
: dest_table_id_(common::OB_INVALID_ID), task_id_(0), schema_version_(0), snapshot_version_(0), data_format_version_(0)
: dest_table_id_(common::OB_INVALID_ID), task_id_(0), schema_version_(0), snapshot_version_(0), data_format_version_(0), is_no_logging_(false)
{
}
~ObTableLoadRedefTableStartRes() = default;
@ -74,14 +74,16 @@ public:
schema_version_ = 0;
snapshot_version_ = 0;
data_format_version_ = 0;
is_no_logging_ = false;
}
TO_STRING_KV(K_(dest_table_id), K_(task_id), K_(schema_version), K_(snapshot_version), K_(data_format_version));
TO_STRING_KV(K_(dest_table_id), K_(task_id), K_(schema_version), K_(snapshot_version), K_(data_format_version), K(is_no_logging_));
public:
uint64_t dest_table_id_;
int64_t task_id_;
int64_t schema_version_;
int64_t snapshot_version_;
uint64_t data_format_version_;
bool is_no_logging_;
};
struct ObTableLoadRedefTableFinishArg

View File

@ -292,6 +292,7 @@ int ObTableLoadStoreTableCtx::init_insert_table_ctx()
insert_table_param.col_descs_ = &(schema_->column_descs_);
insert_table_param.cmp_funcs_ = &(schema_->cmp_funcs_);
insert_table_param.online_sample_percent_ = store_ctx_->ctx_->param_.online_sample_percent_;
insert_table_param.is_no_logging_ = store_ctx_->ctx_->ddl_param_.is_no_logging_;
if (OB_ISNULL(insert_table_ctx_ =
OB_NEWx(ObDirectLoadInsertTableContext, (&allocator_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;

View File

@ -20,6 +20,7 @@
#include "lib/thread/ob_thread_name.h"
#include "share/ob_srv_rpc_proxy.h"
#include "share/backup/ob_tenant_archive_round.h"
#include "share/ob_ddl_common.h"
using namespace oceanbase;
using namespace rootserver;
@ -457,6 +458,17 @@ int ObArchiveSchedulerService::open_tenant_archive_mode_(
// TODO(wangxiaohui.wxh):4.3, return failed if any tenant failed
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret) && i < tenant_ids_array.count(); i++) {
bool is_no_logging = false;
const uint64_t &tenant_id = tenant_ids_array.at(i);
if (OB_FAIL(ObDDLUtil::get_no_logging_param(tenant_id, is_no_logging))) {
LOG_WARN("fail to check tenant no logging param", K(ret), K(tenant_id));
} else if (is_no_logging) {
ret = OB_OP_NOT_ALLOW;
LOG_WARN("is not allow to set archive when in no logging mode", K(ret), K(tenant_id));
}
}
for (int64_t i = 0; i < tenant_ids_array.count(); i++) {
int tmp_ret = OB_SUCCESS;
const uint64_t &tenant_id = tenant_ids_array.at(i);
@ -471,7 +483,16 @@ int ObArchiveSchedulerService::open_tenant_archive_mode_(const uint64_t tenant_i
{
int ret = OB_SUCCESS;
ObArchiveHandler tenant_scheduler;
if (OB_FAIL(tenant_scheduler.init(tenant_id, schema_service_, *rpc_proxy_, *sql_proxy_))) {
bool is_no_logging = false;
if (OB_FAIL(ObDDLUtil::get_no_logging_param(tenant_id, is_no_logging))) {
LOG_WARN("fail to check tenant no logging param", K(ret), K(tenant_id));
} else if (is_no_logging) {
ret = OB_OP_NOT_ALLOW;
LOG_WARN("is not allow to set archive when in no logging mode", K(ret), K(tenant_id));
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(tenant_scheduler.init(tenant_id, schema_service_, *rpc_proxy_, *sql_proxy_))) {
LOG_WARN("failed to init tenant archive scheduler", K(ret), K(tenant_id));
} else if (OB_FAIL(tenant_scheduler.open_archive_mode())) {
LOG_WARN("failed to open archive mode", K(ret), K(tenant_id));

View File

@ -78,6 +78,8 @@ int ObColumnRedefinitionTask::init(const uint64_t tenant_id, const int64_t task_
start_time_ = ObTimeUtility::current_time();
if (OB_FAIL(init_ddl_task_monitor_info(target_object_id_))) {
LOG_WARN("init ddl task monitor info failed", K(ret));
} else if (OB_FAIL(ObDDLUtil::get_no_logging_param(tenant_id_, is_no_logging_))) {
LOG_WARN("fail to get no logging param", K(ret), K(tenant_id_));
} else {
dst_tenant_id_ = tenant_id_;
dst_schema_version_ = schema_version_;

View File

@ -689,6 +689,7 @@ int ObDDLRedefinitionTask::send_build_single_replica_request()
param.execution_id_ = execution_id_;
param.data_format_version_ = data_format_version_;
param.consumer_group_id_ = alter_table_arg_.consumer_group_id_;
param.is_no_logging_ = is_no_logging_;
if (OB_FAIL(ObDDLUtil::get_tablets(tenant_id_, object_id_, param.source_tablet_ids_))) {
LOG_WARN("fail to get tablets", K(ret), K(tenant_id_), K(object_id_));
} else if (OB_FAIL(ObDDLUtil::get_tablets(dst_tenant_id_, target_object_id_, param.dest_tablet_ids_))) {

View File

@ -163,6 +163,7 @@ int ObDDLReplicaBuildExecutor::build(const ObDDLReplicaBuildExecutorParam &param
data_format_version_ = param.data_format_version_;
consumer_group_id_ = param.consumer_group_id_;
min_split_start_scn_ = param.min_split_start_scn_;
is_no_logging_ = param.is_no_logging_;
ObArray<ObSingleReplicaBuildCtx> replica_build_ctxs;
if (OB_FAIL(construct_replica_build_ctxs(param, replica_build_ctxs))) {
LOG_WARN("failed to construct replica build ctxs", K(ret));
@ -476,6 +477,7 @@ int ObDDLReplicaBuildExecutor::construct_rpc_arg(
} else {
arg.parallelism_ = parallelism_;
}
arg.is_no_logging_ = is_no_logging_;
if (OB_FAIL(arg.lob_col_idxs_.assign(lob_col_idxs_))) {
LOG_WARN("failed to assign to lob col idxs", K(ret));
} else if (OB_FAIL(arg.parallel_datum_rowkey_list_.assign(replica_build_ctx.parallel_datum_rowkey_list_))) {

View File

@ -46,7 +46,8 @@ public:
lob_col_idxs_(),
can_reuse_macro_blocks_(),
parallel_datum_rowkey_list_(),
min_split_start_scn_()
min_split_start_scn_(),
is_no_logging_(false)
{}
~ObDDLReplicaBuildExecutorParam () = default;
bool is_valid() const {
@ -73,12 +74,13 @@ public:
}
return is_valid;
}
TO_STRING_KV(K_(tenant_id), K_(dest_tenant_id), K_(ddl_type), K_(source_tablet_ids),
K_(dest_tablet_ids), K_(source_table_ids), K_(dest_table_ids),
K_(source_schema_versions), K_(dest_schema_versions), K_(snapshot_version),
K_(task_id), K_(parallelism), K_(execution_id),
K_(data_format_version), K_(consumer_group_id), K_(can_reuse_macro_blocks),
K_(parallel_datum_rowkey_list), K(min_split_start_scn_));
K_(parallel_datum_rowkey_list), K(min_split_start_scn_), K_(is_no_logging));
public:
uint64_t tenant_id_;
uint64_t dest_tenant_id_;
@ -100,6 +102,7 @@ public:
ObSArray<bool> can_reuse_macro_blocks_;
common::ObSEArray<common::ObSEArray<blocksstable::ObDatumRowkey, 8>, 8> parallel_datum_rowkey_list_;
share::SCN min_split_start_scn_;
int64_t is_no_logging_;
};
enum class ObReplicaBuildStat
@ -279,6 +282,7 @@ private:
ObArray<ObSingleReplicaBuildCtx> replica_build_ctxs_; // NOTE hold lock before access
share::SCN min_split_start_scn_;
ObSpinLock lock_; // NOTE keep rpc send out of lock scope
bool is_no_logging_;
};
} // end namespace rootserver

View File

@ -176,7 +176,8 @@ ObDDLTaskSerializeField::ObDDLTaskSerializeField(const int64_t task_version,
const int32_t sub_task_trace_id,
const bool is_unique_index,
const bool is_global_index,
const bool is_pre_split)
const bool is_pre_split,
const bool is_no_logging)
{
task_version_ = task_version;
parallelism_ = parallelism;
@ -184,10 +185,10 @@ ObDDLTaskSerializeField::ObDDLTaskSerializeField(const int64_t task_version,
consumer_group_id_ = consumer_group_id;
is_abort_ = is_abort;
sub_task_trace_id_ = sub_task_trace_id;
is_no_logging_ = is_no_logging;
is_unique_index_ = is_unique_index;
is_global_index_ = is_global_index;
is_pre_split_ = is_pre_split;
is_no_logging_ = false;
}
void ObDDLTaskSerializeField::reset()
@ -1047,7 +1048,8 @@ int ObDDLTask::serialize_params_to_message(char *buf, const int64_t buf_size, in
{
int ret = OB_SUCCESS;
ObDDLTaskSerializeField serialize_field(task_version_, parallelism_, data_format_version_, consumer_group_id_, is_abort_,
sub_task_trace_id_, is_unique_index_, is_global_index_, is_pre_split_);
sub_task_trace_id_, is_unique_index_, is_global_index_, is_pre_split_, is_no_logging_);
if (OB_UNLIKELY(nullptr == buf || buf_size <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), KP(buf), K(buf_size));
@ -1078,6 +1080,7 @@ int ObDDLTask::deserialize_params_from_message(const uint64_t tenant_id, const c
is_unique_index_ = serialize_field.is_unique_index_;
is_global_index_ = serialize_field.is_global_index_;
is_pre_split_ = serialize_field.is_pre_split_;
is_no_logging_ = serialize_field.is_no_logging_;
}
return ret;
}
@ -1085,7 +1088,7 @@ int ObDDLTask::deserialize_params_from_message(const uint64_t tenant_id, const c
int64_t ObDDLTask::get_serialize_param_size() const
{
ObDDLTaskSerializeField serialize_field(task_version_, parallelism_, data_format_version_, consumer_group_id_, is_abort_,
sub_task_trace_id_, is_unique_index_, is_global_index_, is_pre_split_);
sub_task_trace_id_, is_unique_index_, is_global_index_, is_pre_split_, is_no_logging_);
return serialize_field.get_serialize_size();
}

View File

@ -208,9 +208,10 @@ public:
const int64_t consumer_group_id,
const bool is_abort,
const int32_t sub_task_trace_id,
const bool is_unique_index = false,
const bool is_global_index = false,
const bool is_pre_split = false);
const bool is_unique_index,
const bool is_global_index,
const bool is_pre_split,
const bool is_no_logging_);
~ObDDLTaskSerializeField() = default;
void reset();
public:
@ -688,7 +689,7 @@ public:
allocator_(lib::ObLabel("DdlTask")), compat_mode_(lib::Worker::CompatMode::INVALID), err_code_occurence_cnt_(0),
longops_stat_(nullptr), gmt_create_(0), stat_info_(), delay_schedule_time_(0), next_schedule_ts_(0),
execution_id_(-1), start_time_(0), data_format_version_(0), is_pre_split_(false), wait_trans_ctx_(), is_unique_index_(false),
is_global_index_(false), consensus_schema_version_(OB_INVALID_VERSION)
is_global_index_(false), consensus_schema_version_(OB_INVALID_VERSION), is_no_logging_(false)
{}
ObDDLTask():
ObDDLTask(share::DDL_INVALID)
@ -787,7 +788,7 @@ public:
bool is_unique_index() { return is_unique_index_; }
bool is_global_index() { return is_global_index_; }
int64_t get_consensus_schema_version() { return consensus_schema_version_; }
bool get_is_no_logging() const { return is_no_logging_; }
#ifdef ERRSIM
int check_errsim_error();
#endif
@ -799,8 +800,7 @@ public:
K_(task_version), K_(parallelism), K_(ddl_stmt_str), K_(compat_mode),
K_(sys_task_id), K_(err_code_occurence_cnt), K_(stat_info),
K_(next_schedule_ts), K_(delay_schedule_time), K(execution_id_), K(sql_exec_addrs_), K_(data_format_version), K(consumer_group_id_),
K_(dst_tenant_id), K_(dst_schema_version), K_(is_pre_split), K_(is_unique_index), K_(is_global_index), K_(consensus_schema_version));
static const int64_t MAX_ERR_TOLERANCE_CNT = 3L; // Max torlerance count for error code.
K_(dst_tenant_id), K_(dst_schema_version), K_(is_pre_split), K_(is_unique_index), K_(is_global_index), K_(consensus_schema_version), K(is_no_logging_)); static const int64_t MAX_ERR_TOLERANCE_CNT = 3L; // Max torlerance count for error code.
static const int64_t DEFAULT_TASK_IDLE_TIME_US = 10L * 1000L; // 10ms
protected:
int gather_redefinition_stats(const uint64_t tenant_id,
@ -884,6 +884,7 @@ protected:
bool is_unique_index_;
bool is_global_index_;
int64_t consensus_schema_version_;
bool is_no_logging_;
};
enum ColChecksumStat

View File

@ -936,6 +936,7 @@ int ObDropVecIndexTask::send_build_single_replica_request()
param.execution_id_ = execution_id_; // should >= 0
param.data_format_version_ = data_format_version_; // should > 0
param.consumer_group_id_ = consumer_group_id_;
param.is_no_logging_ = is_no_logging_;
if (OB_FAIL(ObDDLUtil::get_tablets(tenant_id_, vec_index_snapshot_data_.table_id_, param.source_tablet_ids_))) {
LOG_WARN("fail to get tablets", K(ret), K(tenant_id_), K(object_id_));

View File

@ -144,6 +144,8 @@ int ObFtsIndexBuildTask::init(
} else if (FALSE_IT(task_status_ = static_cast<ObDDLTaskStatus>(task_status))) {
} else if (OB_FAIL(init_ddl_task_monitor_info(index_schema->get_table_id()))) {
LOG_WARN("init ddl task monitor info failed", K(ret));
} else if (OB_FAIL(ObDDLUtil::get_no_logging_param(tenant_id_, is_no_logging_))) {
LOG_WARN("fail to get no logging param", K(ret), K(tenant_id_));
} else {
dst_tenant_id_ = tenant_id_;
dst_schema_version_ = schema_version_;

View File

@ -427,6 +427,8 @@ int ObIndexBuildTask::init(
if (OB_FAIL(ret)) {
} else if (OB_FAIL(init_ddl_task_monitor_info(index_schema->get_table_id()))) {
LOG_WARN("init ddl task monitor info failed", K(ret));
} else if (OB_FAIL(ObDDLUtil::get_no_logging_param(tenant_id_, is_no_logging_))) {
LOG_WARN("fail to get no logging param", K(ret), K(tenant_id_));
} else {
dst_tenant_id_ = tenant_id_;
dst_schema_version_ = schema_version_;
@ -439,7 +441,6 @@ int ObIndexBuildTask::init(
}
ddl_tracing_.open();
}
return ret;
}

View File

@ -118,6 +118,8 @@ int ObTableRedefinitionTask::init(const ObTableSchema* src_table_schema,
LOG_WARN("init ddl task monitor info failed", K(ret));
} else if (OB_FAIL(check_ddl_can_retry(ddl_need_retry_at_executor, dst_table_schema))) {
LOG_WARN("check use heap table ddl plan failed", K(ret));
} else if (OB_FAIL(ObDDLUtil::get_no_logging_param(tenant_id_, is_no_logging_))) {
LOG_WARN("fail to get no logging param", K(ret), K(tenant_id_));
} else {
is_inited_ = true;
ddl_tracing_.open();

View File

@ -10852,6 +10852,8 @@ int ObRootService::set_config_pre_hook(obrpc::ObAdminSetConfigArg &arg)
ret = check_freeze_trigger_percentage_(*item);
} else if (0 == STRCMP(item->name_.ptr(), WRITING_THROTTLEIUNG_TRIGGER_PERCENTAGE)) {
ret = check_write_throttle_trigger_percentage(*item);
} else if (0 == STRCMP(item->name_.ptr(), _NO_LOGGING)) {
ret = check_no_logging(*item);
} else if (0 == STRCMP(item->name_.ptr(), WEAK_READ_VERSION_REFRESH_INTERVAL)) {
int64_t refresh_interval = ObConfigTimeParser::get(item->value_.ptr(), valid);
if (valid && OB_FAIL(check_weak_read_version_refresh_interval(refresh_interval, valid))) {
@ -11047,6 +11049,14 @@ int ObRootService::check_write_throttle_trigger_percentage(obrpc::ObAdminSetConf
return ret;
}
int ObRootService::check_no_logging(obrpc::ObAdminSetConfigItem &item)
{
int ret = OB_SUCCESS;
const char *warn_log = "set _no_logging, becacuse archivelog and _no_logging are exclusive parameters";
CHECK_TENANTS_CONFIG_WITH_FUNC(ObConfigDDLNoLoggingChecker, warn_log);
return ret;
}
int ObRootService::check_data_disk_write_limit_(obrpc::ObAdminSetConfigItem &item)
{
int ret = OB_SUCCESS;

View File

@ -994,6 +994,7 @@ private:
int check_freeze_trigger_percentage_(obrpc::ObAdminSetConfigItem &item);
int check_write_throttle_trigger_percentage(obrpc::ObAdminSetConfigItem &item);
int add_rs_event_for_alter_ls_replica_(const obrpc::ObAdminAlterLSReplicaArg &arg, const int ret_val);
int check_no_logging(obrpc::ObAdminSetConfigItem &item);
int check_data_disk_write_limit_(obrpc::ObAdminSetConfigItem &item);
int check_data_disk_usage_limit_(obrpc::ObAdminSetConfigItem &item);
int check_vector_memory_limit_(obrpc::ObAdminSetConfigItem &item);

View File

@ -18,7 +18,8 @@
#include "lib/ob_errno.h"
#include "lib/utility/ob_macro_utils.h"
#include "lib/oblog/ob_log_module.h"
#include "share/ob_ddl_common.h"
#include "rootserver/ob_rs_event_history_table_operator.h"
using namespace oceanbase;
using namespace common;
using namespace share;
@ -273,17 +274,34 @@ int ObDestRoundCheckpointer::gen_new_round_info_(
if (OB_FAIL(ret)) {
} else if (old_round_info.state_.is_beginning()) {
if (counter.not_start_cnt_ > 0) {
bool is_no_logging = false;
if (OB_FAIL(ObDDLUtil::get_no_logging_param(old_round_info.key_.tenant_id_, is_no_logging))) {
LOG_WARN("failed to check no logging", K(ret), K(old_round_info.key_.tenant_id_));
} else if (counter.not_start_cnt_ > 0) {
need_checkpoint = false;
} else if (counter.interrupted_cnt_ > 0) {
} else if (counter.interrupted_cnt_ > 0 || is_no_logging) {
ObSqlString comment;
new_round_info.state_.set_interrupted();
if (OB_FAIL(comment.append_fmt("log stream %ld interrupted.", counter.interrupted_ls_id_.id()))) {
LOG_WARN("failed to append interrupted log stream comment", K(ret), K(new_round_info), K(counter));
if (is_no_logging) {
if (OB_FAIL(comment.append_fmt("tenant %lu no logging is open", old_round_info.key_.tenant_id_))) {
LOG_WARN("failed to append interrupted log stream comment", K(ret), K(is_no_logging));
}
ROOTSERVICE_EVENT_ADD("log_archive", "change_status",
"tenant_id", old_round_info.key_.tenant_id_,
"dest_no", new_round_info.key_.dest_no_,
"old_status", old_round_info.state_.to_status_str(),
"new_status", new_round_info.state_.to_status_str());
} else{
if (OB_FAIL(comment.append_fmt("log stream %ld interrupted", counter.interrupted_ls_id_.id()))) {
LOG_WARN("failed to append interrupted log stream comment", K(ret), K(new_round_info), K(counter));
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(new_round_info.comment_.assign(comment.ptr()))) {
LOG_WARN("failed to assign comment", K(ret), K(new_round_info), K(counter), K(comment));
}
LOG_INFO("switch to INTERRUPTED state", K(ret), K(old_round_info), K(counter), K(new_round_info));
LOG_INFO("switch to INTERRUPTED state", K(ret), K(old_round_info), K(counter), K(new_round_info), K(is_no_logging));
} else if (next_checkpoint_scn <= old_round_info.start_scn_) {
need_checkpoint = false;
} else if (OB_FALSE_IT(new_round_info.checkpoint_scn_ = next_checkpoint_scn)) {
@ -295,18 +313,34 @@ int ObDestRoundCheckpointer::gen_new_round_info_(
LOG_WARN("unexpected error occur", K(ret), K(old_round_info), K(counter), K(new_round_info));
}
} else if (old_round_info.state_.is_doing()) {
bool is_no_logging = false;
if (counter.not_start_cnt_ > 0) {
need_checkpoint = false;
} else if (OB_FALSE_IT(new_round_info.checkpoint_scn_ = next_checkpoint_scn)) {
} else if (counter.interrupted_cnt_ > 0) {
} else if (OB_FAIL(ObDDLUtil::get_no_logging_param(old_round_info.key_.tenant_id_, is_no_logging))) {
LOG_WARN("failed to check no logging", K(ret), K(old_round_info.key_.tenant_id_));
} else if (!is_no_logging && OB_FALSE_IT(new_round_info.checkpoint_scn_ = next_checkpoint_scn)) { // don't set checkpoint_scn when no logging is true
} else if (counter.interrupted_cnt_ > 0 || is_no_logging) {
ObSqlString comment;
new_round_info.state_.set_interrupted();
if (OB_FAIL(comment.append_fmt("log stream %ld interrupted.", counter.interrupted_ls_id_.id()))) {
LOG_WARN("failed to append interrupted log stream comment", K(ret), K(new_round_info), K(counter));
} else if (OB_FAIL(new_round_info.comment_.assign(comment.ptr()))) {
LOG_WARN("failed to assign comment", K(ret), K(new_round_info), K(counter), K(comment));
if (is_no_logging) {
if (OB_FAIL(comment.append_fmt("tenant %lu no logging is open", old_round_info.key_.tenant_id_))) {
LOG_WARN("failed to append interrupted log stream comment", K(ret), K(is_no_logging));
}
ROOTSERVICE_EVENT_ADD("log_archive", "change_status",
"tenant_id", old_round_info.key_.tenant_id_,
"dest_no", new_round_info.key_.dest_no_,
"old_status", old_round_info.state_.to_status_str(),
"new_status", new_round_info.state_.to_status_str());
} else {
if (OB_FAIL(comment.append_fmt("log stream %ld interrupted", counter.interrupted_ls_id_.id()))) {
LOG_WARN("failed to append interrupted log stream comment", K(ret), K(new_round_info), K(counter));
}
}
LOG_INFO("switch to INTERRUPTED state", K(ret), K(old_round_info), K(counter), K(new_round_info));
if (OB_FAIL(ret)) {
} else if (OB_FAIL(new_round_info.comment_.assign(comment.ptr()))) {
LOG_WARN("failed to assign comment", K(ret), K(new_round_info), K(counter), K(is_no_logging), K(comment));
}
LOG_INFO("switch to INTERRUPTED state", K(ret), K(old_round_info), K(counter), K(is_no_logging), K(new_round_info));
} else if (counter.doing_cnt_ == actual_count) {
} else {
ret = OB_ERR_UNEXPECTED;

View File

@ -37,7 +37,7 @@
#include "lib/utility/utility.h"
#include "storage/tx_storage/ob_tenant_freezer.h"
#include "share/vector_index/ob_vector_index_util.h"
#include "share/backup/ob_tenant_archive_mgr.h"
namespace oceanbase
{
using namespace share;
@ -1346,6 +1346,56 @@ bool ObConfigTableStoreFormatChecker::check(const ObConfigItem &t) const {
return bret;
}
bool ObConfigDDLNoLoggingChecker::check(const uint64_t tenant_id, const obrpc::ObAdminSetConfigItem &t) {
int ret = OB_SUCCESS;
bool is_valid = true;
uint64_t data_version = 0;
const bool value = ObConfigBoolParser::get(t.value_.ptr(), is_valid);
if (!is_valid) {
} else if (!GCTX.is_shared_storage_mode()) {
is_valid = false;
LOG_USER_ERROR(OB_OP_NOT_ALLOW, "it's not allowded to set no logging in shared nothing mode");
} else if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, data_version))) {
is_valid = false;
OB_LOG(WARN, "failed to get mini data version", K(ret));
} else if (data_version < DATA_VERSION_4_3_5_0) {
is_valid = false;
ret = OB_NOT_SUPPORTED;
OB_LOG(WARN, "it's not allowded to set no logging during cluster updating process", K(ret));
LOG_USER_ERROR(OB_OP_NOT_ALLOW, "it's not allowded to set no logging during cluster updating process");
}
if (!is_valid) {
} else {
if (OB_SYS_TENANT_ID == tenant_id) {
/* sys tenant not no allow archive */
} else {
ObArchivePersistHelper archive_op;
ObArchiveMode archive_mode;
common::ObMySQLProxy *sql_proxy = nullptr;
if (OB_ISNULL(sql_proxy = GCTX.sql_proxy_)) {
is_valid = false;
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "invalid sql proxy", K(ret), KP(sql_proxy));
} else if (OB_FAIL(archive_op.init(tenant_id))) {
is_valid = false;
OB_LOG(WARN, "failed to init archive op", K(ret), K(tenant_id));
} else if (OB_FAIL(archive_op.get_archive_mode(*sql_proxy, archive_mode))) {
is_valid = false;
OB_LOG(WARN, "failed to get archive mode", K(ret));
} else if (value && archive_mode.is_archivelog()) {
is_valid = false;
LOG_USER_ERROR(OB_OP_NOT_ALLOW, "it's no allowded to set no logging during archive");
}
}
}
if (OB_FAIL(ret)) {
is_valid = false;
}
return is_valid;
}
bool ObConfigMigrationChooseSourceChecker::check(const ObConfigItem &t) const
{
ObString v_str(t.str());

View File

@ -896,6 +896,13 @@ private:
DISALLOW_COPY_AND_ASSIGN(ObConfigTableStoreFormatChecker);
};
class ObConfigDDLNoLoggingChecker: public ObConfigChecker {
public:
static bool check(const uint64_t tenant_id, const obrpc::ObAdminSetConfigItem &t);
private:
DISALLOW_COPY_AND_ASSIGN(ObConfigDDLNoLoggingChecker);
};
class ObConfigArchiveLagTargetChecker {
public:
ObConfigArchiveLagTargetChecker(){}

View File

@ -53,6 +53,7 @@ const char* const SSL_EXTERNAL_KMS_INFO = "ssl_external_kms_info";
const char* const CLUSTER_ID = "cluster_id";
const char* const CLUSTER_NAME = "cluster";
const char* const FREEZE_TRIGGER_PERCENTAGE = "freeze_trigger_percentage";
const char* const _NO_LOGGING = "_no_logging";
const char* const WRITING_THROTTLEIUNG_TRIGGER_PERCENTAGE = "writing_throttling_trigger_percentage";
const char* const DATA_DISK_WRITE_LIMIT_PERCENTAGE = "data_disk_write_limit_percentage";
const char* const DATA_DISK_USAGE_LIMIT_PERCENTAGE = "data_disk_usage_limit_percentage";

View File

@ -2526,7 +2526,7 @@ int ObDDLUtil::get_data_information(
{
uint64_t target_object_id = 0;
int64_t schema_version = 0;
bool is_no_logging = false;
return get_data_information(
tenant_id,
task_id,
@ -2534,7 +2534,8 @@ int ObDDLUtil::get_data_information(
snapshot_version,
task_status,
target_object_id,
schema_version);
schema_version,
is_no_logging);
}
int ObDDLUtil::get_data_information(
@ -2544,7 +2545,8 @@ int ObDDLUtil::get_data_information(
int64_t &snapshot_version,
share::ObDDLTaskStatus &task_status,
uint64_t &target_object_id,
int64_t &schema_version)
int64_t &schema_version,
bool &is_no_logging)
{
int ret = OB_SUCCESS;
data_format_version = 0;
@ -2591,6 +2593,7 @@ int ObDDLUtil::get_data_information(
LOG_WARN("deserialize from msg failed", K(ret));
} else {
data_format_version = task.get_data_format_version();
is_no_logging = task.get_is_no_logging();
}
}
}
@ -3288,6 +3291,20 @@ int64_t ObDDLUtil::get_real_parallelism(const int64_t parallelism, const bool is
return real_parallelism;
}
int ObDDLUtil::get_no_logging_param(const int64_t tenant_id, bool &is_no_logging)
{
int ret = OB_SUCCESS;
is_no_logging = false;
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
if (!tenant_config.is_valid()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tenant config is invalid", K(ret), K(tenant_id));
} else {
is_no_logging = tenant_config->_no_logging;
}
return ret;
}
/****************** ObCheckTabletDataComplementOp *************/
int ObCheckTabletDataComplementOp::check_task_inner_sql_session_status(

View File

@ -746,7 +746,8 @@ public:
int64_t &snapshot_version,
share::ObDDLTaskStatus &task_status,
uint64_t &target_object_id,
int64_t &schema_version);
int64_t &schema_version,
bool &is_no_logging);
static int replace_user_tenant_id(
const ObDDLType &ddl_type,
@ -904,6 +905,8 @@ public:
int64_t &check_dag_exit_retry_cnt,
bool is_complement_data_dag,
bool &all_dag_exit);
static int get_no_logging_param(const int64_t tenant_id, bool &is_no_logging);
private:
static int hold_snapshot(
common::ObMySQLTransaction &trans,

View File

@ -9113,6 +9113,7 @@ int ObDDLBuildSingleReplicaRequestArg::assign(const ObDDLBuildSingleReplicaReque
can_reuse_macro_block_ = other.can_reuse_macro_block_;
split_sstable_type_ = other.split_sstable_type_;
min_split_start_scn_ = other.min_split_start_scn_;
is_no_logging_ = other.is_no_logging_;
}
return ret;
}

View File

@ -165,6 +165,9 @@ DEF_STR_WITH_CHECKER(default_table_store_format, OB_TENANT_PARAMETER, "row",
"Specify the default storage format of creating table: row, column, compound format of row and column"
"values: row, column, compound",
ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_BOOL(_no_logging, OB_TENANT_PARAMETER, "False",
"set true to skip writing ddl clog when all server using the same oss in shared storage mode",
ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_INT(storage_rowsets_size, OB_TENANT_PARAMETER, "8192", "(0,1048576]",
"the row number processed by vectorized storage engine within one batch in column storage. Range: (0,1048576]",

View File

@ -905,11 +905,11 @@ int ObPxSubCoord::start_ddl()
const int64_t ddl_table_id = phy_plan->get_ddl_table_id();
const int64_t ddl_task_id = phy_plan->get_ddl_task_id();
const int64_t ddl_execution_id = phy_plan->get_ddl_execution_id();
uint64_t unused_taget_object_id = OB_INVALID_ID;
int64_t schema_version = OB_INVALID_VERSION;
bool is_no_logging = false;
if (OB_FAIL(ObDDLUtil::get_data_information(tenant_id, ddl_task_id, data_format_version, snapshot_version, unused_task_status, unused_taget_object_id, schema_version))) {
if (OB_FAIL(ObDDLUtil::get_data_information(tenant_id, ddl_task_id, data_format_version, snapshot_version, unused_task_status, unused_taget_object_id, schema_version, is_no_logging))) {
LOG_WARN("get ddl cluster version failed", K(ret));
} else if (OB_UNLIKELY(snapshot_version <= 0)) {
ret = OB_NEED_RETRY;
@ -920,12 +920,12 @@ int ObPxSubCoord::start_ddl()
} else {
ddl_ctrl_.direct_load_type_ = ObDDLUtil::use_idempotent_mode(data_format_version) ?
ObDirectLoadType::DIRECT_LOAD_DDL_V2 : ObDirectLoadType::DIRECT_LOAD_DDL;
ObTabletDirectLoadInsertParam direct_load_param;
direct_load_param.is_replay_ = false;
direct_load_param.common_param_.direct_load_type_ = ddl_ctrl_.direct_load_type_;
direct_load_param.common_param_.data_format_version_ = data_format_version;
direct_load_param.common_param_.read_snapshot_ = snapshot_version;
direct_load_param.common_param_.is_no_logging_ = is_no_logging;
direct_load_param.runtime_only_param_.exec_ctx_ = exec_ctx;
direct_load_param.runtime_only_param_.task_id_ = ddl_task_id;
direct_load_param.runtime_only_param_.table_id_ = ddl_table_id;

View File

@ -152,6 +152,7 @@ int ObComplementDataParam::init(const ObDDLBuildSingleReplicaRequestArg &arg)
tablet_task_id_ = arg.tablet_task_id_;
data_format_version_ = arg.data_format_version_;
user_parallelism_ = arg.parallelism_;
is_no_logging_ = arg.is_no_logging_;
FLOG_INFO("succeed to init ObComplementDataParam", K(ret), KPC(this));
}
return ret;
@ -450,6 +451,7 @@ int ObComplementDataContext::init(
direct_load_param.common_param_.read_snapshot_ = param.snapshot_version_;
direct_load_param.common_param_.ls_id_ = param.dest_ls_id_;
direct_load_param.common_param_.tablet_id_ = param.dest_tablet_id_;
direct_load_param.common_param_.is_no_logging_ = param.is_no_logging_;
direct_load_param.runtime_only_param_.exec_ctx_ = nullptr;
direct_load_param.runtime_only_param_.task_id_ = param.task_id_;
direct_load_param.runtime_only_param_.table_id_ = param.dest_table_id_;

View File

@ -61,7 +61,7 @@ public:
row_store_type_(common::ENCODING_ROW_STORE), orig_schema_version_(0), dest_schema_version_(0),
snapshot_version_(0), task_id_(0), execution_id_(-1), tablet_task_id_(0), compat_mode_(lib::Worker::CompatMode::INVALID), data_format_version_(0),
orig_schema_tablet_size_(0), user_parallelism_(0),
concurrent_cnt_(0), ranges_(),
concurrent_cnt_(0), ranges_(),is_no_logging_(false),
allocator_("CompleteDataPar", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID())
{}
~ObComplementDataParam() { destroy(); }
@ -118,13 +118,14 @@ public:
data_format_version_ = 0;
user_parallelism_ = 0;
concurrent_cnt_ = 0;
is_no_logging_ = false;
ranges_.reset();
}
TO_STRING_KV(K_(is_inited), K_(orig_tenant_id), K_(dest_tenant_id), K_(orig_ls_id), K_(dest_ls_id),
K_(orig_table_id), K_(dest_table_id), K_(orig_tablet_id), K_(dest_tablet_id), K_(orig_schema_version),
K_(tablet_task_id), K_(dest_schema_version), K_(snapshot_version), K_(task_id),
K_(execution_id), K_(compat_mode), K_(data_format_version), K_(orig_schema_tablet_size), K_(user_parallelism),
K_(concurrent_cnt), K_(ranges));
K_(concurrent_cnt), K_(ranges), K_(is_no_logging));
public:
bool is_inited_;
uint64_t orig_tenant_id_;
@ -149,6 +150,7 @@ public:
/* complememt prepare task will initialize parallel task ranges */
int64_t concurrent_cnt_; /* real complement tasks num */
ObArray<blocksstable::ObDatumRange> ranges_;
bool is_no_logging_;
private:
common::ObArenaAllocator allocator_;
static constexpr int64_t MAX_RPC_STREAM_WAIT_THREAD_COUNT = 100;

View File

@ -201,7 +201,7 @@ int ObDDLMacroBlockClogCb::init(const share::ObLSID &ls_id,
redo_info.data_buffer_.ptr(),
redo_info.data_buffer_.length(),
redo_info.block_type_))) {
LOG_WARN("failed to set data macro meta", K(ret), KP(redo_info.data_buffer_.ptr()), K(redo_info.data_buffer_.length()));
LOG_WARN("failed to set data macro meta", K(ret), K(redo_info));
} else {
ddl_macro_block_.block_type_ = redo_info.block_type_;
ddl_macro_block_.logic_id_ = redo_info.logic_id_;

View File

@ -782,7 +782,7 @@ int ObDDLRedoLogWriter::local_write_ddl_macro_redo(
LOG_WARN("fail to deserialize ddl redo log", K(ret));
/* use the ObString data_buffer_ in tmp_log.redo_info_, do not rely on the macro_block_buf in original log*/
} else if (OB_FAIL(cb->init(ls_id, tmp_log.get_redo_info(), macro_block_id, tablet_handle))) {
LOG_WARN("init ddl clog callback failed", K(ret), K(tmp_log.get_redo_info()), K(macro_block_id));
LOG_WARN("init ddl clog callback failed", K(ret), K(redo_info), K(tmp_log.get_redo_info()), K(macro_block_id));
} else if (OB_FAIL(DDL_SIM(tenant_id, task_id, DDL_REDO_WRITER_WRITE_MACRO_LOG_FAILED))) {
LOG_WARN("ddl sim failure", K(ret), K(tenant_id), K(task_id));
} else if (OB_FAIL(log_handler->append(buffer,
@ -1690,7 +1690,9 @@ int ObDDLRedoLogWriter::write_block_to_disk(const ObDDLMacroBlockRedoInfo &redo_
bool is_object_exist = false;
bool is_major_exist = false;
macro_id = redo_info.macro_block_id_;
if (OB_FAIL(ObDDLUtil::is_major_exist(ls_id, redo_info.table_key_.tablet_id_, is_major_exist))) {
if (ObDDLMacroBlockType::DDL_MB_SS_EMPTY_DATA_TYPE == redo_info.block_type_) {
/* if empty block type, skip write to disk*/
} else if (OB_FAIL(ObDDLUtil::is_major_exist(ls_id, redo_info.table_key_.tablet_id_, is_major_exist))) {
LOG_WARN("failed to check is major exist", K(ret));
} else if (is_major_exist) {
/* if major exit, skip*/
@ -1859,7 +1861,6 @@ int ObDDLRedoLogWriterCallback::write(const ObStorageObjectHandle &macro_handle,
if (OB_SUCC(ret)) {
MacroBlockId macro_block_id = macro_handle.get_macro_id();
redo_info.table_key_ = table_key_;
redo_info.data_buffer_.assign(buf, buf_len);
redo_info.block_type_ = block_type_;
redo_info.logic_id_ = logic_id;
redo_info.start_scn_ = start_scn_;
@ -1870,6 +1871,11 @@ int ObDDLRedoLogWriterCallback::write(const ObStorageObjectHandle &macro_handle,
redo_info.parallel_cnt_ = 0; // TODO @zhuoran.zzr, place holder for shared storage
redo_info.cg_cnt_ = 0;
redo_info.with_cs_replica_ = with_cs_replica_;
if (ObDDLMacroBlockType::DDL_MB_SS_EMPTY_DATA_TYPE == block_type_) {
redo_info.data_buffer_.assign(nullptr, 0);
} else {
redo_info.data_buffer_.assign(buf, buf_len);
}
if (GCTX.is_shared_storage_mode()) { /* shared storage */
redo_info.macro_block_id_ = macro_handle.get_macro_id();
redo_info.parallel_cnt_ = parallel_cnt_;
@ -1882,11 +1888,18 @@ int ObDDLRedoLogWriterCallback::write(const ObStorageObjectHandle &macro_handle,
if (need_delay_) {
char *tmp_buf = nullptr;
if (OB_ISNULL(tmp_buf = (char*)(allocator_.alloc(buf_len)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc buf", K(ret));
} else if (FALSE_IT(MEMCPY(tmp_buf, buf, buf_len))) {
} else if (FALSE_IT(redo_info.data_buffer_.assign(tmp_buf, buf_len))) {
if (ObDDLMacroBlockType::DDL_MB_SS_EMPTY_DATA_TYPE == block_type_) {
redo_info.data_buffer_.assign(nullptr, 0);
} else {
if (OB_ISNULL(tmp_buf = (char*)(allocator_.alloc(buf_len)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc buf", K(ret));
} else if (FALSE_IT(MEMCPY(tmp_buf, buf, buf_len))) {
} else if (FALSE_IT(redo_info.data_buffer_.assign(tmp_buf, buf_len))) {
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(redo_info_array_.push_back(redo_info))) {
LOG_WARN("failed to push back val", K(ret));
} else if (redo_info_array_.count() > 10) {

View File

@ -493,13 +493,18 @@ int ObDDLRedoReplayExecutor::do_full_replay_(
#ifdef OB_BUILD_SHARED_STORAGE
if (GCTX.is_shared_storage_mode()){
/* write gc occupy file*/
if (OB_FAIL(ObDDLRedoLogWriter::write_gc_flag(tablet_handle,
if (ObDDLMacroBlockType::DDL_MB_SS_EMPTY_DATA_TYPE == macro_block.block_type_) {
/* skip write gc flag and upload block*/
} else if (OB_FAIL(ObDDLRedoLogWriter::write_gc_flag(tablet_handle,
redo_info.table_key_,
redo_info.parallel_cnt_,
redo_info.cg_cnt_))) {
LOG_WARN("failed to write tablet gc flag file", K(ret));
} else if (OB_FAIL(write_ss_block(write_info, macro_handle))) {
LOG_WARN("failed to write shared storage block", K(ret));
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(macro_block.block_handle_.set_block_id(log_->get_redo_info().macro_block_id_))) {
LOG_WARN("set macro block id failed", K(ret), K(log_->get_redo_info().macro_block_id_));
}
@ -514,6 +519,7 @@ int ObDDLRedoReplayExecutor::do_full_replay_(
LOG_WARN("set macro block id failed", K(ret), K(macro_handle.get_macro_id()));
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(macro_block.set_data_macro_meta(macro_block.block_handle_.get_block_id(),
redo_info.data_buffer_.ptr(),
@ -559,6 +565,7 @@ int ObDDLRedoReplayExecutor::do_full_replay_(
} else if (data_format_version <= 0) {
data_format_version = direct_load_mgr_handle.get_obj()->get_data_format_version();
}
if (OB_SUCC(ret) && need_replay) {
if (OB_FAIL(ObDDLKVPendingGuard::set_macro_block(tablet_handle.get_obj(), macro_block,
snapshot_version, data_format_version, direct_load_mgr_handle))) {

View File

@ -91,7 +91,7 @@ int ObDDLServerClient::create_hidden_table(
LOG_WARN("failed to set register task id", K(ret), K(res));
}
if (OB_SUCC(ret)) {
if (OB_FAIL(wait_task_reach_pending(arg.get_tenant_id(), res.task_id_, snapshot_version, data_format_version, *GCTX.sql_proxy_))) {
if (OB_FAIL(wait_task_reach_pending(arg.get_tenant_id(), res.task_id_, snapshot_version, data_format_version, *GCTX.sql_proxy_, res.is_no_logging_))) {
LOG_WARN("failed to wait table lock. remove register task id and abort redef table task.", K(ret), K(arg), K(res));
}
#ifdef ERRSIM
@ -132,6 +132,7 @@ int ObDDLServerClient::start_redef_table(const obrpc::ObStartRedefTableArg &arg,
obrpc::ObCommonRpcProxy *common_rpc_proxy = GCTX.rs_rpc_proxy_;
int64_t unused_snapshot_version = OB_INVALID_VERSION;
uint64_t unused_data_format_version = 0;
bool unused_no_logging = false;
if (OB_UNLIKELY(!arg.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", K(ret), K(arg));
@ -144,7 +145,7 @@ int ObDDLServerClient::start_redef_table(const obrpc::ObStartRedefTableArg &arg,
LOG_WARN("failed to start redef table", KR(ret), K(arg));
} else if (OB_FAIL(OB_DDL_HEART_BEAT_TASK_CONTAINER.set_register_task_id(res.task_id_, res.tenant_id_))) {
LOG_WARN("failed to set register task id", K(ret), K(res));
} else if (OB_FAIL(wait_task_reach_pending(arg.orig_tenant_id_, res.task_id_, unused_snapshot_version, unused_data_format_version, *GCTX.sql_proxy_))) {
} else if (OB_FAIL(wait_task_reach_pending(arg.orig_tenant_id_, res.task_id_, unused_snapshot_version, unused_data_format_version, *GCTX.sql_proxy_, unused_no_logging))) {
LOG_WARN("failed to wait table lock. remove register task id and abort redef table task.", K(ret), K(arg), K(res));
int tmp_ret = OB_SUCCESS;
obrpc::ObAbortRedefTableArg abort_redef_table_arg;
@ -400,7 +401,8 @@ int ObDDLServerClient::wait_task_reach_pending(
const int64_t task_id,
int64_t &snapshot_version,
uint64_t &data_format_version,
ObMySQLProxy &sql_proxy)
ObMySQLProxy &sql_proxy,
bool &is_no_logging)
{
int ret = OB_SUCCESS;
ObSqlString sql_string;
@ -417,9 +419,11 @@ int ObDDLServerClient::wait_task_reach_pending(
LOG_WARN("ddl sim failure", K(ret), K(tenant_id), K(task_id));
} else {
while (OB_SUCC(ret)) {
uint64_t unused_target_object_id = 0;
int64_t unused_schema_version = 0;
share::ObDDLTaskStatus task_status = share::ObDDLTaskStatus::PREPARE;
if (OB_FAIL(ObDDLUtil::get_data_information(tenant_id, task_id, data_format_version,
snapshot_version, task_status))) {
snapshot_version, task_status, unused_target_object_id, unused_schema_version, is_no_logging))) {
if (OB_LIKELY(OB_ITER_END == ret)) {
ret = OB_ENTRY_NOT_EXIST;
ObAddr unused_addr;

View File

@ -52,7 +52,8 @@ private:
const int64_t task_id,
int64_t &snapshot_version,
uint64_t &data_format_version,
ObMySQLProxy &sql_proxy);
ObMySQLProxy &sql_proxy,
bool &is_no_logging);
static int heart_beat_clear(const int64_t task_id, const uint64_t tenant_id);
static int check_need_stop(const uint64_t tenant_id);
};

View File

@ -136,9 +136,14 @@ int ObDDLMacroBlock::set_data_macro_meta(const MacroBlockId &macro_id, const cha
const bool force_set_macro_meta)
{
int ret = OB_SUCCESS;
if (!macro_id.is_valid() || nullptr == macro_block_buf || 0 >= size) {
if (!macro_id.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), K(macro_id), KP(macro_block_buf), K(size));
LOG_WARN("invalid arguments", K(ret), K(macro_id));
} else if (ObDDLMacroBlockType::DDL_MB_SS_EMPTY_DATA_TYPE == block_type) {
/* skip no logging type, not need to set meta*/
} else if (nullptr == macro_block_buf || 0 >= size) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), KP(macro_block_buf), K(size));
} else {
/* shared nothing need macro_meta*/
if (GCTX.is_shared_storage_mode() && !force_set_macro_meta) {
@ -417,18 +422,24 @@ void ObDDLMacroBlockRedoInfo::reset()
bool ObDDLMacroBlockRedoInfo::is_valid() const
{
bool ret = table_key_.is_valid() && data_buffer_.ptr() != nullptr && block_type_ != ObDDLMacroBlockType::DDL_MB_INVALID_TYPE
bool ret = table_key_.is_valid() && block_type_ != ObDDLMacroBlockType::DDL_MB_INVALID_TYPE
&& start_scn_.is_valid_and_not_min() && data_format_version_ >= 0 && macro_block_id_.is_valid()
// the type is default invalid, allow default value for compatibility
&& type_ >= ObDirectLoadType::DIRECT_LOAD_INVALID && type_ < ObDirectLoadType::DIRECT_LOAD_MAX;
if (ret && is_incremental_direct_load(type_)) {
ret = logic_id_.is_valid() && trans_id_.is_valid();
}
if (ret && ObDDLMacroBlockType::DDL_MB_SS_EMPTY_DATA_TYPE != block_type_){
/* when in ss empty type, nullptr is allowded*/
ret = ret && !((data_buffer_.ptr() == nullptr || data_buffer_.length() == 0));
}
if (ret && !GCTX.is_shared_storage_mode()) { /* for shared nothing */
ret = logic_id_.is_valid();
#ifdef OB_BUILD_SHARED_STORAGE
} else if (ret && GCTX.is_shared_storage_mode()) { /* for shared storage*/
ret = ret && (parallel_cnt_ > 0 && cg_cnt_ >0);
ret = ret && (parallel_cnt_ > 0 && cg_cnt_ >0);
#endif
}
return ret;

View File

@ -1235,7 +1235,7 @@ ObTabletDirectLoadMgr::ObTabletDirectLoadMgr()
lock_(), ref_cnt_(0), direct_load_type_(ObDirectLoadType::DIRECT_LOAD_INVALID),
need_process_cs_replica_(false), need_fill_column_group_(false), sqc_build_ctx_(),
column_items_(), lob_column_idxs_(), lob_col_types_(), schema_item_(), dir_id_(0), task_cnt_(0), cg_cnt_(0),
micro_index_clustered_(false), tablet_transfer_seq_(ObStorageObjectOpt::INVALID_TABLET_TRANSFER_SEQ)
micro_index_clustered_(false), tablet_transfer_seq_(ObStorageObjectOpt::INVALID_TABLET_TRANSFER_SEQ), is_no_logging_(false)
{
column_items_.set_attr(ObMemAttr(MTL_ID(), "DL_schema"));
lob_column_idxs_.set_attr(ObMemAttr(MTL_ID(), "DL_schema"));
@ -1262,6 +1262,7 @@ ObTabletDirectLoadMgr::~ObTabletDirectLoadMgr()
is_schema_item_ready_ = false;
micro_index_clustered_ = false;
tablet_transfer_seq_ = ObStorageObjectOpt::INVALID_TABLET_TRANSFER_SEQ;
is_no_logging_ = false;
}
bool ObTabletDirectLoadMgr::is_valid()

View File

@ -415,9 +415,10 @@ public:
*/
bool is_originally_column_store_data_direct_load() const { return is_data_direct_load(direct_load_type_) && !need_process_cs_replica_; }
bool get_is_no_logging() {return is_no_logging_;}
VIRTUAL_TO_STRING_KV(K_(is_inited), K_(is_schema_item_ready), K_(ls_id), K_(tablet_id), K_(table_key), K_(data_format_version), K_(ref_cnt),
K_(direct_load_type), K_(need_process_cs_replica), K_(need_fill_column_group),K_(sqc_build_ctx), KPC(lob_mgr_handle_.get_obj()), K_(schema_item), K_(column_items), K_(lob_column_idxs),
K_(task_cnt), K_(cg_cnt), K_(micro_index_clustered), K_(tablet_transfer_seq));
K_(task_cnt), K_(cg_cnt), K_(micro_index_clustered), K_(tablet_transfer_seq), K_(is_no_logging));
protected:
int prepare_schema_item_on_demand(const uint64_t table_id,
@ -474,6 +475,7 @@ protected:
int64_t cg_cnt_;
bool micro_index_clustered_;
int64_t tablet_transfer_seq_;
bool is_no_logging_;
};
class ObTabletFullDirectLoadMgr final : public ObTabletDirectLoadMgr

View File

@ -760,7 +760,9 @@ int ObMacroBlockSliceStore::init(
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc memory", K(ret));
} else if (OB_FAIL(static_cast<ObDDLRedoLogWriterCallback *>(ddl_redo_callback_)->init(
ls_id, table_key.tablet_id_, DDL_MB_DATA_TYPE, table_key, ddl_task_id, start_scn,
ls_id, table_key.tablet_id_,
tablet_direct_load_mgr->get_is_no_logging() ? DDL_MB_SS_EMPTY_DATA_TYPE : DDL_MB_DATA_TYPE,
table_key, ddl_task_id, start_scn,
data_format_version,
tablet_direct_load_mgr->get_task_cnt(),
tablet_direct_load_mgr->get_cg_cnt(),
@ -1861,7 +1863,9 @@ int ObCOSliceWriter::init(const ObStorageSchema *storage_schema, const int64_t c
LOG_WARN("invalid cg idx", K(ret), K(cg_idx), K(tablet_direct_load_mgr->get_sqc_build_ctx().cg_index_builders_.count()));
} else {
ObSSTableIndexItem &cur_item = tablet_direct_load_mgr->get_sqc_build_ctx().cg_index_builders_.at(cg_idx);
if (OB_FAIL(flush_callback_.init(ls_id, table_key.tablet_id_, DDL_MB_DATA_TYPE, table_key, ddl_task_id,
if (OB_FAIL(flush_callback_.init(ls_id, table_key.tablet_id_,
tablet_direct_load_mgr->get_is_no_logging() ? DDL_MB_SS_EMPTY_DATA_TYPE : DDL_MB_DATA_TYPE,
table_key, ddl_task_id,
start_scn, data_format_version, tablet_direct_load_mgr->get_task_cnt(),
tablet_direct_load_mgr->get_cg_cnt(), tablet_direct_load_mgr->get_direct_load_type(), row_id_offset))) {
LOG_WARN("fail to init redo log writer callback", KR(ret));

View File

@ -238,13 +238,14 @@ struct ObDirectInsertCommonParam final
{
public:
ObDirectInsertCommonParam()
: ls_id_(), tablet_id_(), direct_load_type_(DIRECT_LOAD_INVALID), data_format_version_(0), read_snapshot_(0)
: ls_id_(), tablet_id_(), direct_load_type_(DIRECT_LOAD_INVALID), data_format_version_(0), read_snapshot_(0), replay_normal_in_cs_replica_(false), is_no_logging_(false)
{}
~ObDirectInsertCommonParam() = default;
bool is_valid() const { return ls_id_.is_valid() && tablet_id_.is_valid()
&& data_format_version_ >= 0 && read_snapshot_ >= 0 && DIRECT_LOAD_INVALID <= direct_load_type_ && direct_load_type_ <= DIRECT_LOAD_MAX;
}
TO_STRING_KV(K_(ls_id), K_(tablet_id), K_(direct_load_type), K_(data_format_version), K_(read_snapshot));
TO_STRING_KV(K_(ls_id), K_(tablet_id), K_(direct_load_type), K_(data_format_version), K_(read_snapshot), K_(replay_normal_in_cs_replica), K_(is_no_logging));
public:
share::ObLSID ls_id_;
common::ObTabletID tablet_id_;
@ -253,6 +254,8 @@ public:
// read_snapshot_ is used to scan the source data.
// For full direct load task, it is also the commit version of the target macro block.
int64_t read_snapshot_;
bool replay_normal_in_cs_replica_; // when ddl and add cs replica are concurrent, leader may write normal clog
bool is_no_logging_;
};
// only used in runtime execution

View File

@ -1201,6 +1201,8 @@ int ObDDLKV::set_macro_block(
}
}
if (OB_FAIL(ret)) {
} else if (DDL_MB_SS_EMPTY_DATA_TYPE == macro_block.block_type_) {
/* skip, emtpy type do not have data macro*/
} else if (OB_FAIL(ddl_memtable->insert_block_meta_tree(macro_block.block_handle_, data_macro_meta, macro_block.end_row_id_))) {
LOG_WARN("insert block meta tree faield", K(ret));
} else {

View File

@ -55,7 +55,8 @@ ObDirectLoadInsertTableParam::ObDirectLoadInsertTableParam()
datum_utils_(nullptr),
col_descs_(nullptr),
cmp_funcs_(nullptr),
online_sample_percent_(1.)
online_sample_percent_(1.),
is_no_logging_(false)
{
}
@ -207,6 +208,7 @@ int ObDirectLoadInsertTabletContext::create_tablet_direct_load()
direct_load_param.common_param_.read_snapshot_ = param_->snapshot_version_;
direct_load_param.common_param_.ls_id_ = ls_id_;
direct_load_param.common_param_.tablet_id_ = tablet_id_;
direct_load_param.common_param_.is_no_logging_ = param_->is_no_logging_;
direct_load_param.runtime_only_param_.exec_ctx_ = nullptr;
direct_load_param.runtime_only_param_.task_id_ = param_->ddl_task_id_;
direct_load_param.runtime_only_param_.table_id_ = param_->table_id_;

View File

@ -59,7 +59,8 @@ public:
KP_(datum_utils),
KP_(col_descs),
KP_(cmp_funcs),
K_(online_sample_percent));
K_(online_sample_percent),
K_(is_no_logging));
public:
uint64_t table_id_; // dest_table_id
@ -82,6 +83,7 @@ public:
const common::ObIArray<share::schema::ObColDesc> *col_descs_;
const blocksstable::ObStoreCmpFuncs *cmp_funcs_;
double online_sample_percent_;
bool is_no_logging_;
};
struct ObDirectLoadInsertTabletWriteCtx

View File

@ -424,6 +424,7 @@ _min_malloc_sample_interval
_multimodel_memory_trace_level
_mvcc_gc_using_min_txn_snapshot
_nested_loop_join_enabled
_no_logging
_object_storage_io_timeout
_obkv_feature_mode
_ob_ddl_temp_file_compress_func

View File

@ -18,6 +18,7 @@
#include "share/backup/ob_tenant_archive_round.h"
#include "share/backup/ob_archive_checkpoint.h"
#include "share/backup/ob_tenant_archive_mgr.h"
#include "src/observer/omt/ob_tenant_config_mgr.h"
using namespace oceanbase;
using namespace common;
@ -28,7 +29,7 @@ class ArchiveCheckpointerTest : public testing::Test
public:
ArchiveCheckpointerTest() {}
virtual ~ArchiveCheckpointerTest(){}
virtual void SetUp() {};
virtual void SetUp();
virtual void TearDown() {}
virtual void TestBody() {}
@ -454,6 +455,10 @@ TEST_F(ArchiveCheckpointerTest, in_prepare)
ASSERT_EQ(g_call_cnt, 0);
}
void ArchiveCheckpointerTest::SetUp()
{
ASSERT_EQ(OB_SUCCESS, omt::ObTenantConfigMgr::get_instance().add_tenant_config(TENANT_ID));
}
TEST_F(ArchiveCheckpointerTest, in_beginning_01)
{
@ -3500,6 +3505,388 @@ TEST_F(ArchiveCheckpointerTest, some_ls_interrupt_01)
ASSERT_EQ(g_call_cnt, 5);
}
TEST_F(ArchiveCheckpointerTest, test_no_logging_beginning)
{
// set tenant config as no logging first
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(TENANT_ID));
tenant_config->_no_logging = true;
// old round's status is BEGINNING.
ObTenantArchiveRoundAttr old_round;
fill_beginning_round(old_round, "2022-01-01 00:00:30");
// 2 log streams are archiving.
ObDestRoundSummary summary;
// log stream 1001 is archiving.
ObLSDestRoundSummary ls_1001;
ObArchiveLSPieceSummary piece_1001_1;
fill_archive_ls_piece(
1001,
false,
1,
ObArchiveRoundState::doing(),
"2022-01-01 00:00:30",
"2022-01-01 00:00:50",
0,
2000,
200,
20,
piece_1001_1);
ASSERT_EQ(ls_1001.add_one_piece(piece_1001_1), OB_SUCCESS);
// log stream 1002 is archiving.
ObLSDestRoundSummary ls_1002;
ObArchiveLSPieceSummary piece_1002_1;
fill_archive_ls_piece(
1002,
false,
1,
ObArchiveRoundState::doing(),
"2022-01-01 00:00:30",
"2022-01-01 00:00:40",
0,
1000,
100,
10,
piece_1002_1);
ASSERT_EQ(ls_1002.add_one_piece(piece_1002_1), OB_SUCCESS);
ASSERT_EQ(summary.add_ls_dest_round_summary(ls_1001), OB_SUCCESS);
ASSERT_EQ(summary.add_ls_dest_round_summary(ls_1002), OB_SUCCESS);
// All log streams are archiving, status turn to DOING.
class MockRoundHandler final: public ObArchiveRoundHandler
{
public:
int checkpoint_to(
const ObTenantArchiveRoundAttr &old_round,
const ObTenantArchiveRoundAttr &new_round,
const common::ObIArray<ObTenantArchivePieceAttr> &pieces) override
{
int ret = OB_SUCCESS;
g_call_cnt++;
ArchiveCheckpointerTest test;
ObTenantArchiveRoundAttr expect_round;
test.fill_new_round(
old_round,
ObArchiveRoundState::interrupted(),
"2022-01-01 00:00:30",
"2022-01-01 00:00:50",
1,
0,
0,
300,
30,
0,
0,
expect_round);
ObTenantArchivePieceAttr expect_piece;
test.fill_piece(
old_round,
1,
"2022-01-01 00:00:30",
"2022-01-01 00:00:50",
300,
30,
ObArchivePieceStatus::active(),
ObBackupFileStatus::STATUS::BACKUP_FILE_INCOMPLETE,
expect_piece);
ret = test.compare_two_rounds(new_round, expect_round);
if (OB_SUCC(ret)) {
if (pieces.count() != 1) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid pieces count", K(ret), K(pieces));
} else {
const ObTenantArchivePieceAttr &piece = pieces.at(0);
ret = test.compare_two_pieces(piece, expect_piece);
}
}
return ret;
}
};
ObDestRoundCheckpointer::PieceGeneratedCb gen_piece_cb =
[](common::ObISQLClient *proxy, const ObTenantArchiveRoundAttr &old_round, const ObDestRoundCheckpointer::Result &result, const ObDestRoundCheckpointer::GeneratedPiece &piece)
{
int ret = OB_SUCCESS;
g_call_cnt++;
ArchiveCheckpointerTest test;
ObDestRoundCheckpointer::GeneratedPiece expect_piece;
test.fill_piece(
old_round,
1,
"2022-01-01 00:00:30",
"2022-01-01 00:00:50",
300,
30,
ObArchivePieceStatus::active(),
ObBackupFileStatus::STATUS::BACKUP_FILE_INCOMPLETE,
expect_piece.piece_info_);
ObDestRoundCheckpointer::GeneratedLSPiece ls_piece_1001 = test.gen_checkpoint_ls_piece(
1001,
"2022-01-01 00:00:30",
"2022-01-01 00:00:50",
0,
2000,
200,
20);
ObDestRoundCheckpointer::GeneratedLSPiece ls_piece_1002 = test.gen_checkpoint_ls_piece(
1002,
"2022-01-01 00:00:30",
"2022-01-01 00:00:40",
0,
1000,
100,
10);
expect_piece.ls_piece_list_.push_back(ls_piece_1001);
expect_piece.ls_piece_list_.push_back(ls_piece_1002);
ret = test.compare_two_checkpoint_pieces(piece, expect_piece);
return ret;
};
ObDestRoundCheckpointer::RoundCheckpointCb round_cb =
[](common::ObISQLClient *proxy, const ObTenantArchiveRoundAttr &old_round, const ObTenantArchiveRoundAttr &new_round)
{
int ret = OB_SUCCESS;
g_call_cnt++;
ArchiveCheckpointerTest test;
ObTenantArchiveRoundAttr expect_round;
test.fill_new_round(
old_round,
ObArchiveRoundState::interrupted(),
"2022-01-01 00:00:30",
"2022-01-01 00:00:50",
1,
0,
0,
300,
30,
0,
0,
expect_round);
ret = test.compare_two_rounds(new_round, expect_round);
return ret;
};
int ret = OB_SUCCESS;
g_call_cnt = 0;
MockRoundHandler mock_handler;
ObDestRoundCheckpointer checkpointer;
share::SCN limit_scn;
(void)limit_scn.convert_for_logservice(convert_timestr_2_scn("2022-01-01 00:00:45"));
ret = checkpointer.init(&mock_handler, gen_piece_cb, round_cb, limit_scn);
ASSERT_EQ(OB_SUCCESS, ret);
ret = checkpointer.checkpoint(old_round, summary);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(g_call_cnt, 3);
// reset no logging param as false
tenant_config->_no_logging = false;
}
TEST_F(ArchiveCheckpointerTest, test_no_logging_doing)
{
// set tenant config as no logging first
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(TENANT_ID));
tenant_config->_no_logging = true;
// old round's status is DOING
ObTenantArchiveRoundAttr old_round;
fill_round(
ObArchiveRoundState::doing(),
"2022-01-01 00:00:30",
"2022-01-01 00:00:35",
"2022-01-01 00:00:40",
1,
0,
0,
100,
10,
0,
0,
old_round);
// 2 log streams are archiving.
ObDestRoundSummary summary;
// log stream 1001 is archiving.
ObLSDestRoundSummary ls_1001;
ObArchiveLSPieceSummary piece_1001_1;
fill_archive_ls_piece(
1001,
false,
1,
ObArchiveRoundState::doing(),
"2022-01-01 00:00:30",
"2022-01-01 00:00:50",
0,
2000,
200,
20,
piece_1001_1);
ASSERT_EQ(ls_1001.add_one_piece(piece_1001_1), OB_SUCCESS);
// log stream 1002 is archiving.
ObLSDestRoundSummary ls_1002;
ObArchiveLSPieceSummary piece_1002_1;
fill_archive_ls_piece(
1002,
false,
1,
ObArchiveRoundState::doing(),
"2022-01-01 00:00:30",
"2022-01-01 00:00:40",
0,
1000,
100,
10,
piece_1002_1);
ASSERT_EQ(ls_1002.add_one_piece(piece_1002_1), OB_SUCCESS);
ASSERT_EQ(summary.add_ls_dest_round_summary(ls_1001), OB_SUCCESS);
ASSERT_EQ(summary.add_ls_dest_round_summary(ls_1002), OB_SUCCESS);
// All log streams are archiving, the next status is DOING.
class MockRoundHandler final: public ObArchiveRoundHandler
{
public:
int checkpoint_to(
const ObTenantArchiveRoundAttr &old_round,
const ObTenantArchiveRoundAttr &new_round,
const common::ObIArray<ObTenantArchivePieceAttr> &pieces) override
{
int ret = OB_SUCCESS;
g_call_cnt++;
ArchiveCheckpointerTest test;
ObTenantArchiveRoundAttr expect_round;
test.fill_new_round(
old_round,
ObArchiveRoundState::interrupted(),
"2022-01-01 00:00:35",
"2022-01-01 00:00:50",
1,
0,
0,
300,
30,
0,
0,
expect_round);
ObTenantArchivePieceAttr expect_piece;
test.fill_piece(
old_round,
1,
"2022-01-01 00:00:35",
"2022-01-01 00:00:50",
300,
30,
ObArchivePieceStatus::active(),
ObBackupFileStatus::STATUS::BACKUP_FILE_AVAILABLE,
expect_piece);
ret = test.compare_two_rounds(new_round, expect_round);
if (OB_SUCC(ret)) {
if (pieces.count() != 1) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid pieces count", K(ret), K(pieces));
} else {
const ObTenantArchivePieceAttr &piece = pieces.at(0);
ret = test.compare_two_pieces(piece, expect_piece);
}
}
return ret;
}
};
ObDestRoundCheckpointer::PieceGeneratedCb gen_piece_cb =
[](common::ObISQLClient *proxy, const ObTenantArchiveRoundAttr &old_round, const ObDestRoundCheckpointer::Result &result, const ObDestRoundCheckpointer::GeneratedPiece &piece)
{
int ret = OB_SUCCESS;
g_call_cnt++;
ArchiveCheckpointerTest test;
ObDestRoundCheckpointer::GeneratedPiece expect_piece;
test.fill_piece(
old_round,
1,
"2022-01-01 00:00:35",
"2022-01-01 00:00:50",
300,
30,
ObArchivePieceStatus::active(),
ObBackupFileStatus::STATUS::BACKUP_FILE_AVAILABLE,
expect_piece.piece_info_);
ObDestRoundCheckpointer::GeneratedLSPiece ls_piece_1001 = test.gen_checkpoint_ls_piece(
1001,
"2022-01-01 00:00:30",
"2022-01-01 00:00:50",
0,
2000,
200,
20);
ObDestRoundCheckpointer::GeneratedLSPiece ls_piece_1002 = test.gen_checkpoint_ls_piece(
1002,
"2022-01-01 00:00:30",
"2022-01-01 00:00:40",
0,
1000,
100,
10);
expect_piece.ls_piece_list_.push_back(ls_piece_1001);
expect_piece.ls_piece_list_.push_back(ls_piece_1002);
ret = test.compare_two_checkpoint_pieces(piece, expect_piece);
return ret;
};
ObDestRoundCheckpointer::RoundCheckpointCb round_cb =
[](common::ObISQLClient *proxy, const ObTenantArchiveRoundAttr &old_round, const ObTenantArchiveRoundAttr &new_round)
{
int ret = OB_SUCCESS;
g_call_cnt++;
ArchiveCheckpointerTest test;
ObTenantArchiveRoundAttr expect_round;
test.fill_new_round(
old_round,
ObArchiveRoundState::interrupted(),
"2022-01-01 00:00:35",
"2022-01-01 00:00:50",
1,
0,
0,
300,
30,
0,
0,
expect_round);
ret = test.compare_two_rounds(new_round, expect_round);
return ret;
};
int ret = OB_SUCCESS;
g_call_cnt = 0;
MockRoundHandler mock_handler;
ObDestRoundCheckpointer checkpointer;
share::SCN limit_scn;
(void)limit_scn.convert_for_logservice(convert_timestr_2_scn("2022-01-01 00:00:45"));
ret = checkpointer.init(&mock_handler, gen_piece_cb, round_cb, limit_scn);
ASSERT_EQ(OB_SUCCESS, ret);
ret = checkpointer.checkpoint(old_round, summary);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(g_call_cnt, 3);
// reset no logging param as false
tenant_config->_no_logging = false;
}
int main(int argc, char **argv)
{
system("rm -f test_archive_checkpoint.log*");