use DAG to batch force freeze tablets

This commit is contained in:
Fengjingkun
2024-03-05 06:50:08 +00:00
committed by ob-robot
parent a3cc513bfe
commit b83f4b2a89
9 changed files with 342 additions and 10 deletions

View File

@ -34,6 +34,7 @@
#include "storage/meta_mem/ob_tenant_meta_mem_mgr.h"
#include "storage/compaction/ob_basic_tablet_merge_ctx.h"
#include "storage/compaction/ob_tenant_compaction_progress.h"
#include "storage/tx_storage/ob_tenant_freezer.h"
namespace oceanbase
{
@ -1187,5 +1188,232 @@ void prepare_allocator(
}
/*
* ----------------------------------------ObBatchFreezeTabletsParam--------------------------------------------
*/
ObBatchFreezeTabletsParam::ObBatchFreezeTabletsParam()
: ls_id_(),
compaction_scn_(),
tablet_ids_()
{
tablet_ids_.set_attr(lib::ObMemAttr(MTL_ID(), "BtFrzTbls", ObCtxIds::MERGE_NORMAL_CTX_ID));
}
int ObBatchFreezeTabletsParam::assign(
const ObBatchFreezeTabletsParam &other)
{
int ret = OB_SUCCESS;
if (this == &other) {
// do nothing
} else if (OB_FAIL(tablet_ids_.assign(other.tablet_ids_))) {
LOG_WARN("failed to copy tablet ids", K(ret));
} else {
ls_id_ = other.ls_id_;
compaction_scn_ = other.compaction_scn_;
}
return ret;
}
int64_t ObBatchFreezeTabletsParam::get_hash() const
{
int64_t hash_val = 0;
hash_val = common::murmurhash(&ls_id_, sizeof(ls_id_), hash_val);
hash_val = common::murmurhash(&compaction_scn_, sizeof(compaction_scn_), hash_val);
return hash_val;
}
ObBatchFreezeTabletsDag::ObBatchFreezeTabletsDag()
: ObIDag(share::ObDagType::DAG_TYPE_BATCH_FREEZE_TABLETS),
is_inited_(false),
param_()
{
}
ObBatchFreezeTabletsDag::~ObBatchFreezeTabletsDag()
{
}
int ObBatchFreezeTabletsDag::init_by_param(
const share::ObIDagInitParam *param)
{
int ret = OB_SUCCESS;
const ObBatchFreezeTabletsParam *init_param = nullptr;
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("ObBatchFreezeTabletsDag has been inited", K(ret), KPC(this));
} else if (FALSE_IT(init_param = static_cast<const ObBatchFreezeTabletsParam *>(param))) {
} else if (OB_UNLIKELY(nullptr == init_param || !init_param->is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get invalid arguments", K(ret), KPC(init_param));
} else if (OB_FAIL(param_.assign(*init_param))) {
LOG_WARN("failed to init param", K(ret), KPC(init_param));
} else {
is_inited_ = true;
}
return ret;
}
int ObBatchFreezeTabletsDag::create_first_task()
{
int ret = OB_SUCCESS;
ObBatchFreezeTabletsTask *task = nullptr;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("BatchFreezeTabletsDag has not inited", K(ret));
} else if (OB_FAIL(create_task(nullptr/*parent*/, task))) {
LOG_WARN("failed to create batch tablet sstable task", K(ret));
}
return ret;
}
bool ObBatchFreezeTabletsDag::operator == (const ObIDag &other) const
{
bool is_same = true;
if (this == &other) {
// same
} else if (get_type() != other.get_type()) {
is_same = false;
} else if (param_.ls_id_ != static_cast<const ObBatchFreezeTabletsDag &>(other).param_.ls_id_) {
is_same = false;
} else if (param_.compaction_scn_ != static_cast<const ObBatchFreezeTabletsDag &>(other).param_.compaction_scn_) {
is_same = false;
}
return is_same;
}
int64_t ObBatchFreezeTabletsDag::hash() const
{
return param_.get_hash();
}
int ObBatchFreezeTabletsDag::fill_info_param(
compaction::ObIBasicInfoParam *&out_param,
ObIAllocator &allocator) const
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObBatchFreezeTabletsDag not inited", K(ret));
} else if (OB_FAIL(ADD_DAG_WARN_INFO_PARAM(out_param,
allocator,
get_type(),
param_.ls_id_.id(),
param_.compaction_scn_,
param_.tablet_ids_.count()))) {
LOG_WARN("failed to fill info param", K(ret), K(param_));
}
return ret;
}
int ObBatchFreezeTabletsDag::fill_dag_key(char *buf, const int64_t buf_len) const
{
int ret = OB_SUCCESS;
if (OB_FAIL(databuff_printf(buf, buf_len, "ls_id=%ld compaction_scn=%ld freeze_tablet_cnt=%ld",
param_.ls_id_.id(), param_.compaction_scn_, param_.tablet_ids_.count()))) {
LOG_WARN("failed to fill dag key", K(ret), K(param_));
}
return ret;
}
ObBatchFreezeTabletsTask::ObBatchFreezeTabletsTask()
: ObITask(ObITask::TASK_TYPE_BATCH_FREEZE_TABLETS),
is_inited_(false),
base_dag_(nullptr)
{
}
ObBatchFreezeTabletsTask::~ObBatchFreezeTabletsTask()
{
}
int ObBatchFreezeTabletsTask::init()
{
int ret = OB_SUCCESS;
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("ObBatchFreezeTabletsTask init twice", K(ret));
} else if (OB_ISNULL(dag_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null dag", K(ret));
} else if (OB_UNLIKELY(ObDagType::ObDagTypeEnum::DAG_TYPE_BATCH_FREEZE_TABLETS != dag_->get_type())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected dag type", K(ret));
} else if (FALSE_IT(base_dag_ = static_cast<ObBatchFreezeTabletsDag *>(dag_))) {
} else if (OB_UNLIKELY(!base_dag_->get_param().is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected not valid param", K(ret), K(base_dag_->get_param()));
} else {
is_inited_ = true;
}
return ret;
}
int ObBatchFreezeTabletsTask::process()
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
const ObBatchFreezeTabletsParam &param = base_dag_->get_param();
ObLSHandle ls_handle;
ObLS *ls_ptr = nullptr;
if (OB_TMP_FAIL(MTL(ObLSService *)->get_ls(param.ls_id_, ls_handle, ObLSGetMod::STORAGE_MOD))) {
LOG_WARN_RET(tmp_ret, "failed to get log stream", K(param));
} else {
ls_ptr = ls_handle.get_ls();
}
int64_t fail_freeze_cnt = 0;
int64_t succ_schedule_cnt = 0;
for (int64_t i = 0; OB_SUCC(ret) && i < param.tablet_ids_.count(); ++i) {
const ObTabletID &tablet_id = param.tablet_ids_.at(i);
ObTabletHandle tablet_handle;
ObTablet *tablet = nullptr;
if (OB_TMP_FAIL(MTL(ObTenantFreezer *)->tablet_freeze(tablet_id, true/*need_rewrite*/, true/*is_sync*/))) {
LOG_WARN_RET(tmp_ret, "failed to force freeze tablet", K(param), K(tablet_id));
++fail_freeze_cnt;
} else if (OB_ISNULL(ls_ptr)) {
// do nothing
} else if (OB_TMP_FAIL(ls_ptr->get_tablet_svr()->get_tablet(tablet_id,
tablet_handle,
0/*timeout_us*/,
storage::ObMDSGetTabletMode::READ_ALL_COMMITED))) {
LOG_WARN_RET(tmp_ret, "failed to get tablet", K(param), K(tablet_id));
} else if (FALSE_IT(tablet = tablet_handle.get_obj())) {
} else if (OB_UNLIKELY(tablet->get_snapshot_version() < param.compaction_scn_)) {
// do nothing
} else if (OB_TMP_FAIL(compaction::ObTenantTabletScheduler::schedule_merge_dag(param.ls_id_,
*tablet,
MEDIUM_MERGE,
param.compaction_scn_))) {
if (OB_SIZE_OVERFLOW != tmp_ret && OB_EAGAIN != tmp_ret) {
ret = tmp_ret;
LOG_WARN_RET(tmp_ret, "failed to schedule medium merge dag", K(param), K(tablet_id));
}
} else {
++succ_schedule_cnt;
}
if (OB_FAIL(share::dag_yield())) {
LOG_WARN("failed to dag yield", K(ret));
}
}
if (OB_UNLIKELY(fail_freeze_cnt * 2 > param.tablet_ids_.count())) {
ret = OB_PARTIAL_FAILED;
}
FLOG_INFO("batch freeze tablets finished", KR(ret), K(param), K(fail_freeze_cnt), KP(ls_ptr), K(succ_schedule_cnt));
return ret;
}
} // namespace compaction
} // namespace oceanbase

