fix ddl hold snapshot repeatedly.
This commit is contained in:
parent
27d3dcfc30
commit
0d1e89d206
@ -633,7 +633,9 @@ int ObConstraintTask::init(const ObDDLTaskRecord &task_record)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObConstraintTask::hold_snapshot(const int64_t snapshot_version)
|
||||
int ObConstraintTask::hold_snapshot(
|
||||
common::ObMySQLTransaction &trans,
|
||||
const int64_t snapshot_version)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObDDLService &ddl_service = root_service_->get_ddl_service();
|
||||
@ -668,7 +670,7 @@ int ObConstraintTask::hold_snapshot(const int64_t snapshot_version)
|
||||
OB_FAIL(ObDDLUtil::get_tablets(tenant_id_, table_schema->get_aux_lob_piece_tid(), tablet_ids))) {
|
||||
LOG_WARN("failed to get data lob piece table snapshot", K(ret));
|
||||
} else if (OB_FAIL(ddl_service.get_snapshot_mgr().batch_acquire_snapshot(
|
||||
ddl_service.get_sql_proxy(), SNAPSHOT_FOR_DDL, tenant_id_, schema_version_, snapshot_scn, nullptr, tablet_ids))) {
|
||||
trans, SNAPSHOT_FOR_DDL, tenant_id_, schema_version_, snapshot_scn, nullptr, tablet_ids))) {
|
||||
LOG_WARN("acquire snapshot failed", K(ret), K(tablet_ids));
|
||||
} else {
|
||||
snapshot_version_ = snapshot_version;
|
||||
@ -742,14 +744,16 @@ int ObConstraintTask::wait_trans_end()
|
||||
DEBUG_SYNC(CONSTRAINT_WAIT_TRANS_END);
|
||||
|
||||
if (OB_SUCC(ret) && new_status != CHECK_CONSTRAINT_VALID && snapshot_version_ > 0 && !snapshot_held_) {
|
||||
if (OB_FAIL(ObDDLTaskRecordOperator::update_snapshot_version(root_service_->get_sql_proxy(),
|
||||
ObMySQLTransaction trans;
|
||||
if (OB_FAIL(trans.start(&root_service_->get_sql_proxy(), tenant_id_))) {
|
||||
LOG_WARN("fail to start trans", K(ret), K(tenant_id_));
|
||||
} else if (OB_FAIL(ObDDLTaskRecordOperator::update_snapshot_version(trans,
|
||||
tenant_id_,
|
||||
task_id_,
|
||||
snapshot_version_))) {
|
||||
LOG_WARN("update snapshot version failed", K(ret), K(task_id_));
|
||||
} else if (OB_FAIL(hold_snapshot(snapshot_version_))) {
|
||||
} else if (OB_FAIL(hold_snapshot(trans, snapshot_version_))) {
|
||||
if (OB_SNAPSHOT_DISCARDED == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
snapshot_held_ = false;
|
||||
snapshot_version_ = 0;
|
||||
wait_trans_ctx_.reset();
|
||||
@ -760,6 +764,15 @@ int ObConstraintTask::wait_trans_end()
|
||||
new_status = CHECK_CONSTRAINT_VALID;
|
||||
snapshot_held_ = true;
|
||||
}
|
||||
if (trans.is_started()) {
|
||||
bool need_commit = (ret == OB_SUCCESS);
|
||||
int tmp_ret = trans.end(need_commit);
|
||||
if (OB_SUCCESS != tmp_ret) {
|
||||
LOG_WARN("fail to end trans", K(ret), K(tmp_ret), K(need_commit));
|
||||
}
|
||||
ret = OB_SUCC(ret) ? tmp_ret : ret;
|
||||
}
|
||||
ret = OB_SNAPSHOT_DISCARDED == ret ? OB_SUCCESS : ret;
|
||||
}
|
||||
|
||||
if (OB_FAIL(switch_status(new_status, true, ret))) {
|
||||
|
@ -109,7 +109,7 @@ public:
|
||||
virtual int deserialize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t buf_size, int64_t &pos) override;
|
||||
virtual int64_t get_serialize_param_size() const override;
|
||||
private:
|
||||
int hold_snapshot(const int64_t snapshot_version);
|
||||
int hold_snapshot(common::ObMySQLTransaction &trans, const int64_t snapshot_version);
|
||||
int release_snapshot(const int64_t snapshot_version);
|
||||
int wait_trans_end();
|
||||
int validate_constraint_valid();
|
||||
|
@ -410,7 +410,7 @@ int ObDDLRedefinitionTask::hold_snapshot_for_major_refresh_mv_(const int64_t sna
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
ObDDLService &ddl_service = root_service->get_ddl_service();
|
||||
if (OB_FAIL(ddl_service.get_snapshot_mgr().batch_acquire_snapshot(
|
||||
if (OB_FAIL(ddl_service.get_snapshot_mgr().batch_acquire_snapshot_in_trans(
|
||||
ddl_service.get_sql_proxy(), SNAPSHOT_FOR_MAJOR_REFRESH_MV, tenant_id_,
|
||||
schema_version_, snapshot_scn, nullptr, tablet_ids))) {
|
||||
LOG_WARN("batch acquire snapshot failed", K(ret), K(tablet_ids));
|
||||
|
@ -678,14 +678,16 @@ int ObIndexBuildTask::wait_trans_end()
|
||||
|
||||
// persistent snapshot_version into inner table and hold snapshot of data_table and index table
|
||||
if (OB_SUCC(ret) && !state_finished && snapshot_version_ > 0 && !snapshot_held_) {
|
||||
if (OB_FAIL(ObDDLTaskRecordOperator::update_snapshot_version(root_service_->get_sql_proxy(),
|
||||
ObMySQLTransaction trans;
|
||||
if (OB_FAIL(trans.start(&root_service_->get_sql_proxy(), tenant_id_))) {
|
||||
LOG_WARN("fail to start trans", K(ret), K(tenant_id_));
|
||||
} else if (OB_FAIL(ObDDLTaskRecordOperator::update_snapshot_version(trans,
|
||||
tenant_id_,
|
||||
task_id_,
|
||||
snapshot_version_))) {
|
||||
LOG_WARN("update snapshot version failed", K(ret), K(task_id_), K(snapshot_version_));
|
||||
} else if (OB_FAIL(hold_snapshot(snapshot_version_))) {
|
||||
} else if (OB_FAIL(hold_snapshot(trans, snapshot_version_))) {
|
||||
if (OB_SNAPSHOT_DISCARDED == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
snapshot_version_ = 0;
|
||||
snapshot_held_ = false;
|
||||
LOG_INFO("snapshot discarded, need retry waiting trans", K(ret));
|
||||
@ -696,6 +698,15 @@ int ObIndexBuildTask::wait_trans_end()
|
||||
snapshot_held_ = true;
|
||||
state_finished = true;
|
||||
}
|
||||
if (trans.is_started()) {
|
||||
bool need_commit = (ret == OB_SUCCESS);
|
||||
int tmp_ret = trans.end(need_commit);
|
||||
if (OB_SUCCESS != tmp_ret) {
|
||||
LOG_WARN("fail to end trans", K(ret), K(tmp_ret), K(need_commit));
|
||||
}
|
||||
ret = OB_SUCC(ret) ? tmp_ret : ret;
|
||||
}
|
||||
ret = OB_SNAPSHOT_DISCARDED == ret ? OB_SUCCESS : ret;
|
||||
}
|
||||
|
||||
if (state_finished || OB_FAIL(ret)) {
|
||||
@ -708,7 +719,9 @@ int ObIndexBuildTask::wait_trans_end()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObIndexBuildTask::hold_snapshot(const int64_t snapshot)
|
||||
int ObIndexBuildTask::hold_snapshot(
|
||||
common::ObMySQLTransaction &trans,
|
||||
const int64_t snapshot)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
SCN snapshot_scn;
|
||||
@ -752,7 +765,7 @@ int ObIndexBuildTask::hold_snapshot(const int64_t snapshot)
|
||||
OB_FAIL(ObDDLUtil::get_tablets(tenant_id_, data_table_schema->get_aux_lob_piece_tid(), tablet_ids))) {
|
||||
LOG_WARN("failed to get data lob piece table snapshot", K(ret));
|
||||
} else if (OB_FAIL(ddl_service.get_snapshot_mgr().batch_acquire_snapshot(
|
||||
ddl_service.get_sql_proxy(), SNAPSHOT_FOR_DDL, tenant_id_, schema_version_, snapshot_scn, nullptr, tablet_ids))) {
|
||||
trans, SNAPSHOT_FOR_DDL, tenant_id_, schema_version_, snapshot_scn, nullptr, tablet_ids))) {
|
||||
LOG_WARN("batch acquire snapshot failed", K(ret), K(tablet_ids));
|
||||
}
|
||||
}
|
||||
|
@ -137,7 +137,7 @@ private:
|
||||
int clean_on_failed();
|
||||
int succ();
|
||||
virtual int cleanup_impl() override;
|
||||
int hold_snapshot(const int64_t snapshot);
|
||||
int hold_snapshot(common::ObMySQLTransaction &trans, const int64_t snapshot);
|
||||
int release_snapshot(const int64_t snapshot);
|
||||
int update_index_status_in_schema(
|
||||
const share::schema::ObTableSchema &index_schema,
|
||||
|
@ -56,7 +56,7 @@ int ObSnapshotInfoManager::acquire_snapshot(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObSnapshotInfoManager::batch_acquire_snapshot(
|
||||
int ObSnapshotInfoManager::batch_acquire_snapshot_in_trans(
|
||||
common::ObMySQLProxy &proxy,
|
||||
share::ObSnapShotType snapshot_type,
|
||||
const uint64_t tenant_id,
|
||||
@ -67,6 +67,33 @@ int ObSnapshotInfoManager::batch_acquire_snapshot(
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObMySQLTransaction trans;
|
||||
if (OB_FAIL(trans.start(&proxy, tenant_id))) {
|
||||
LOG_WARN("fail to start trans", K(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(batch_acquire_snapshot(trans, snapshot_type,
|
||||
tenant_id, schema_version, snapshot_scn, comment, tablet_ids))) {
|
||||
LOG_WARN("batch add snapshot failed", K(ret));
|
||||
}
|
||||
if (trans.is_started()) {
|
||||
bool need_commit = (ret == OB_SUCCESS);
|
||||
int tmp_ret = trans.end(need_commit);
|
||||
if (OB_SUCCESS != tmp_ret) {
|
||||
LOG_WARN("fail to end trans", K(tmp_ret), K(need_commit));
|
||||
}
|
||||
ret = OB_SUCC(ret) ? tmp_ret : ret;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObSnapshotInfoManager::batch_acquire_snapshot(
|
||||
common::ObMySQLTransaction &trans,
|
||||
share::ObSnapShotType snapshot_type,
|
||||
const uint64_t tenant_id,
|
||||
const int64_t schema_version,
|
||||
const SCN &snapshot_scn,
|
||||
const char *comment,
|
||||
const common::ObIArray<ObTabletID> &tablet_ids)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSnapshotTableProxy snapshot_proxy;
|
||||
ObSnapshotInfo snapshot;
|
||||
ObTimeoutCtx timeout_ctx;
|
||||
@ -75,9 +102,9 @@ int ObSnapshotInfoManager::batch_acquire_snapshot(
|
||||
snapshot.snapshot_scn_ = snapshot_scn;
|
||||
snapshot.schema_version_ = schema_version;
|
||||
snapshot.comment_ = comment;
|
||||
if (OB_UNLIKELY(!snapshot.is_valid() || tablet_ids.count() <= 0)) {
|
||||
if (OB_UNLIKELY(!trans.is_started() || !snapshot.is_valid() || tablet_ids.count() <= 0)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(tablet_ids.count()));
|
||||
LOG_WARN("invalid argument", K(ret), "trans_started", trans.is_started(), K(snapshot), K(tablet_ids.count()));
|
||||
} else {
|
||||
int64_t rpc_timeout = 0;
|
||||
int64_t trx_timeout = 0;
|
||||
@ -89,20 +116,10 @@ int ObSnapshotInfoManager::batch_acquire_snapshot(
|
||||
LOG_WARN("set trx timeout failed", K(ret), K(trx_timeout));
|
||||
} else if (OB_FAIL(timeout_ctx.set_timeout(rpc_timeout))) {
|
||||
LOG_WARN("set timeout failed", K(ret), K(rpc_timeout));
|
||||
} else if (OB_FAIL(trans.start(&proxy, tenant_id))) {
|
||||
LOG_WARN("fail to start trans", K(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(snapshot_proxy.batch_add_snapshot(trans, snapshot_type,
|
||||
tenant_id, schema_version, snapshot.snapshot_scn_, comment, tablet_ids))) {
|
||||
LOG_WARN("batch add snapshot failed", K(ret));
|
||||
}
|
||||
if (trans.is_started()) {
|
||||
bool need_commit = (ret == OB_SUCCESS);
|
||||
int tmp_ret = trans.end(need_commit);
|
||||
if (OB_SUCCESS != tmp_ret) {
|
||||
LOG_WARN("fail to end trans", K(tmp_ret), K(need_commit));
|
||||
}
|
||||
ret = OB_SUCC(ret) ? tmp_ret : ret;
|
||||
}
|
||||
ROOTSERVICE_EVENT_ADD("snapshot", "batch_acquire_snapshot", K(ret), K(snapshot), "rs_addr", self_addr_);
|
||||
}
|
||||
|
||||
|
@ -62,6 +62,14 @@ public:
|
||||
share::ObSnapShotType snapshot_type,
|
||||
int64_t &count);
|
||||
int batch_acquire_snapshot(
|
||||
common::ObMySQLTransaction &trans,
|
||||
share::ObSnapShotType snapshot_type,
|
||||
const uint64_t tenant_id,
|
||||
const int64_t schema_version,
|
||||
const share::SCN &snapshot_scn,
|
||||
const char *comment,
|
||||
const common::ObIArray<ObTabletID> &tablet_ids);
|
||||
int batch_acquire_snapshot_in_trans(
|
||||
common::ObMySQLProxy &proxy,
|
||||
share::ObSnapShotType snapshot_type,
|
||||
const uint64_t tenant_id,
|
||||
|
@ -1435,12 +1435,15 @@ int ObDDLUtil::obtain_snapshot(
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("snapshot version is invalid", K(ret), KPC(wait_trans_ctx));
|
||||
} else if (snapshot_version > 0 && !snapshot_held) {
|
||||
if (OB_FAIL(rootserver::ObDDLTaskRecordOperator::update_snapshot_version(root_service->get_sql_proxy(),
|
||||
ObMySQLTransaction trans;
|
||||
if (OB_FAIL(trans.start(&root_service->get_sql_proxy(), tenant_id))) {
|
||||
LOG_WARN("fail to start trans", K(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(rootserver::ObDDLTaskRecordOperator::update_snapshot_version(trans,
|
||||
tenant_id,
|
||||
task->get_task_id(),
|
||||
snapshot_version))) {
|
||||
LOG_WARN("update snapshot version failed", K(ret), K(task->get_task_id()), K(tenant_id));
|
||||
} else if (OB_FAIL(hold_snapshot(task, table_id, target_table_id, root_service, snapshot_version))) {
|
||||
} else if (OB_FAIL(hold_snapshot(trans, task, table_id, target_table_id, root_service, snapshot_version))) {
|
||||
if (OB_SNAPSHOT_DISCARDED == ret) {
|
||||
snapshot_version = 0;
|
||||
snapshot_held = false;
|
||||
@ -1451,6 +1454,14 @@ int ObDDLUtil::obtain_snapshot(
|
||||
} else {
|
||||
snapshot_held = true;
|
||||
}
|
||||
if (trans.is_started()) {
|
||||
bool need_commit = (ret == OB_SUCCESS);
|
||||
int tmp_ret = trans.end(need_commit);
|
||||
if (OB_SUCCESS != tmp_ret) {
|
||||
LOG_WARN("fail to end trans", K(ret), K(tmp_ret), K(need_commit));
|
||||
}
|
||||
ret = OB_SUCC(ret) ? tmp_ret : ret;
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
@ -1475,6 +1486,7 @@ int ObDDLUtil::obtain_snapshot(
|
||||
}
|
||||
|
||||
int ObDDLUtil::hold_snapshot(
|
||||
common::ObMySQLTransaction &trans,
|
||||
rootserver::ObDDLTask* task,
|
||||
const uint64_t table_id,
|
||||
const uint64_t target_table_id,
|
||||
@ -1532,7 +1544,7 @@ int ObDDLUtil::hold_snapshot(
|
||||
} else {
|
||||
rootserver::ObDDLService &ddl_service = root_service->get_ddl_service();
|
||||
if (OB_FAIL(ddl_service.get_snapshot_mgr().batch_acquire_snapshot(
|
||||
ddl_service.get_sql_proxy(), SNAPSHOT_FOR_DDL, tenant_id, schema_version, snapshot_scn, nullptr, tablet_ids))) {
|
||||
trans, SNAPSHOT_FOR_DDL, tenant_id, schema_version, snapshot_scn, nullptr, tablet_ids))) {
|
||||
LOG_WARN("batch acquire snapshot failed", K(ret), K(tablet_ids));
|
||||
}
|
||||
}
|
||||
|
@ -906,6 +906,7 @@ public:
|
||||
bool &all_dag_exit);
|
||||
private:
|
||||
static int hold_snapshot(
|
||||
common::ObMySQLTransaction &trans,
|
||||
rootserver::ObDDLTask* task,
|
||||
const uint64_t table_id,
|
||||
const uint64_t target_table_id,
|
||||
|
Loading…
x
Reference in New Issue
Block a user