[Fix] update check sstable status ret code
This commit is contained in:
parent
59111c1ece
commit
4666798151
@ -1922,7 +1922,6 @@ int ObRpcRemoteWriteDDLPrepareLogP::process()
|
||||
ObDDLSSTableRedoWriter sstable_redo_writer;
|
||||
ObLSService *ls_service = MTL(ObLSService*);
|
||||
ObLSHandle ls_handle;
|
||||
ObArray<uint64_t> column_ids;
|
||||
ObTabletHandle tablet_handle;
|
||||
ObDDLKvMgrHandle ddl_kv_mgr_handle;
|
||||
if (OB_UNLIKELY(!arg_.is_valid())) {
|
||||
@ -1937,20 +1936,16 @@ int ObRpcRemoteWriteDDLPrepareLogP::process()
|
||||
} else if (OB_FAIL(sstable_redo_writer.init(arg_.ls_id_, table_key.tablet_id_))) {
|
||||
LOG_WARN("init sstable redo writer", K(ret), K(table_key));
|
||||
} else if (FALSE_IT(sstable_redo_writer.set_start_scn(arg_.start_scn_))) {
|
||||
} else if (OB_FAIL(ObDDLUtil::get_tenant_schema_column_ids(tenant_id, arg_.table_id_, column_ids))) {
|
||||
LOG_WARN("fail to get tenant schema column ids", K(ret), K(arg_));
|
||||
} else {
|
||||
SCN prepare_scn;
|
||||
if (OB_FAIL(sstable_redo_writer.write_prepare_log(table_key,
|
||||
arg_.table_id_,
|
||||
arg_.execution_id_,
|
||||
arg_.ddl_task_id_,
|
||||
prepare_scn,
|
||||
column_ids))) {
|
||||
prepare_scn))) {
|
||||
LOG_WARN("fail to remote write commit log", K(ret), K(table_key), K_(arg));
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_prepare(arg_.start_scn_,
|
||||
prepare_scn,
|
||||
column_ids,
|
||||
arg_.table_id_,
|
||||
arg_.ddl_task_id_))) {
|
||||
LOG_WARN("failed to do ddl kv prepare", K(ret), K(arg_));
|
||||
@ -1972,7 +1967,6 @@ int ObRpcRemoteWriteDDLCommitLogP::process()
|
||||
ObDDLSSTableRedoWriter sstable_redo_writer;
|
||||
ObLSService *ls_service = MTL(ObLSService*);
|
||||
ObLSHandle ls_handle;
|
||||
ObArray<uint64_t> column_ids;
|
||||
ObTabletHandle tablet_handle;
|
||||
ObDDLKvMgrHandle ddl_kv_mgr_handle;
|
||||
if (OB_UNLIKELY(!arg_.is_valid())) {
|
||||
@ -1987,15 +1981,11 @@ int ObRpcRemoteWriteDDLCommitLogP::process()
|
||||
} else if (OB_FAIL(sstable_redo_writer.init(arg_.ls_id_, table_key.tablet_id_))) {
|
||||
LOG_WARN("init sstable redo writer", K(ret), K(table_key));
|
||||
} else if (FALSE_IT(sstable_redo_writer.set_start_scn(arg_.start_scn_))) {
|
||||
} else if (OB_FAIL(ObDDLUtil::get_tenant_schema_column_ids(tenant_id, arg_.table_id_, column_ids))) {
|
||||
LOG_WARN("fail to get tenant schema column ids", K(ret), K(arg_));
|
||||
} else {
|
||||
// wait in rpc framework may cause rpc timeout, need sync commit via rs @xiajin
|
||||
if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->wait_ddl_commit(
|
||||
arg_.start_scn_, arg_.prepare_scn_, arg_.table_id_, arg_.ddl_task_id_, column_ids))) {
|
||||
if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->wait_ddl_commit(arg_.start_scn_, arg_.prepare_scn_))) {
|
||||
LOG_WARN("failed to wait ddl kv commit", K(ret), K(arg_));
|
||||
} else if (OB_FAIL(sstable_redo_writer.write_commit_log(
|
||||
table_key, arg_.prepare_scn_, arg_.table_id_, arg_.ddl_task_id_, column_ids))) {
|
||||
} else if (OB_FAIL(sstable_redo_writer.write_commit_log(table_key, arg_.prepare_scn_))) {
|
||||
LOG_WARN("fail to remote write commit log", K(ret), K(table_key), K_(arg));
|
||||
}
|
||||
}
|
||||
|
@ -954,48 +954,6 @@ int ObDDLUtil::check_table_exist(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDDLUtil::get_tenant_schema_column_ids(
|
||||
const uint64_t tenant_id,
|
||||
const uint64_t table_id,
|
||||
ObIArray<uint64_t> &column_ids)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
ObMultiVersionSchemaService *schema_service = GCTX.schema_service_;
|
||||
ObSchemaGetterGuard schema_guard;
|
||||
const ObTableSchema *table_schema = nullptr;
|
||||
ObArray<ObColDesc> column_desc;
|
||||
column_ids.reset(); // make clear
|
||||
|
||||
if (OB_UNLIKELY(!is_valid_id(table_id) || 0 == table_id || !is_valid_tenant_id(tenant_id))) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(table_id), K(tenant_id));
|
||||
} else if (OB_ISNULL(schema_service)) {
|
||||
ret = OB_ERR_SYS;
|
||||
LOG_WARN("schema service is null pointer", K(ret), K(tenant_id), KP(schema_service));
|
||||
} else if (OB_FAIL(schema_service->get_tenant_schema_guard(tenant_id, schema_guard))) {
|
||||
LOG_WARN("get tenant schema guard failed", K(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) {
|
||||
LOG_WARN("get table schema failed", K(ret), K(tenant_id), K(table_id));
|
||||
} else if (OB_ISNULL(table_schema)) {
|
||||
ret = OB_TABLE_NOT_EXIST;
|
||||
LOG_INFO("table not exit", K(ret), K(tenant_id), K(table_id));
|
||||
} else if (OB_FAIL(table_schema->get_multi_version_column_descs(column_desc))) {
|
||||
LOG_WARN("fail to get column ids", K(ret), K(ls_id), K(tenant_id), K(table_id));
|
||||
} else if (column_desc.count() <= 0) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("columns desc should not be zero", K(ret), K(tenant_id), K(table_id), K(column_desc));
|
||||
} else {
|
||||
int64_t column_idx = 0;
|
||||
for (column_idx = 0; OB_SUCC(ret) && column_idx < column_desc.count(); ++column_idx) {
|
||||
if (OB_FAIL(column_ids.push_back(column_desc.at(column_idx).col_id_))) {
|
||||
LOG_WARN("fail to get columns ids", K(ret), K(tenant_id), K(table_id), K(column_desc));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int64_t ObDDLUtil::get_ddl_rpc_timeout()
|
||||
{
|
||||
return max(GCONF.rpc_timeout, 9 * 1000 * 1000L);
|
||||
@ -1509,11 +1467,13 @@ int ObCheckTabletDataComplementOp::check_all_tablet_sstable_status(
|
||||
LOG_WARN("fail to get tablets", K(ret), K(tenant_id), K(index_table_id));
|
||||
} else if (OB_FAIL(check_tablet_merge_status(tenant_id, dest_tablet_ids, snapshot_version, is_all_sstable_build_finished))){
|
||||
LOG_WARN("fail to check tablet merge status.", K(ret), K(tenant_id), K(dest_tablet_ids), K(snapshot_version));
|
||||
} else if (OB_FAIL(is_all_sstable_build_finished
|
||||
&& check_tablet_checksum_update_status(tenant_id, table_id, ddl_task_id, execution_id, dest_tablet_ids, tablet_checksum_status))) {
|
||||
LOG_WARN("fail to check tablet checksum update status.", K(ret), K(tenant_id), K(dest_tablet_ids), K(execution_id));
|
||||
} else {
|
||||
is_all_sstable_build_finished &= tablet_checksum_status;
|
||||
if (is_all_sstable_build_finished) {
|
||||
if (OB_FAIL(check_tablet_checksum_update_status(tenant_id, index_table_id, ddl_task_id, execution_id, dest_tablet_ids, tablet_checksum_status))) {
|
||||
LOG_WARN("fail to check tablet checksum update status.", K(ret), K(tenant_id), K(dest_tablet_ids), K(execution_id));
|
||||
}
|
||||
is_all_sstable_build_finished &= tablet_checksum_status;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -1561,6 +1521,9 @@ int ObCheckTabletDataComplementOp::check_and_wait_old_complement_task(
|
||||
usleep(10 * 1000); // sleep 10ms
|
||||
}
|
||||
}
|
||||
if (OB_EAGAIN == ret) { // retry
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
need_wait = !is_all_sstable_build_finished && is_old_task_session_exist;
|
||||
} while (OB_SUCC(ret) && need_wait); // TODO: time out
|
||||
///// end
|
||||
|
@ -274,10 +274,6 @@ public:
|
||||
const ObTabletID &tablet_id,
|
||||
storage::ObTabletHandle &tablet_handle,
|
||||
const int64_t timeout_us = storage::ObTabletCommon::DEFAULT_GET_TABLET_TIMEOUT_US);
|
||||
static int get_tenant_schema_column_ids(
|
||||
const uint64_t tenant_id,
|
||||
const uint64_t table_id,
|
||||
ObIArray<uint64_t> &column_ids);
|
||||
|
||||
static int clear_ddl_checksum(sql::ObPhysicalPlan *phy_plan);
|
||||
|
||||
|
@ -6741,36 +6741,31 @@ OB_SERIALIZE_MEMBER(ObRpcRemoteWriteDDLPrepareLogArg, tenant_id_, ls_id_, table_
|
||||
table_id_, execution_id_, ddl_task_id_);
|
||||
|
||||
ObRpcRemoteWriteDDLCommitLogArg::ObRpcRemoteWriteDDLCommitLogArg()
|
||||
: tenant_id_(OB_INVALID_ID), ls_id_(), table_key_(), start_scn_(SCN::min_scn()), prepare_scn_(SCN::min_scn()),
|
||||
table_id_(0), ddl_task_id_(0)
|
||||
: tenant_id_(OB_INVALID_ID), ls_id_(), table_key_(), start_scn_(SCN::min_scn()), prepare_scn_(SCN::min_scn())
|
||||
{}
|
||||
|
||||
int ObRpcRemoteWriteDDLCommitLogArg::init(const uint64_t tenant_id,
|
||||
const share::ObLSID &ls_id,
|
||||
const storage::ObITable::TableKey &table_key,
|
||||
const SCN &start_scn,
|
||||
const SCN &prepare_scn,
|
||||
const uint64_t table_id,
|
||||
const int64_t ddl_task_id)
|
||||
const SCN &prepare_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(tenant_id == OB_INVALID_ID || !ls_id.is_valid() || !table_key.is_valid() || !start_scn.is_valid_and_not_min()
|
||||
|| !prepare_scn.is_valid_and_not_min() || table_id <= 0 || ddl_task_id <= 0)) {
|
||||
|| !prepare_scn.is_valid_and_not_min())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("tablet id is not valid", K(ret), K(tenant_id), K(ls_id), K(table_key), K(start_scn), K(prepare_scn), K(table_id), K(ddl_task_id));
|
||||
LOG_WARN("tablet id is not valid", K(ret), K(tenant_id), K(ls_id), K(table_key), K(start_scn), K(prepare_scn));
|
||||
} else {
|
||||
tenant_id_ = tenant_id;
|
||||
ls_id_ = ls_id;
|
||||
table_key_ = table_key;
|
||||
start_scn_ = start_scn;
|
||||
prepare_scn_ = prepare_scn;
|
||||
table_id_ = table_id;
|
||||
ddl_task_id_ = ddl_task_id;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObRpcRemoteWriteDDLCommitLogArg, tenant_id_, ls_id_, table_key_, start_scn_, prepare_scn_, table_id_, ddl_task_id_);
|
||||
OB_SERIALIZE_MEMBER(ObRpcRemoteWriteDDLCommitLogArg, tenant_id_, ls_id_, table_key_, start_scn_, prepare_scn_);
|
||||
|
||||
bool ObCheckLSCanOfflineArg::is_valid() const
|
||||
{
|
||||
|
@ -7555,23 +7555,19 @@ public:
|
||||
const share::ObLSID &ls_id,
|
||||
const storage::ObITable::TableKey &table_key,
|
||||
const share::SCN &start_scn,
|
||||
const share::SCN &prepare_scn,
|
||||
const uint64_t table_id,
|
||||
const int64_t ddl_task_id);
|
||||
const share::SCN &prepare_scn);
|
||||
bool is_valid() const
|
||||
{
|
||||
return tenant_id_ != OB_INVALID_ID && ls_id_.is_valid() && table_key_.is_valid() && start_scn_.is_valid_and_not_min()
|
||||
&& prepare_scn_.is_valid_and_not_min() && table_id_ > 0 && ddl_task_id_ > 0;
|
||||
&& prepare_scn_.is_valid_and_not_min();
|
||||
}
|
||||
TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(table_key), K_(start_scn), K_(prepare_scn), K_(table_id), K_(ddl_task_id));
|
||||
TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(table_key), K_(start_scn), K_(prepare_scn));
|
||||
public:
|
||||
uint64_t tenant_id_;
|
||||
share::ObLSID ls_id_;
|
||||
storage::ObITable::TableKey table_key_;
|
||||
share::SCN start_scn_;
|
||||
share::SCN prepare_scn_;
|
||||
uint64_t table_id_;
|
||||
int64_t ddl_task_id_;
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObRpcRemoteWriteDDLCommitLogArg);
|
||||
};
|
||||
|
@ -1143,7 +1143,6 @@ int ObComplementMergeTask::process()
|
||||
MAKE_TENANT_SWITCH_SCOPE_GUARD(guard);
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
ObIDag *tmp_dag = get_dag();
|
||||
ObArray<uint64_t> column_ids;
|
||||
ObComplementDataDag *dag = nullptr;
|
||||
ObTabletHandle tablet_handle;
|
||||
ObTablet *tablet = nullptr;
|
||||
@ -1162,17 +1161,12 @@ int ObComplementMergeTask::process()
|
||||
} else if (OB_ISNULL(latest_major_sstable)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected error, major sstable shoud not be null", K(ret), K(*param_));
|
||||
} else if (OB_FAIL(ObDDLUtil::get_tenant_schema_column_ids(param_->hidden_table_schema_->get_tenant_id(),
|
||||
param_->hidden_table_schema_->get_table_id(),
|
||||
column_ids))) {
|
||||
LOG_WARN("fail to get tenant schema column ids", K(ret), K(*param_));
|
||||
} else if (OB_FAIL(ObTabletDDLUtil::report_ddl_checksum(param_->ls_id_,
|
||||
param_->dest_tablet_id_,
|
||||
param_->hidden_table_schema_->get_table_id(),
|
||||
1 /* execution_id */,
|
||||
param_->task_id_,
|
||||
latest_major_sstable->get_meta().get_col_checksum(),
|
||||
column_ids))) {
|
||||
latest_major_sstable->get_meta().get_col_checksum()))) {
|
||||
LOG_WARN("report ddl column checksum failed", K(ret), K(*param_));
|
||||
} else if (OB_FAIL(GCTX.ob_service_->submit_tablet_update_task(param_->tenant_id_, param_->ls_id_, param_->dest_tablet_id_))) {
|
||||
LOG_WARN("fail to submit tablet update task", K(ret), K(*param_));
|
||||
@ -1200,7 +1194,6 @@ int ObComplementMergeTask::add_build_hidden_table_sstable()
|
||||
ObTablet *tablet = nullptr;
|
||||
ObTabletHandle tablet_handle;
|
||||
ObITable::TableKey hidden_table_key;
|
||||
ObArray<uint64_t> column_ids;
|
||||
SCN prepare_scn;
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
@ -1225,14 +1218,11 @@ int ObComplementMergeTask::add_build_hidden_table_sstable()
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(ObDDLUtil::get_tenant_schema_column_ids(MTL_ID(), param_->hidden_table_schema_->get_table_id(), column_ids))) {
|
||||
LOG_WARN("fail to get tenant schema column ids", K(ret), K(param_));
|
||||
} else if (OB_FAIL(context_->data_sstable_redo_writer_.write_prepare_log(hidden_table_key,
|
||||
param_->hidden_table_schema_->get_table_id(),
|
||||
param_->execution_id_,
|
||||
param_->task_id_,
|
||||
prepare_scn,
|
||||
column_ids))) {
|
||||
prepare_scn))) {
|
||||
if (OB_TASK_EXPIRED == ret) {
|
||||
LOG_INFO("ddl task expired", K(ret), K(hidden_table_key), KPC(param_));
|
||||
} else {
|
||||
@ -1250,20 +1240,16 @@ int ObComplementMergeTask::add_build_hidden_table_sstable()
|
||||
LOG_WARN("get ddl kv manager failed", K(ret));
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_prepare(ddl_start_scn,
|
||||
prepare_scn,
|
||||
column_ids,
|
||||
table_id,
|
||||
ddl_task_id))) {
|
||||
LOG_WARN("commit ddl log failed", K(ret), K(ls_id), K(tablet_id), K(prepare_scn), K(hidden_table_key), K(column_ids),
|
||||
LOG_WARN("commit ddl log failed", K(ret), K(ls_id), K(tablet_id), K(prepare_scn), K(hidden_table_key),
|
||||
K(ddl_start_scn), "new_ddl_start_scn", ddl_kv_mgr_handle.get_obj()->get_start_scn());
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->wait_ddl_commit(ddl_start_scn, prepare_scn, table_id, ddl_task_id, column_ids))) {
|
||||
LOG_WARN("wait ddl commit failed", K(ret), K(ls_id), K(tablet_id), K(hidden_table_key), K(column_ids),
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->wait_ddl_commit(ddl_start_scn, prepare_scn))) {
|
||||
LOG_WARN("wait ddl commit failed", K(ret), K(ls_id), K(tablet_id), K(hidden_table_key),
|
||||
K(ddl_start_scn), "new_ddl_start_scn", ddl_kv_mgr_handle.get_obj()->get_start_scn());
|
||||
} else if (OB_FAIL(context_->data_sstable_redo_writer_.write_commit_log(hidden_table_key,
|
||||
prepare_scn,
|
||||
table_id,
|
||||
ddl_task_id,
|
||||
column_ids))) {
|
||||
LOG_WARN("fail write ddl commit log", K(ret), K(hidden_table_key), K(column_ids));
|
||||
prepare_scn))) {
|
||||
LOG_WARN("fail write ddl commit log", K(ret), K(hidden_table_key));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
|
@ -305,67 +305,50 @@ int ObDDLRedoLog::init(const blocksstable::ObDDLMacroBlockRedoInfo &redo_info)
|
||||
OB_SERIALIZE_MEMBER(ObDDLRedoLog, redo_info_);
|
||||
|
||||
ObDDLPrepareLog::ObDDLPrepareLog()
|
||||
: table_key_(), start_scn_(SCN::min_scn()), table_id_(0), ddl_task_id_(0), column_ids_()
|
||||
: table_key_(), start_scn_(SCN::min_scn())
|
||||
{
|
||||
}
|
||||
|
||||
int ObDDLPrepareLog::init(const ObITable::TableKey &table_key,
|
||||
const SCN &start_scn,
|
||||
const uint64_t table_id,
|
||||
const int64_t ddl_task_id,
|
||||
const ObIArray<uint64_t> &column_ids)
|
||||
const SCN &start_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!table_key.is_valid() || !start_scn.is_valid_and_not_min() || table_id <= 0 || ddl_task_id <= 0) {
|
||||
if (!table_key.is_valid() || !start_scn.is_valid_and_not_min()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(table_key), K(start_scn), K(table_id), K(ddl_task_id));
|
||||
} else if (OB_FAIL(column_ids_.assign(column_ids))) {
|
||||
LOG_WARN("columns ids assigned fail.", K(ret), K(column_ids));
|
||||
LOG_WARN("invalid argument", K(ret), K(table_key), K(start_scn));
|
||||
} else {
|
||||
table_key_ = table_key;
|
||||
start_scn_ = start_scn;
|
||||
table_id_ = table_id;
|
||||
ddl_task_id_ = ddl_task_id;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObDDLPrepareLog, table_key_, start_scn_, table_id_, ddl_task_id_, column_ids_);
|
||||
OB_SERIALIZE_MEMBER(ObDDLPrepareLog, table_key_, start_scn_);
|
||||
|
||||
ObDDLCommitLog::ObDDLCommitLog()
|
||||
: table_key_(), start_scn_(SCN::min_scn()), prepare_scn_(SCN::min_scn()),
|
||||
table_id_(0), ddl_task_id_(0), column_ids_()
|
||||
: table_key_(), start_scn_(SCN::min_scn()), prepare_scn_(SCN::min_scn())
|
||||
{
|
||||
}
|
||||
|
||||
int ObDDLCommitLog::init(const ObITable::TableKey &table_key,
|
||||
const SCN &start_scn,
|
||||
const SCN &prepare_scn,
|
||||
const uint64_t table_id,
|
||||
const int64_t ddl_task_id,
|
||||
const ObIArray<uint64_t> &column_ids)
|
||||
const SCN &prepare_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!table_key.is_valid() ||
|
||||
!start_scn.is_valid_and_not_min() ||
|
||||
!prepare_scn.is_valid_and_not_min() ||
|
||||
table_id <= 0 ||
|
||||
ddl_task_id <= 0) {
|
||||
!prepare_scn.is_valid_and_not_min()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(table_key), K(start_scn), K(prepare_scn), K(table_id), K(ddl_task_id));
|
||||
} else if (OB_FAIL(column_ids_.assign(column_ids))) {
|
||||
LOG_WARN("columns ids assigned fail.", K(ret), K(column_ids));
|
||||
LOG_WARN("invalid argument", K(ret), K(table_key), K(start_scn), K(prepare_scn));
|
||||
} else {
|
||||
table_key_ = table_key;
|
||||
start_scn_ = start_scn;
|
||||
prepare_scn_ = prepare_scn;
|
||||
table_id_ = table_id;
|
||||
ddl_task_id_ = ddl_task_id;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObDDLCommitLog, table_key_, start_scn_, prepare_scn_, table_id_, ddl_task_id_, column_ids_);
|
||||
OB_SERIALIZE_MEMBER(ObDDLCommitLog, table_key_, start_scn_, prepare_scn_);
|
||||
|
||||
ObTabletSchemaVersionChangeLog::ObTabletSchemaVersionChangeLog()
|
||||
: tablet_id_(), schema_version_(-1)
|
||||
|
@ -178,24 +178,14 @@ public:
|
||||
ObDDLPrepareLog();
|
||||
~ObDDLPrepareLog() = default;
|
||||
int init(const ObITable::TableKey &table_key,
|
||||
const share::SCN &start_scn,
|
||||
const uint64_t table_id,
|
||||
const int64_t ddl_task_id,
|
||||
const ObIArray<uint64_t> &column_ids);
|
||||
bool is_valid() const { return table_key_.is_valid() && start_scn_.is_valid() &&
|
||||
table_id_ > 0 && ddl_task_id_ > 0 && column_ids_.count() > 0; }
|
||||
const share::SCN &start_scn);
|
||||
bool is_valid() const { return table_key_.is_valid() && start_scn_.is_valid(); }
|
||||
ObITable::TableKey get_table_key() const { return table_key_; }
|
||||
share::SCN get_start_scn() const { return start_scn_; }
|
||||
uint64_t get_table_id() const { return table_id_; }
|
||||
int64_t get_ddl_task_id() const { return ddl_task_id_; }
|
||||
const ObSArray<uint64_t> &get_ddl_column_ids() const { return column_ids_; }
|
||||
TO_STRING_KV(K_(table_key), K_(start_scn));
|
||||
private:
|
||||
ObITable::TableKey table_key_;
|
||||
share::SCN start_scn_;
|
||||
uint64_t table_id_;
|
||||
int64_t ddl_task_id_;
|
||||
ObSArray<uint64_t> column_ids_;
|
||||
};
|
||||
|
||||
class ObDDLCommitLog final
|
||||
@ -206,31 +196,19 @@ public:
|
||||
~ObDDLCommitLog() = default;
|
||||
int init(const ObITable::TableKey &table_key,
|
||||
const share::SCN &start_scn,
|
||||
const share::SCN &prepare_scn,
|
||||
const uint64_t table_id,
|
||||
const int64_t ddl_task_id,
|
||||
const ObIArray<uint64_t> &column_ids);
|
||||
const share::SCN &prepare_scn);
|
||||
bool is_valid() const {
|
||||
return table_key_.is_valid() &&
|
||||
start_scn_.is_valid() &&
|
||||
prepare_scn_.is_valid() &&
|
||||
table_id_ > 0 &&
|
||||
ddl_task_id_ > 0 &&
|
||||
column_ids_.count() > 0;}
|
||||
prepare_scn_.is_valid(); }
|
||||
ObITable::TableKey get_table_key() const { return table_key_; }
|
||||
share::SCN get_start_scn() const { return start_scn_; }
|
||||
share::SCN get_prepare_scn() const { return prepare_scn_; }
|
||||
uint64_t get_table_id() const { return table_id_; }
|
||||
int64_t get_ddl_task_id() const { return ddl_task_id_; }
|
||||
const ObSArray<uint64_t> &get_ddl_column_ids() const { return column_ids_; }
|
||||
TO_STRING_KV(K_(table_key), K_(start_scn), K_(prepare_scn));
|
||||
private:
|
||||
ObITable::TableKey table_key_;
|
||||
share::SCN start_scn_;
|
||||
share::SCN prepare_scn_;
|
||||
uint64_t table_id_; // used for report ddl checksum
|
||||
int64_t ddl_task_id_; // used for report ddl checksum
|
||||
ObSArray<uint64_t> column_ids_;
|
||||
};
|
||||
|
||||
class ObTabletSchemaVersionChangeLog final
|
||||
|
@ -409,8 +409,7 @@ int ObDDLTableMergeTask::process()
|
||||
merge_param_.table_id_,
|
||||
merge_param_.execution_id_,
|
||||
merge_param_.ddl_task_id_,
|
||||
sstable->get_meta().get_col_checksum(),
|
||||
merge_param_.column_ids_))) {
|
||||
sstable->get_meta().get_col_checksum()))) {
|
||||
LOG_WARN("report ddl column checksum failed", K(ret), K(merge_param_));
|
||||
} else if (OB_FAIL(GCTX.ob_service_->submit_tablet_update_task(tenant_id, merge_param_.ls_id_, merge_param_.tablet_id_))) {
|
||||
LOG_WARN("fail to submit tablet update task", K(ret), K(tenant_id), K(merge_param_));
|
||||
@ -769,11 +768,11 @@ int ObTabletDDLUtil::report_ddl_checksum(const share::ObLSID &ls_id,
|
||||
const uint64_t table_id,
|
||||
const int64_t execution_id,
|
||||
const int64_t ddl_task_id,
|
||||
const ObIArray<int64_t> &column_checksums,
|
||||
const ObIArray<uint64_t> &column_ids)
|
||||
const ObIArray<int64_t> &column_checksums)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObMySQLProxy *sql_proxy = GCTX.sql_proxy_;
|
||||
ObMultiVersionSchemaService *schema_service = GCTX.schema_service_;
|
||||
ObSchemaGetterGuard schema_guard;
|
||||
const ObTableSchema *table_schema = nullptr;
|
||||
const uint64_t tenant_id = MTL_ID();
|
||||
@ -781,12 +780,22 @@ int ObTabletDDLUtil::report_ddl_checksum(const share::ObLSID &ls_id,
|
||||
|| !is_valid_id(table_id) || 0 == table_id || execution_id < 0)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(ls_id), K(tablet_id), K(table_id), K(execution_id));
|
||||
} else if (!is_valid_tenant_id(tenant_id) || OB_ISNULL(sql_proxy) || column_ids.count() <= 0) {
|
||||
} else if (!is_valid_tenant_id(tenant_id) || OB_ISNULL(sql_proxy) || OB_ISNULL(schema_service)) {
|
||||
ret = OB_ERR_SYS;
|
||||
LOG_WARN("ls service or sql proxy is null", K(ret), K(tenant_id), KP(sql_proxy), K(column_ids));
|
||||
LOG_WARN("ls service or sql proxy is null", K(ret), K(tenant_id), KP(sql_proxy), KP(schema_service));
|
||||
} else if (OB_FAIL(schema_service->get_tenant_schema_guard(tenant_id, schema_guard))) {
|
||||
LOG_WARN("get tenant schema guard failed", K(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) {
|
||||
LOG_WARN("get table schema failed", K(ret), K(tenant_id), K(table_id));
|
||||
} else if (OB_ISNULL(table_schema)) {
|
||||
ret = OB_TABLE_NOT_EXIST;
|
||||
LOG_INFO("table not exit", K(ret), K(tenant_id), K(table_id));
|
||||
} else {
|
||||
ObArray<ObColDesc> column_ids;
|
||||
ObArray<ObDDLChecksumItem> ddl_checksum_items;
|
||||
if (OB_UNLIKELY(column_checksums.count() != column_ids.count())) {
|
||||
if (OB_FAIL(table_schema->get_multi_version_column_descs(column_ids))) {
|
||||
LOG_WARN("fail to get column ids", K(ret), K(ls_id), K(tablet_id));
|
||||
} else if (OB_UNLIKELY(column_checksums.count() != column_ids.count())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpect error, column checksums count didn't equal to column ids count", K(ret),
|
||||
K(ls_id), K(tablet_id), K(column_checksums.count()), K(column_ids.count()));
|
||||
@ -797,7 +806,7 @@ int ObTabletDDLUtil::report_ddl_checksum(const share::ObLSID &ls_id,
|
||||
item.tenant_id_ = tenant_id;
|
||||
item.table_id_ = table_id;
|
||||
item.ddl_task_id_ = ddl_task_id;
|
||||
item.column_id_ = column_ids.at(i);
|
||||
item.column_id_ = column_ids.at(i).col_id_;
|
||||
item.task_id_ = tablet_id.id();
|
||||
item.checksum_ = column_checksums.at(i);
|
||||
#ifdef ERRSIM
|
||||
|
@ -47,16 +47,15 @@ public:
|
||||
start_scn_(share::SCN::min_scn()),
|
||||
table_id_(0),
|
||||
execution_id_(-1),
|
||||
ddl_task_id_(0),
|
||||
column_ids_()
|
||||
ddl_task_id_(0)
|
||||
{ }
|
||||
bool is_valid() const
|
||||
{
|
||||
return ls_id_.is_valid() && tablet_id_.is_valid() && start_scn_.is_valid_and_not_min() && column_ids_.count() > 0;
|
||||
return ls_id_.is_valid() && tablet_id_.is_valid() && start_scn_.is_valid_and_not_min();
|
||||
}
|
||||
virtual ~ObDDLTableMergeDagParam() = default;
|
||||
TO_STRING_KV(K_(ls_id), K_(tablet_id), K_(rec_scn), K_(is_commit), K_(start_scn),
|
||||
K_(table_id), K_(execution_id), K_(ddl_task_id), K_(column_ids));
|
||||
K_(table_id), K_(execution_id), K_(ddl_task_id));
|
||||
public:
|
||||
share::ObLSID ls_id_;
|
||||
ObTabletID tablet_id_;
|
||||
@ -66,7 +65,6 @@ public:
|
||||
uint64_t table_id_; // used for report ddl checksum
|
||||
int64_t execution_id_; // used for report ddl checksum
|
||||
int64_t ddl_task_id_; // used for report ddl checksum
|
||||
ObArray<uint64_t> column_ids_;
|
||||
};
|
||||
|
||||
class ObDDLTableMergeDag : public share::ObIDag
|
||||
@ -175,8 +173,7 @@ public:
|
||||
const uint64_t table_id,
|
||||
const int64_t execution_id,
|
||||
const int64_t ddl_task_id,
|
||||
const ObIArray<int64_t> &column_checksums,
|
||||
const ObIArray<uint64_t> &column_ids);
|
||||
const ObIArray<int64_t> &column_checksums);
|
||||
static int check_and_get_major_sstable(const share::ObLSID &ls_id,
|
||||
const ObTabletID &tablet_id,
|
||||
const blocksstable::ObSSTable *&latest_major_sstable);
|
||||
|
@ -181,11 +181,7 @@ int ObDDLRedoLogReplayer::replay_prepare(const ObDDLPrepareLog &log, const SCN &
|
||||
LOG_WARN("need replay but tablet handle is invalid", K(ret), K(need_replay), K(tablet_handle), K(log), K(scn));
|
||||
} else if (OB_FAIL(tablet_handle.get_obj()->get_ddl_kv_mgr(ddl_kv_mgr_handle))) {
|
||||
LOG_WARN("get ddl kv mgr failed", K(ret), K(log), K(scn));
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_prepare(log.get_start_scn(),
|
||||
scn,
|
||||
log.get_ddl_column_ids(),
|
||||
log.get_table_id(),
|
||||
log.get_ddl_task_id()))) {
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_prepare(log.get_start_scn(), scn))) {
|
||||
LOG_WARN("replay ddl prepare log failed", K(ret), K(log), K(scn));
|
||||
} else {
|
||||
LOG_INFO("replay ddl prepare log success", K(ret), K(log), K(scn));
|
||||
@ -223,12 +219,7 @@ int ObDDLRedoLogReplayer::replay_commit(const ObDDLCommitLog &log, const SCN &sc
|
||||
LOG_WARN("need replay but tablet handle is invalid", K(ret), K(need_replay), K(tablet_handle), K(log), K(scn));
|
||||
} else if (OB_FAIL(tablet_handle.get_obj()->get_ddl_kv_mgr(ddl_kv_mgr_handle))) {
|
||||
LOG_WARN("get ddl kv mgr failed", K(ret), K(log), K(scn));
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_commit(log.get_start_scn(),
|
||||
log.get_prepare_scn(),
|
||||
true/*is_replay*/,
|
||||
log.get_table_id(),
|
||||
log.get_ddl_task_id(),
|
||||
log.get_ddl_column_ids()))) {
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_commit(log.get_start_scn(), log.get_prepare_scn(), true/*is_replay*/))) {
|
||||
if (OB_EAGAIN != ret) {
|
||||
LOG_WARN("replay ddl commit log failed", K(ret), K(log), K(scn));
|
||||
}
|
||||
|
@ -1061,8 +1061,7 @@ int ObDDLSSTableRedoWriter::write_prepare_log(const ObITable::TableKey &table_ke
|
||||
const int64_t table_id,
|
||||
const int64_t execution_id,
|
||||
const int64_t ddl_task_id,
|
||||
SCN &prepare_scn,
|
||||
const ObIArray<uint64_t> &column_ids)
|
||||
SCN &prepare_scn)
|
||||
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -1082,10 +1081,10 @@ int ObDDLSSTableRedoWriter::write_prepare_log(const ObITable::TableKey &table_ke
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObDDLSSTableRedoWriter has not been inited", K(ret));
|
||||
} else if (OB_UNLIKELY(!table_key.is_valid() || !start_scn_.is_valid_and_not_min() || column_ids.count() < 0)) {
|
||||
} else if (OB_UNLIKELY(!table_key.is_valid() || !start_scn_.is_valid_and_not_min())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid arguments", K(ret), K(table_key), K(start_scn_), K(column_ids));
|
||||
} else if (OB_FAIL(log.init(table_key, get_start_scn(), table_id, ddl_task_id, column_ids))) {
|
||||
LOG_WARN("invalid arguments", K(ret), K(table_key), K(start_scn_));
|
||||
} else if (OB_FAIL(log.init(table_key, get_start_scn()))) {
|
||||
LOG_WARN("fail to init DDLCommitLog", K(ret), K(table_key), K(start_scn_));
|
||||
} else if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, ls_handle, ObLSGetMod::DDL_MOD))) {
|
||||
LOG_WARN("get ls failed", K(ret), K(ls_id_));
|
||||
@ -1126,10 +1125,7 @@ int ObDDLSSTableRedoWriter::write_prepare_log(const ObITable::TableKey &table_ke
|
||||
}
|
||||
|
||||
int ObDDLSSTableRedoWriter::write_commit_log(const ObITable::TableKey &table_key,
|
||||
const SCN &prepare_scn,
|
||||
const uint64_t table_id,
|
||||
const int64_t ddl_task_id,
|
||||
const ObIArray<uint64_t> &column_ids)
|
||||
const SCN &prepare_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLSHandle ls_handle;
|
||||
@ -1139,11 +1135,10 @@ int ObDDLSSTableRedoWriter::write_commit_log(const ObITable::TableKey &table_key
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObDDLSSTableRedoWriter has not been inited", K(ret));
|
||||
} else if (OB_UNLIKELY(!table_key.is_valid() || !start_scn_.is_valid_and_not_min() ||
|
||||
!prepare_scn.is_valid_and_not_min() || column_ids.count() < 0)) {
|
||||
} else if (OB_UNLIKELY(!table_key.is_valid() || !start_scn_.is_valid_and_not_min() || !prepare_scn.is_valid_and_not_min())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid arguments", K(ret), K(table_key), K(start_scn_), K(prepare_scn), K(column_ids));
|
||||
} else if (OB_FAIL(log.init(table_key, get_start_scn(), prepare_scn, table_id, ddl_task_id, column_ids))) {
|
||||
LOG_WARN("invalid arguments", K(ret), K(table_key), K(start_scn_), K(prepare_scn));
|
||||
} else if (OB_FAIL(log.init(table_key, get_start_scn(), prepare_scn))) {
|
||||
LOG_WARN("fail to init DDLCommitLog", K(ret), K(table_key), K(start_scn_), K(prepare_scn));
|
||||
} else if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, ls_handle, ObLSGetMod::DDL_MOD))) {
|
||||
LOG_WARN("get ls failed", K(ret), K(ls_id_));
|
||||
@ -1167,7 +1162,7 @@ int ObDDLSSTableRedoWriter::write_commit_log(const ObITable::TableKey &table_key
|
||||
ObSrvRpcProxy *srv_rpc_proxy = GCTX.srv_rpc_proxy_;
|
||||
obrpc::ObRpcRemoteWriteDDLCommitLogArg arg;
|
||||
obrpc::Int64 log_ns;
|
||||
if (OB_FAIL(arg.init(MTL_ID(), leader_ls_id_, table_key, get_start_scn(), prepare_scn, table_id, ddl_task_id))) {
|
||||
if (OB_FAIL(arg.init(MTL_ID(), leader_ls_id_, table_key, get_start_scn(), prepare_scn))) {
|
||||
LOG_WARN("fail to init ObRpcRemoteWriteDDLCommitLogArg", K(ret));
|
||||
} else if (OB_ISNULL(srv_rpc_proxy)) {
|
||||
ret = OB_ERR_SYS;
|
||||
|
@ -256,13 +256,9 @@ public:
|
||||
const int64_t table_id,
|
||||
const int64_t execution_id,
|
||||
const int64_t ddl_task_id,
|
||||
share::SCN &prepare_scn,
|
||||
const ObIArray<uint64_t> &column_ids);
|
||||
share::SCN &prepare_scn);
|
||||
int write_commit_log(const ObITable::TableKey &table_key,
|
||||
const share::SCN &prepare_scn,
|
||||
const uint64_t table_id,
|
||||
const int64_t ddl_task_id,
|
||||
const ObIArray<uint64_t> &column_ids);
|
||||
const share::SCN &prepare_scn);
|
||||
OB_INLINE void set_start_scn(const share::SCN &start_scn) { start_scn_.atomic_set(start_scn); }
|
||||
OB_INLINE share::SCN get_start_scn() const { return start_scn_.atomic_get(); }
|
||||
private:
|
||||
|
@ -655,7 +655,6 @@ int ObSSTableInsertTabletContext::create_sstable_with_clog(
|
||||
share::schema::ObMultiVersionSchemaService *schema_service = nullptr;
|
||||
const share::schema::ObTableSchema *table_schema = nullptr;
|
||||
const uint64_t tenant_id = MTL_ID();
|
||||
ObArray<uint64_t> column_ids;
|
||||
SCN prepare_scn;
|
||||
ObSchemaGetterGuard schema_guard;
|
||||
if (OB_UNLIKELY(!table_key.is_valid())) {
|
||||
@ -677,14 +676,11 @@ int ObSSTableInsertTabletContext::create_sstable_with_clog(
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(ObDDLUtil::get_tenant_schema_column_ids(tenant_id, table_schema->get_table_id(), column_ids))) {
|
||||
LOG_WARN("fail to get tenant schema column ids", K(ret), K(tenant_id), K(build_param_));
|
||||
} else if (OB_FAIL(data_sstable_redo_writer_.write_prepare_log(table_key,
|
||||
table_schema->get_table_id(),
|
||||
build_param_.execution_id_,
|
||||
build_param_.ddl_task_id_,
|
||||
prepare_scn,
|
||||
column_ids))) {
|
||||
prepare_scn))) {
|
||||
if (OB_TASK_EXPIRED == ret) {
|
||||
LOG_INFO("ddl task expired", K(ret), K(table_key), K(build_param_));
|
||||
} else {
|
||||
@ -706,30 +702,25 @@ int ObSSTableInsertTabletContext::create_sstable_with_clog(
|
||||
LOG_WARN("get ddl kv manager failed", K(ret), K(ls_id), K(tablet_id));
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_prepare(ddl_start_scn,
|
||||
prepare_scn,
|
||||
column_ids,
|
||||
table_id,
|
||||
ddl_task_id))) {
|
||||
if (OB_TASK_EXPIRED == ret) {
|
||||
LOG_INFO("ddl task expired", K(ret), K(ls_id), K(tablet_id), K(column_ids),
|
||||
LOG_INFO("ddl task expired", K(ret), K(ls_id), K(tablet_id),
|
||||
K(ddl_start_scn), "new_ddl_start_scn", ddl_kv_mgr_handle.get_obj()->get_start_scn());
|
||||
} else {
|
||||
LOG_WARN("failed to do ddl kv prepare", K(ret), K(ddl_start_scn), K(prepare_scn), K(build_param_), K(column_ids));
|
||||
LOG_WARN("failed to do ddl kv prepare", K(ret), K(ddl_start_scn), K(prepare_scn), K(build_param_));
|
||||
}
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->wait_ddl_commit(ddl_start_scn, prepare_scn, table_id, ddl_task_id, column_ids))) {
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->wait_ddl_commit(ddl_start_scn, prepare_scn))) {
|
||||
if (OB_TASK_EXPIRED == ret) {
|
||||
LOG_INFO("ddl task expired, but return success", K(ret), K(ls_id), K(tablet_id),
|
||||
K(ddl_start_scn), "new_ddl_start_scn",
|
||||
ddl_kv_mgr_handle.get_obj()->get_start_scn(), K(build_param_),
|
||||
K(table_id), K(ddl_task_id), K(column_ids));
|
||||
ddl_kv_mgr_handle.get_obj()->get_start_scn(), K(build_param_));
|
||||
} else {
|
||||
LOG_WARN("failed to wait ddl kv commit", K(ret), K(ddl_start_scn), K(build_param_), K(column_ids));
|
||||
LOG_WARN("failed to wait ddl kv commit", K(ret), K(ddl_start_scn), K(build_param_));
|
||||
}
|
||||
} else if (OB_FAIL(data_sstable_redo_writer_.write_commit_log(table_key,
|
||||
prepare_scn,
|
||||
table_id,
|
||||
ddl_task_id,
|
||||
column_ids))) {
|
||||
LOG_WARN("fail write ddl commit log", K(ret), K(table_key), K(table_id), K(ddl_task_id), K(column_ids));
|
||||
prepare_scn))) {
|
||||
LOG_WARN("fail write ddl commit log", K(ret), K(table_key));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
|
@ -151,7 +151,6 @@ int ObTabletDDLKvMgr::ddl_start(const ObITable::TableKey &table_key,
|
||||
int ObTabletDDLKvMgr::ddl_prepare(
|
||||
const SCN &start_scn,
|
||||
const SCN &prepare_scn,
|
||||
const ObIArray<uint64_t> &column_ids,
|
||||
const uint64_t table_id,
|
||||
const int64_t ddl_task_id)
|
||||
{
|
||||
@ -182,9 +181,7 @@ int ObTabletDDLKvMgr::ddl_prepare(
|
||||
param.execution_id_ = execution_id_;
|
||||
param.ddl_task_id_ = ddl_task_id_;
|
||||
const int64_t start_ts = ObTimeUtility::fast_current_time();
|
||||
if (OB_FAIL(param.column_ids_.assign(column_ids))) {
|
||||
LOG_WARN("fail to assign columns ids", K(ret), K(column_ids));
|
||||
}
|
||||
|
||||
while (OB_SUCC(ret) && is_started()) {
|
||||
if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_ddl_table_merge_dag(param))) {
|
||||
if (OB_SIZE_OVERFLOW != ret && OB_EAGAIN != ret) {
|
||||
@ -210,10 +207,7 @@ int ObTabletDDLKvMgr::ddl_prepare(
|
||||
int ObTabletDDLKvMgr::ddl_commit(
|
||||
const SCN &start_scn,
|
||||
const SCN &prepare_scn,
|
||||
const bool is_replay,
|
||||
const uint64_t table_id,
|
||||
const int64_t ddl_task_id,
|
||||
const ObIArray<uint64_t> &column_ids)
|
||||
const bool is_replay)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLSHandle ls_handle;
|
||||
@ -228,8 +222,6 @@ int ObTabletDDLKvMgr::ddl_commit(
|
||||
} else if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, ls_handle, ObLSGetMod::DDL_MOD))) {
|
||||
LOG_WARN("failed to get log stream", K(ret), K(ls_id_));
|
||||
} else {
|
||||
table_id_ = table_id;
|
||||
ddl_task_id_ = ddl_task_id;
|
||||
|
||||
ObDDLTableMergeDagParam param;
|
||||
param.ls_id_ = ls_id_;
|
||||
@ -240,9 +232,8 @@ int ObTabletDDLKvMgr::ddl_commit(
|
||||
param.table_id_ = table_id_;
|
||||
param.execution_id_ = execution_id_;
|
||||
param.ddl_task_id_ = ddl_task_id_;
|
||||
if (OB_FAIL(param.column_ids_.assign(column_ids))) {
|
||||
LOG_WARN("fail to assign columns ids", K(ret), K(column_ids));
|
||||
} else if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_ddl_table_merge_dag(param))) { // retry submit dag in case of the previous dag failed
|
||||
// retry submit dag in case of the previous dag failed
|
||||
if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_ddl_table_merge_dag(param))) {
|
||||
if (OB_SIZE_OVERFLOW == ret || OB_EAGAIN == ret) {
|
||||
ret = OB_EAGAIN;
|
||||
} else {
|
||||
@ -267,19 +258,15 @@ int ObTabletDDLKvMgr::ddl_commit(
|
||||
|
||||
int ObTabletDDLKvMgr::wait_ddl_commit(
|
||||
const SCN &start_scn,
|
||||
const SCN &prepare_scn,
|
||||
const uint64_t table_id,
|
||||
const int64_t ddl_task_id,
|
||||
const ObIArray<uint64_t> &column_ids)
|
||||
const SCN &prepare_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", K(ret), K(is_inited_));
|
||||
} else if (OB_UNLIKELY(!start_scn.is_valid_and_not_min() || !prepare_scn.is_valid_and_not_min() ||
|
||||
table_id <= 0 || ddl_task_id <= 0 || column_ids.count() <= 0)) {
|
||||
} else if (OB_UNLIKELY(!start_scn.is_valid_and_not_min() || !prepare_scn.is_valid_and_not_min())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(start_scn), K(prepare_scn), K(table_id), K(ddl_task_id), K(column_ids));
|
||||
LOG_WARN("invalid argument", K(ret), K(start_scn), K(prepare_scn));
|
||||
} else if (!is_started()) {
|
||||
ret = OB_STATE_NOT_MATCH;
|
||||
LOG_WARN("ddl not started", K(ret));
|
||||
@ -289,12 +276,12 @@ int ObTabletDDLKvMgr::wait_ddl_commit(
|
||||
} else {
|
||||
const int64_t wait_start_ts = ObTimeUtility::fast_current_time();
|
||||
while (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(ddl_commit(start_scn, prepare_scn, false/*is_replay*/, table_id, ddl_task_id, column_ids))) {
|
||||
if (OB_FAIL(ddl_commit(start_scn, prepare_scn, false/*is_replay*/ ))) {
|
||||
if (OB_EAGAIN == ret) {
|
||||
ob_usleep(10L * 1000L);
|
||||
ret = OB_SUCCESS; // retry
|
||||
} else {
|
||||
LOG_WARN("commit ddl log failed", K(ret), K(start_scn), K(prepare_scn), K(ls_id_), K(tablet_id_), K(column_ids));
|
||||
LOG_WARN("commit ddl log failed", K(ret), K(start_scn), K(prepare_scn), K(ls_id_), K(tablet_id_));
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
|
@ -39,9 +39,9 @@ public:
|
||||
~ObTabletDDLKvMgr();
|
||||
int init(const share::ObLSID &ls_id, const common::ObTabletID &tablet_id); // init before memtable mgr
|
||||
int ddl_start(const ObITable::TableKey &table_key, const share::SCN &start_scn, const int64_t cluster_version, const int64_t execution_id, const share::SCN &checkpoint_scn);
|
||||
int ddl_prepare(const share::SCN &start_scn, const share::SCN &commit_scn, const ObIArray<uint64_t> &column_ids, const uint64_t table_id = 0, const int64_t ddl_task_id = 0); // schedule build a major sstable
|
||||
int ddl_commit(const share::SCN &start_scn, const share::SCN &prepare_scn, const bool is_replay, const uint64_t table_id, const int64_t ddl_task_id, const ObIArray<uint64_t> &column_ids); // try wait build major sstable
|
||||
int wait_ddl_commit(const share::SCN &start_scn, const share::SCN &prepare_scn, const uint64_t table_id, const int64_t ddl_task_id, const ObIArray<uint64_t> &column_ids);
|
||||
int ddl_prepare(const share::SCN &start_scn, const share::SCN &commit_scn, const uint64_t table_id = 0, const int64_t ddl_task_id = 0); // schedule build a major sstable
|
||||
int ddl_commit(const share::SCN &start_scn, const share::SCN &prepare_scn, const bool is_replay); // try wait build major sstable
|
||||
int wait_ddl_commit(const share::SCN &start_scn, const share::SCN &prepare_scn);
|
||||
int get_ddl_param(ObTabletDDLParam &ddl_param);
|
||||
int get_or_create_ddl_kv(const share::SCN &scn, ObDDLKVHandle &kv_handle); // used in active ddl kv guard
|
||||
int get_freezed_ddl_kv(const share::SCN &freeze_scn, ObDDLKVHandle &kv_handle); // locate ddl kv with exeact freeze log ts
|
||||
|
@ -531,6 +531,8 @@ int ObTablet::check_sstable_column_checksum() const
|
||||
if (OB_ISNULL(cur)) {
|
||||
ret = OB_ERR_NULL_VALUE;
|
||||
LOG_WARN("invalid null sstable", K(ret), K(i), KP(cur), KPC(this));
|
||||
} else if (cur->is_major_sstable() && cur->get_meta().is_empty()) {
|
||||
// since empty major sstable may have wrong column count, skip for compatibility from 4.0 to 4.1
|
||||
} else if ((sstable_col_cnt = cur->get_meta().get_col_checksum().count()) > schema_col_cnt) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("The storage schema is older than the sstable, and cann’t explain the data.",
|
||||
|
Loading…
x
Reference in New Issue
Block a user