View File

@ -352,6 +352,71 @@ public:
virtual ~ObTabletMiniMergeDag();
};
struct ObBatchFreezeTabletsParam : public share::ObIDagInitParam
{
public:
ObBatchFreezeTabletsParam();
virtual ~ObBatchFreezeTabletsParam() { tablet_ids_.reset(); }
virtual bool is_valid() const override { return ls_id_.is_valid() && compaction_scn_ > 0 && tablet_ids_.count() > 0; }
int assign(const ObBatchFreezeTabletsParam &other);
bool operator == (const ObBatchFreezeTabletsParam &other) const;
bool operator != (const ObBatchFreezeTabletsParam &other) const { return !this->operator==(other); }
int64_t get_hash() const;
VIRTUAL_TO_STRING_KV(K_(ls_id), K_(compaction_scn), "tablet_count", tablet_ids_.count(), K_(tablet_ids));
public:
static constexpr int64_t DEFAULT_BATCH_SIZE = 16;
share::ObLSID ls_id_;
int64_t compaction_scn_;
common::ObSEArray<common::ObTabletID, DEFAULT_BATCH_SIZE> tablet_ids_;
};
class ObBatchFreezeTabletsDag : public share::ObIDag
{
public:
ObBatchFreezeTabletsDag();
virtual ~ObBatchFreezeTabletsDag();
int init_by_param(const share::ObIDagInitParam *param);
virtual int create_first_task() override;
virtual bool operator == (const ObIDag &other) const override;
virtual int64_t hash() const override;
virtual int fill_info_param(
compaction::ObIBasicInfoParam *&out_param,
ObIAllocator &allocator) const override;
virtual int fill_dag_key(char *buf, const int64_t buf_len) const override;
virtual lib::Worker::CompatMode get_compat_mode() const override { return lib::Worker::CompatMode::MYSQL; }
virtual uint64_t get_consumer_group_id() const override { return consumer_group_id_; }
const ObBatchFreezeTabletsParam &get_param() const { return param_; }
virtual bool ignore_warning() override
{
return OB_PARTIAL_FAILED != dag_ret_;
}
INHERIT_TO_STRING_KV("ObIDag", ObIDag, K_(is_inited), K_(param));
private:
bool is_inited_;
ObBatchFreezeTabletsParam param_;
private:
DISALLOW_COPY_AND_ASSIGN(ObBatchFreezeTabletsDag);
};
class ObBatchFreezeTabletsTask : public share::ObITask
{
public:
ObBatchFreezeTabletsTask();
virtual ~ObBatchFreezeTabletsTask();
int init();
virtual int process() override;
private:
bool is_inited_;
ObBatchFreezeTabletsDag *base_dag_;
private:
DISALLOW_COPY_AND_ASSIGN(ObBatchFreezeTabletsTask);
};
} // namespace compaction
} // namespace oceanbase

View File

@ -1603,11 +1603,10 @@ int ObTenantTabletScheduler::schedule_ls_medium_merge(
ls_time_guard.add_time_guard(tablet_time_guard);
} // end of while
// TODO(@chengkong): submit a async task
FOREACH(need_freeze_tablet_id, need_freeze_tablets) {
if (OB_TMP_FAIL(MTL(ObTenantFreezer *)->tablet_freeze(*need_freeze_tablet_id, true/*force_freeze*/, true/*is_sync*/))) {
LOG_WARN("failed to force freeze tablet", KR(tmp_ret), K(ls_id), K(*need_freeze_tablet_id));
}
if (OB_FAIL(ret) || need_freeze_tablets.empty()) {
} else if (OB_TMP_FAIL(schedule_batch_freeze_dag(merge_version, ls_id, need_freeze_tablets))) {
LOG_WARN("failed to schedule batch force freeze tablets dag", K(tmp_ret), K(ls_id),
"tablet_count", need_freeze_tablets.count());
}
ls_time_guard.click(ObCompactionScheduleTimeGuard::FAST_FREEZE);
@ -2048,5 +2047,33 @@ void ObTenantTabletScheduler::report_blocking_medium(
}
}
int ObTenantTabletScheduler::schedule_batch_freeze_dag(
const int64_t merge_version,
const share::ObLSID &ls_id,
const common::ObIArray<ObTabletID> &tablet_ids)
{
int ret = OB_SUCCESS;
ObBatchFreezeTabletsParam param;
if (OB_UNLIKELY(!ls_id.is_valid() || tablet_ids.empty())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get invalid arguments", K(ret), K(ls_id), K(tablet_ids));
} else if (FALSE_IT(param.ls_id_ = ls_id)) {
} else if (FALSE_IT(param.compaction_scn_ = merge_version)) {
} else if (OB_FAIL(param.tablet_ids_.assign(tablet_ids))) {
LOG_WARN("failed to assign tablet ids", K(ret));
} else if (OB_FAIL(MTL(ObTenantDagScheduler *)->create_and_add_dag<ObBatchFreezeTabletsDag>(&param, true/*is_emergency*/))) {
if (OB_SIZE_OVERFLOW != ret && OB_EAGAIN != ret) {
LOG_WARN("failed to create merge dag", K(ret), K(param));
} else if (OB_EAGAIN == ret) {
LOG_WARN("curr ls exists batch freeze dag, wait the dag to finish", K(ret), K(ls_id));
}
} else {
LOG_INFO("Succ to create tablet batch freeze dag", K(ret), K(param));
}
return ret;
}
} // namespace storage
} // namespace oceanbase

View File

@ -294,6 +294,10 @@ private:
const bool &tablet_could_schedule_medium,
const bool &could_major_merge,
const share::ObLSID &ls_id);
int schedule_batch_freeze_dag(
const int64_t merge_version,
const share::ObLSID &ls_id,
const common::ObIArray<ObTabletID> &tablet_ids);
public:
static const int64_t INIT_COMPACTION_SCN = 1;
typedef common::ObSEArray<ObGetMergeTablesResult, compaction::ObPartitionMergePolicy::OB_MINOR_PARALLEL_INFO_ARRAY_SIZE> MinorParallelResultArray;