[Fix] commit ddl column checksum in follow replicas

This commit is contained in:
obdev 2022-12-24 04:38:02 +00:00 committed by ob-robot
parent caead7fcaa
commit d1c6e3e1d1
21 changed files with 486 additions and 161 deletions

View File

@ -1922,6 +1922,7 @@ int ObRpcRemoteWriteDDLPrepareLogP::process()
ObDDLSSTableRedoWriter sstable_redo_writer;
ObLSService *ls_service = MTL(ObLSService*);
ObLSHandle ls_handle;
ObArray<uint64_t> column_ids;
ObTabletHandle tablet_handle;
ObDDLKvMgrHandle ddl_kv_mgr_handle;
if (OB_UNLIKELY(!arg_.is_valid())) {
@ -1936,16 +1937,20 @@ int ObRpcRemoteWriteDDLPrepareLogP::process()
} else if (OB_FAIL(sstable_redo_writer.init(arg_.ls_id_, table_key.tablet_id_))) {
LOG_WARN("init sstable redo writer", K(ret), K(table_key));
} else if (FALSE_IT(sstable_redo_writer.set_start_scn(arg_.start_scn_))) {
} else if (OB_FAIL(ObDDLUtil::get_tenant_schema_column_ids(tenant_id, arg_.table_id_, column_ids))) {
LOG_WARN("fail to get tenant schema column ids", K(ret), K(arg_));
} else {
SCN prepare_scn;
if (OB_FAIL(sstable_redo_writer.write_prepare_log(table_key,
arg_.table_id_,
arg_.execution_id_,
arg_.ddl_task_id_,
prepare_scn))) {
prepare_scn,
column_ids))) {
LOG_WARN("fail to remote write commit log", K(ret), K(table_key), K_(arg));
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_prepare(arg_.start_scn_,
prepare_scn,
column_ids,
arg_.table_id_,
arg_.ddl_task_id_))) {
LOG_WARN("failed to do ddl kv prepare", K(ret), K(arg_));
@ -1967,6 +1972,7 @@ int ObRpcRemoteWriteDDLCommitLogP::process()
ObDDLSSTableRedoWriter sstable_redo_writer;
ObLSService *ls_service = MTL(ObLSService*);
ObLSHandle ls_handle;
ObArray<uint64_t> column_ids;
ObTabletHandle tablet_handle;
ObDDLKvMgrHandle ddl_kv_mgr_handle;
if (OB_UNLIKELY(!arg_.is_valid())) {
@ -1981,11 +1987,15 @@ int ObRpcRemoteWriteDDLCommitLogP::process()
} else if (OB_FAIL(sstable_redo_writer.init(arg_.ls_id_, table_key.tablet_id_))) {
LOG_WARN("init sstable redo writer", K(ret), K(table_key));
} else if (FALSE_IT(sstable_redo_writer.set_start_scn(arg_.start_scn_))) {
} else if (OB_FAIL(ObDDLUtil::get_tenant_schema_column_ids(tenant_id, arg_.table_id_, column_ids))) {
LOG_WARN("fail to get tenant schema column ids", K(ret), K(arg_));
} else {
// wait in rpc framework may cause rpc timeout, need sync commit via rs @xiajin
if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->wait_ddl_commit(arg_.start_scn_, arg_.prepare_scn_))) {
if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->wait_ddl_commit(
arg_.start_scn_, arg_.prepare_scn_, arg_.table_id_, arg_.ddl_task_id_, column_ids))) {
LOG_WARN("failed to wait ddl kv commit", K(ret), K(arg_));
} else if (OB_FAIL(sstable_redo_writer.write_commit_log(table_key, arg_.prepare_scn_))) {
} else if (OB_FAIL(sstable_redo_writer.write_commit_log(
table_key, arg_.prepare_scn_, arg_.table_id_, arg_.ddl_task_id_, column_ids))) {
LOG_WARN("fail to remote write commit log", K(ret), K(table_key), K_(arg));
}
}

View File

@ -106,6 +106,8 @@ int ObDDLRedefinitionSSTableBuildTask::process()
} else {
(void)ObCheckTabletDataComplementOp::check_and_wait_old_complement_task(tenant_id_,
dest_table_id_,
task_id_,
execution_id_,
inner_sql_exec_addr_,
trace_id_,
schema_version_,

View File

@ -82,6 +82,8 @@ int ObIndexSSTableBuildTask::process()
LOG_WARN("error unexpected, table schema must not be nullptr", K(ret));
} else {
(void)ObCheckTabletDataComplementOp::check_and_wait_old_complement_task(tenant_id_, dest_table_id_,
task_id_,
execution_id_,
inner_sql_exec_addr_,
trace_id_,
table_schema->get_schema_version(),

View File

@ -94,7 +94,7 @@ int ObTableRedefinitionTask::init(const ObDDLTaskRecord &task_record)
} else if (OB_FAIL(set_ddl_stmt_str(task_record.ddl_stmt_str_))) {
LOG_WARN("set ddl stmt str failed", K(ret));
} else {
set_sql_exec_addr(alter_table_arg_.inner_sql_exec_addr_); // set to switch_status, if task cancel, we should kill session with inner_sql_exec_addr_
set_sql_exec_addr(alter_table_arg_.inner_sql_exec_addr_); // set to ddl_task, switch_status, if task cancel, we should kill session with inner_sql_exec_addr_
task_id_ = task_record.task_id_;
task_type_ = task_record.ddl_type_;
object_id_ = data_table_id;

View File

@ -205,6 +205,74 @@ int ObDDLChecksumOperator::get_column_checksum(const ObSqlString &sql, const uin
return ret;
}
int ObDDLChecksumOperator::get_tablet_checksum_status(
const ObSqlString &sql,
const uint64_t tenant_id,
common::ObMySQLProxy &sql_proxy,
common::hash::ObHashMap<uint64_t, bool> &tablet_checksum_map)
{
int ret = OB_SUCCESS;
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
sqlclient::ObMySQLResult *result = NULL;
if (!sql.is_valid() || !tablet_checksum_map.created()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), K(sql), K(tablet_checksum_map.created()));
} else if (OB_FAIL(sql_proxy.read(res, tenant_id, sql.ptr()))) {
LOG_WARN("fail to execute sql", K(ret));
} else if (OB_ISNULL(result = res.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("error unexpected, query result must not be NULL", K(ret));
} else {
while (OB_SUCC(ret)) {
if (OB_FAIL(result->next())) {
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
break;
} else {
LOG_WARN("fail to get next row", K(ret));
}
} else {
uint64_t tablet_id_id = 0;
EXTRACT_UINT_FIELD_MYSQL(*result, "task_id", tablet_id_id, uint64_t);
if (OB_FAIL(tablet_checksum_map.set_refactored(tablet_id_id, true))) {
LOG_WARN("fail to set column checksum to map", K(ret));
}
}
}
}
}
return ret;
}
int ObDDLChecksumOperator::get_tablet_checksum_record(
const uint64_t tenant_id,
const uint64_t execution_id,
const uint64_t table_id,
const int64_t ddl_task_id,
ObMySQLProxy &sql_proxy,
common::hash::ObHashMap<uint64_t, bool> &tablet_checksum_map)
{
int ret = OB_SUCCESS;
ObSqlString sql;
const uint64_t exec_tenant_id = ObSchemaUtils::get_exec_tenant_id(tenant_id);
if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || OB_INVALID_ID == execution_id || OB_INVALID_ID == table_id
|| OB_INVALID_ID == ddl_task_id || !tablet_checksum_map.created())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(tenant_id), K(execution_id), K(table_id), K(ddl_task_id),
K(tablet_checksum_map.created()));
} else if (OB_FAIL(sql.assign_fmt(
"SELECT task_id FROM %s "
"WHERE execution_id = %ld AND tenant_id = %ld AND table_id = %ld AND ddl_task_id = %ld "
"ORDER BY task_id", OB_ALL_DDL_CHECKSUM_TNAME,
execution_id, ObSchemaUtils::get_extract_tenant_id(exec_tenant_id, tenant_id),
ObSchemaUtils::get_extract_schema_id(exec_tenant_id, table_id), ddl_task_id))) {
LOG_WARN("fail to assign fmt", K(ret));
} else if (OB_FAIL(get_tablet_checksum_status(sql, tenant_id, sql_proxy, tablet_checksum_map))) {
LOG_WARN("fail to get column checksum", K(ret), K(sql));
}
return ret;
}
int ObDDLChecksumOperator::get_table_column_checksum(
const uint64_t tenant_id,
const int64_t execution_id,

View File

@ -70,6 +70,13 @@ public:
const uint64_t table_id,
const int64_t ddl_task_id,
common::hash::ObHashMap<int64_t, int64_t> &column_checksums, common::ObMySQLProxy &sql_proxy);
static int get_tablet_checksum_record(
const uint64_t tenant_id,
const uint64_t execution_id,
const uint64_t table_id,
const int64_t ddl_task_id,
ObMySQLProxy &sql_proxy,
common::hash::ObHashMap<uint64_t, bool> &tablet_checksum_map);
static int check_column_checksum(
const uint64_t tenant_id,
const int64_t execution_id,
@ -93,6 +100,11 @@ private:
const uint64_t tenant_id,
common::hash::ObHashMap<int64_t, int64_t> &column_checksum_map,
common::ObMySQLProxy &sql_proxy);
static int get_tablet_checksum_status(
const ObSqlString &sql,
const uint64_t tenant_id,
common::ObMySQLProxy &sql_proxy,
common::hash::ObHashMap<uint64_t, bool> &tablet_checksum_map);
};
} // end namespace share

View File

@ -954,6 +954,48 @@ int ObDDLUtil::check_table_exist(
return ret;
}
int ObDDLUtil::get_tenant_schema_column_ids(
const uint64_t tenant_id,
const uint64_t table_id,
ObIArray<uint64_t> &column_ids)
{
int ret = OB_SUCCESS;
ObMultiVersionSchemaService *schema_service = GCTX.schema_service_;
ObSchemaGetterGuard schema_guard;
const ObTableSchema *table_schema = nullptr;
ObArray<ObColDesc> column_desc;
column_ids.reset(); // make clear
if (OB_UNLIKELY(!is_valid_id(table_id) || 0 == table_id || !is_valid_tenant_id(tenant_id))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(table_id), K(tenant_id));
} else if (OB_ISNULL(schema_service)) {
ret = OB_ERR_SYS;
LOG_WARN("schema service is null pointer", K(ret), K(tenant_id), KP(schema_service));
} else if (OB_FAIL(schema_service->get_tenant_schema_guard(tenant_id, schema_guard))) {
LOG_WARN("get tenant schema guard failed", K(ret), K(tenant_id));
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) {
LOG_WARN("get table schema failed", K(ret), K(tenant_id), K(table_id));
} else if (OB_ISNULL(table_schema)) {
ret = OB_TABLE_NOT_EXIST;
LOG_INFO("table not exit", K(ret), K(tenant_id), K(table_id));
} else if (OB_FAIL(table_schema->get_multi_version_column_descs(column_desc))) {
LOG_WARN("fail to get column ids", K(ret), K(ls_id), K(tenant_id), K(table_id));
} else if (column_desc.count() <= 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("columns desc should not be zero", K(ret), K(tenant_id), K(table_id), K(column_desc));
} else {
int64_t column_idx = 0;
for (column_idx = 0; OB_SUCC(ret) && column_idx < column_desc.count(); ++column_idx) {
if (OB_FAIL(column_ids.push_back(column_desc.at(column_idx).col_id_))) {
LOG_WARN("fail to get columns ids", K(ret), K(tenant_id), K(table_id), K(column_desc));
}
}
}
return ret;
}
int64_t ObDDLUtil::get_ddl_rpc_timeout()
{
return max(GCONF.rpc_timeout, 9 * 1000 * 1000L);
@ -1253,60 +1295,60 @@ int ObCheckTabletDataComplementOp::do_check_tablets_merge_status(
}
}
}
// handle batch result
int tmp_ret = OB_SUCCESS;
common::ObArray<int> return_ret_array;
if (OB_SUCCESS != (tmp_ret = proxy.wait_all(return_ret_array))) {
LOG_WARN("rpc proxy wait failed", K(tmp_ret));
ret = OB_SUCCESS == ret ? tmp_ret : ret;
} else if (return_ret_array.count() != ip_tablets_map.size()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("rpc proxy rsp size not equal to send size", K(ret),
K(return_ret_array.count()), K(ip_tablets_map.size()));
} else {
const ObIArray<const obrpc::ObDDLCheckTabletMergeStatusResult *> &result_array = proxy.get_results();
// 1. handle every ip addr result
for (int64_t i = 0; OB_SUCC(ret) && i < result_array.count(); i++) {
int return_ret = return_ret_array.at(i); // check return ret code
if (OB_SUCCESS == return_ret) {
const obrpc::ObDDLCheckTabletMergeStatusResult *cur_result = nullptr; // ip tablets status result
common::ObSArray<bool> tablet_rsp_array;
common::ObArray<ObTabletID> tablet_req_array;
const common::ObAddr &tablet_addr = proxy.get_dests().at(i); // get rpc dest addr
if (OB_SUCC(ret)) { // handle batch result
int tmp_ret = OB_SUCCESS;
common::ObArray<int> return_ret_array;
if (OB_SUCCESS != (tmp_ret = proxy.wait_all(return_ret_array))) {
LOG_WARN("rpc proxy wait failed", K(tmp_ret));
ret = OB_SUCCESS == ret ? tmp_ret : ret;
} else if (return_ret_array.count() != ip_tablets_map.size()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("rpc proxy rsp size not equal to send size", K(ret), K(return_ret_array.count()), K(ip_tablets_map.size()));
} else {
const ObIArray<const obrpc::ObDDLCheckTabletMergeStatusResult *> &result_array = proxy.get_results();
// 1. handle every ip addr result
for (int64_t i = 0; OB_SUCC(ret) && i < result_array.count(); i++) {
int return_ret = return_ret_array.at(i); // check return ret code
if (OB_SUCCESS == return_ret) {
const obrpc::ObDDLCheckTabletMergeStatusResult *cur_result = nullptr; // ip tablets status result
common::ObSArray<bool> tablet_rsp_array;
common::ObArray<ObTabletID> tablet_req_array;
const common::ObAddr &tablet_addr = proxy.get_dests().at(i); // get rpc dest addr
if (OB_ISNULL(cur_result = result_array.at(i))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("merge status result is null.", K(ret), K(cur_result));
} else if (FALSE_IT(tablet_rsp_array = cur_result->merge_status_)) {
} else if (OB_FAIL(ip_tablets_map.get_refactored(tablet_addr, tablet_req_array))) {
LOG_WARN("get from ip tablet map fail.", K(ret), K(tablet_addr));
} else if (tablet_req_array.count() != tablet_rsp_array.count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tablet req count is not equal to tablet rsp count", K(ret), K(tablet_req_array), K(tablet_rsp_array));
} else {
// 2. handle every tablet status
for (int64_t idx = 0; OB_SUCC(ret) && idx < tablet_rsp_array.count(); ++idx) {
const common::ObTabletID &tablet_id = tablet_req_array.at(idx); // tablet id
const bool tablet_status = tablet_rsp_array.at(idx);
if (OB_FAIL(update_replica_merge_status(tablet_id, tablet_status, tablets_commited_map))) { // update tablet merge status from get
LOG_WARN("fail to update replica merge status", K(ret), K(tablet_id), K(tablet_addr));
} else {
LOG_INFO("succ to update replica merge status", K(tablet_addr), K(tablet_id), K(tablet_status));
if (OB_ISNULL(cur_result = result_array.at(i))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("merge status result is null.", K(ret), K(cur_result));
} else if (FALSE_IT(tablet_rsp_array = cur_result->merge_status_)) {
} else if (OB_FAIL(ip_tablets_map.get_refactored(tablet_addr, tablet_req_array))) {
LOG_WARN("get from ip tablet map fail.", K(ret), K(tablet_addr));
} else if (tablet_req_array.count() != tablet_rsp_array.count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tablet req count is not equal to tablet rsp count", K(ret), K(tablet_req_array), K(tablet_rsp_array));
} else {
// 2. handle every tablet status
for (int64_t idx = 0; OB_SUCC(ret) && idx < tablet_rsp_array.count(); ++idx) {
const common::ObTabletID &tablet_id = tablet_req_array.at(idx); // tablet id
const bool tablet_status = tablet_rsp_array.at(idx);
if (OB_FAIL(update_replica_merge_status(tablet_id, tablet_status, tablets_commited_map))) { // update tablet merge status from get
LOG_WARN("fail to update replica merge status", K(ret), K(tablet_id), K(tablet_addr));
} else {
LOG_INFO("succ to update replica merge status", K(tablet_addr), K(tablet_id), K(tablet_status));
}
}
}
} else {
LOG_WARN("rpc proxy return fail.", K(return_ret));
}
} else {
LOG_WARN("rpc proxy return fail.", K(return_ret));
}
}
// 3. check any commit tablet
if (OB_SUCC(ret)) {
int64_t build_succ_count = 0;
if (OB_FAIL(calculate_build_finish(tenant_id, tablet_ids, tablets_commited_map, build_succ_count))) {
LOG_WARN("check and commit tbalets commit log fail.", K(ret), K(tablet_ids), K(build_succ_count));
} else {
DEBUG_SYNC(DDL_CHECK_TABLET_MERGE_STATUS);
tablet_build_succ_count += build_succ_count;
// 3. check any commit tablet
if (OB_SUCC(ret)) {
int64_t build_succ_count = 0;
if (OB_FAIL(calculate_build_finish(tenant_id, tablet_ids, tablets_commited_map, build_succ_count))) {
LOG_WARN("check and commit tbalets commit log fail.", K(ret), K(tablet_ids), K(build_succ_count));
} else {
DEBUG_SYNC(DDL_CHECK_TABLET_MERGE_STATUS);
tablet_build_succ_count += build_succ_count;
}
}
}
}
@ -1314,7 +1356,6 @@ int ObCheckTabletDataComplementOp::do_check_tablets_merge_status(
return ret;
}
int ObCheckTabletDataComplementOp::check_tablet_merge_status(
const uint64_t tenant_id,
const ObIArray<common::ObTabletID> &tablet_ids,
@ -1369,11 +1410,12 @@ int ObCheckTabletDataComplementOp::check_tablet_merge_status(
}
}
}
if (total_build_succ_count == tablet_ids.count()) {
int64_t total_tablets_count = tablet_ids.count();
if (total_build_succ_count == total_tablets_count) {
is_all_tablets_commited = true;
LOG_INFO("all tablet finished create sstables", K(ret), K(total_tablets_count), K(total_build_succ_count));
} else {
int64_t total_tablets_count = tablet_ids.count();
LOG_WARN("not all tablets finished create sstables", K(ret), K(total_build_succ_count), K(total_tablets_count));
LOG_WARN("not all tablets finished create sstables", K(ret), K(total_tablets_count), K(total_build_succ_count));
}
}
@ -1384,6 +1426,62 @@ int ObCheckTabletDataComplementOp::check_tablet_merge_status(
return ret;
}
int ObCheckTabletDataComplementOp::check_tablet_checksum_update_status(
const uint64_t tenant_id,
const uint64_t index_table_id,
const uint64_t ddl_task_id,
const int64_t execution_id,
ObIArray<ObTabletID> &tablet_ids,
bool &tablet_checksum_status)
{
int ret = OB_SUCCESS;
tablet_checksum_status = false;
common::hash::ObHashMap<uint64_t, bool> tablet_checksum_map;
int64_t tablet_count = tablet_ids.count();
if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || OB_INVALID_ID == index_table_id ||
execution_id < 0 || tablet_count <= 0 || ddl_task_id == OB_INVALID_ID)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("fail to check and wait complement task",
K(ret), K(tenant_id), K(index_table_id), K(tablet_ids), K(execution_id), K(ddl_task_id));
} else if (OB_FAIL(tablet_checksum_map.create(tablet_count, ObModIds::OB_SSTABLE_CREATE_INDEX))) {
LOG_WARN("fail to create column checksum map", K(ret));
} else if (OB_FAIL(ObDDLChecksumOperator::get_tablet_checksum_record(
tenant_id,
execution_id,
index_table_id,
ddl_task_id,
GCTX.root_service_->get_sql_proxy(),
tablet_checksum_map))) {
LOG_WARN("fail to get tablet checksum status",
K(ret), K(tenant_id), K(execution_id), K(index_table_id), K(ddl_task_id));
} else {
int tablet_idx = 0;
for (tablet_idx = 0; OB_SUCC(ret) && tablet_idx < tablet_count; ++tablet_idx) {
const ObTabletID &tablet_id = tablet_ids.at(tablet_idx);
uint64_t tablet_id_id = tablet_id.id();
bool status = false;
if (OB_FAIL(tablet_checksum_map.get_refactored(tablet_id_id, status))) {
LOG_WARN("fail to get tablet checksum record from map", K(ret), K(tablet_id_id));
} else if (!status) {
break;
}
}
if (OB_SUCC(ret)) {
if (tablet_idx == tablet_count) {
tablet_checksum_status = true;
} else {
ret = OB_EAGAIN;
LOG_INFO("not all tablet has update checksum, will re-check", K(ret), K(tablet_idx), K(tablet_count));
}
}
}
if (tablet_checksum_map.created()) {
tablet_checksum_map.destroy();
}
return ret;
}
/*
* 1. get a batch of tablets and construct a tmp ls_tablet_map
* 2. get tablet_ip_map and send async batch rpc and get results
@ -1394,20 +1492,28 @@ int ObCheckTabletDataComplementOp::check_all_tablet_sstable_status(
const uint64_t tenant_id,
const uint64_t index_table_id,
const int64_t snapshot_version,
const int64_t execution_id,
const uint64_t ddl_task_id,
bool &is_all_sstable_build_finished)
{
int ret = OB_SUCCESS;
ObArray<ObTabletID> dest_tablet_ids;
bool tablet_checksum_status = false;
is_all_sstable_build_finished = false;
if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || OB_INVALID_ID == index_table_id || OB_INVALID_TIMESTAMP == snapshot_version)) {
if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || OB_INVALID_ID == index_table_id || OB_INVALID_TIMESTAMP == snapshot_version ||
ddl_task_id == OB_INVALID_ID || execution_id < 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("fail to check and wait complement task", K(ret), K(tenant_id), K(index_table_id), K(snapshot_version));
LOG_WARN("fail to check and wait complement task", K(ret), K(tenant_id), K(index_table_id), K(snapshot_version), K(execution_id), K(ddl_task_id));
} else if (OB_FAIL(ObDDLUtil::get_tablets(tenant_id, index_table_id, dest_tablet_ids))) {
LOG_WARN("fail to get tablets", K(ret), K(tenant_id), K(index_table_id));
} else if (OB_FAIL(check_tablet_merge_status(tenant_id, dest_tablet_ids, snapshot_version, is_all_sstable_build_finished))){
LOG_WARN("fail to check tablet merge status.", K(ret), K(tenant_id), K(dest_tablet_ids), K(snapshot_version));
} else if (OB_FAIL(is_all_sstable_build_finished
&& check_tablet_checksum_update_status(tenant_id, table_id, ddl_task_id, execution_id, dest_tablet_ids, tablet_checksum_status))) {
LOG_WARN("fail to check tablet checksum update status.", K(ret), K(tenant_id), K(dest_tablet_ids), K(execution_id));
} else {
is_all_sstable_build_finished &= tablet_checksum_status;
}
return ret;
}
@ -1422,6 +1528,8 @@ int ObCheckTabletDataComplementOp::check_all_tablet_sstable_status(
int ObCheckTabletDataComplementOp::check_and_wait_old_complement_task(
const uint64_t tenant_id,
const uint64_t table_id,
const uint64_t ddl_task_id,
const int64_t execution_id,
const common::ObAddr &inner_sql_exec_addr,
const common::ObCurTraceId::TraceId &trace_id,
const int64_t schema_version,
@ -1440,20 +1548,12 @@ int ObCheckTabletDataComplementOp::check_and_wait_old_complement_task(
} else {
LOG_INFO("start to check and wait complement task", K(tenant_id), K(table_id), K(inner_sql_exec_addr), K(trace_id));
do {
if (OB_FAIL(check_all_tablet_sstable_status(tenant_id,
table_id,
scn,
is_all_sstable_build_finished))) {
LOG_WARN("fail to check task tablet sstable status", K(ret), K(tenant_id), K(table_id), K(scn));
if (OB_FAIL(check_all_tablet_sstable_status(tenant_id, table_id, scn, execution_id, ddl_task_id, is_all_sstable_build_finished))) {
LOG_WARN("fail to check task tablet sstable status", K(ret), K(tenant_id), K(table_id), K(scn), K(execution_id), K(ddl_task_id));
} else if (is_all_sstable_build_finished) {
LOG_INFO("all tablet sstable has build finished");
} else {
if (OB_FAIL(check_task_inner_sql_session_status(inner_sql_exec_addr,
trace_id,
tenant_id,
schema_version,
scn,
is_old_task_session_exist))) {
if (OB_FAIL(check_task_inner_sql_session_status(inner_sql_exec_addr, trace_id, tenant_id, schema_version, scn, is_old_task_session_exist))) {
LOG_WARN("fail check task inner sql session status", K(ret), K(trace_id), K(inner_sql_exec_addr));
} else if (!is_old_task_session_exist) {
LOG_WARN("old inner sql session is not exist.", K(ret));

View File

@ -274,6 +274,10 @@ public:
const ObTabletID &tablet_id,
storage::ObTabletHandle &tablet_handle,
const int64_t timeout_us = storage::ObTabletCommon::DEFAULT_GET_TABLET_TIMEOUT_US);
static int get_tenant_schema_column_ids(
const uint64_t tenant_id,
const uint64_t table_id,
ObIArray<uint64_t> &column_ids);
static int clear_ddl_checksum(sql::ObPhysicalPlan *phy_plan);
@ -345,6 +349,8 @@ public:
static int check_and_wait_old_complement_task(
const uint64_t tenant_id,
const uint64_t index_table_id,
const uint64_t ddl_task_id,
const int64_t execution_id,
const common::ObAddr &inner_sql_exec_addr,
const common::ObCurTraceId::TraceId &trace_id,
const int64_t schema_version,
@ -357,6 +363,8 @@ private:
const uint64_t tenant_id,
const uint64_t index_table_id,
const int64_t snapshot_version,
const int64_t execution_id,
const uint64_t ddl_task_id,
bool &is_all_sstable_build_finished);
static int check_task_inner_sql_session_status(
@ -404,6 +412,14 @@ private:
const ObTabletID &tablet_id,
hash::ObHashMap<ObAddr, ObArray<ObTabletID>> &ip_tablets_map);
static int check_tablet_checksum_update_status(
const uint64_t tenant_id,
const uint64_t index_table_id,
const uint64_t ddl_task_id,
const int64_t execution_id,
ObIArray<ObTabletID> &tablet_ids,
bool &tablet_checksum_status);
};

View File

@ -6741,31 +6741,36 @@ OB_SERIALIZE_MEMBER(ObRpcRemoteWriteDDLPrepareLogArg, tenant_id_, ls_id_, table_
table_id_, execution_id_, ddl_task_id_);
ObRpcRemoteWriteDDLCommitLogArg::ObRpcRemoteWriteDDLCommitLogArg()
: tenant_id_(OB_INVALID_ID), ls_id_(), table_key_(), start_scn_(SCN::min_scn()), prepare_scn_(SCN::min_scn())
: tenant_id_(OB_INVALID_ID), ls_id_(), table_key_(), start_scn_(SCN::min_scn()), prepare_scn_(SCN::min_scn()),
table_id_(0), ddl_task_id_(0)
{}
int ObRpcRemoteWriteDDLCommitLogArg::init(const uint64_t tenant_id,
const share::ObLSID &ls_id,
const storage::ObITable::TableKey &table_key,
const SCN &start_scn,
const SCN &prepare_scn)
const SCN &prepare_scn,
const uint64_t table_id,
const int64_t ddl_task_id)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(tenant_id == OB_INVALID_ID || !ls_id.is_valid() || !table_key.is_valid() || !start_scn.is_valid_and_not_min()
|| !prepare_scn.is_valid_and_not_min())) {
|| !prepare_scn.is_valid_and_not_min() || table_id <= 0 || ddl_task_id <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("tablet id is not valid", K(ret), K(tenant_id), K(ls_id), K(table_key), K(start_scn), K(prepare_scn));
LOG_WARN("tablet id is not valid", K(ret), K(tenant_id), K(ls_id), K(table_key), K(start_scn), K(prepare_scn), K(table_id), K(ddl_task_id));
} else {
tenant_id_ = tenant_id;
ls_id_ = ls_id;
table_key_ = table_key;
start_scn_ = start_scn;
prepare_scn_ = prepare_scn;
table_id_ = table_id;
ddl_task_id_ = ddl_task_id;
}
return ret;
}
OB_SERIALIZE_MEMBER(ObRpcRemoteWriteDDLCommitLogArg, tenant_id_, ls_id_, table_key_, start_scn_, prepare_scn_);
OB_SERIALIZE_MEMBER(ObRpcRemoteWriteDDLCommitLogArg, tenant_id_, ls_id_, table_key_, start_scn_, prepare_scn_, table_id_, ddl_task_id_);
bool ObCheckLSCanOfflineArg::is_valid() const
{

View File

@ -7555,19 +7555,23 @@ public:
const share::ObLSID &ls_id,
const storage::ObITable::TableKey &table_key,
const share::SCN &start_scn,
const share::SCN &prepare_scn);
const share::SCN &prepare_scn,
const uint64_t table_id,
const int64_t ddl_task_id);
bool is_valid() const
{
return tenant_id_ != OB_INVALID_ID && ls_id_.is_valid() && table_key_.is_valid() && start_scn_.is_valid_and_not_min()
&& prepare_scn_.is_valid_and_not_min();
&& prepare_scn_.is_valid_and_not_min() && table_id_ > 0 && ddl_task_id_ > 0;
}
TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(table_key), K_(start_scn), K_(prepare_scn));
TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(table_key), K_(start_scn), K_(prepare_scn), K_(table_id), K_(ddl_task_id));
public:
uint64_t tenant_id_;
share::ObLSID ls_id_;
storage::ObITable::TableKey table_key_;
share::SCN start_scn_;
share::SCN prepare_scn_;
uint64_t table_id_;
int64_t ddl_task_id_;
private:
DISALLOW_COPY_AND_ASSIGN(ObRpcRemoteWriteDDLCommitLogArg);
};

View File

@ -1143,6 +1143,7 @@ int ObComplementMergeTask::process()
MAKE_TENANT_SWITCH_SCOPE_GUARD(guard);
int tmp_ret = OB_SUCCESS;
ObIDag *tmp_dag = get_dag();
ObArray<uint64_t> column_ids;
ObComplementDataDag *dag = nullptr;
ObTabletHandle tablet_handle;
ObTablet *tablet = nullptr;
@ -1161,12 +1162,17 @@ int ObComplementMergeTask::process()
} else if (OB_ISNULL(latest_major_sstable)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected error, major sstable shoud not be null", K(ret), K(*param_));
} else if (OB_FAIL(ObDDLUtil::get_tenant_schema_column_ids(param_->hidden_table_schema_->get_tenant_id(),
param_->hidden_table_schema_->get_table_id(),
column_ids))) {
LOG_WARN("fail to get tenant schema column ids", K(ret), K(*param_));
} else if (OB_FAIL(ObTabletDDLUtil::report_ddl_checksum(param_->ls_id_,
param_->dest_tablet_id_,
param_->hidden_table_schema_->get_table_id(),
1 /* execution_id */,
param_->task_id_,
latest_major_sstable->get_meta().get_col_checksum()))) {
latest_major_sstable->get_meta().get_col_checksum(),
column_ids))) {
LOG_WARN("report ddl column checksum failed", K(ret), K(*param_));
} else if (OB_FAIL(GCTX.ob_service_->submit_tablet_update_task(param_->tenant_id_, param_->ls_id_, param_->dest_tablet_id_))) {
LOG_WARN("fail to submit tablet update task", K(ret), K(*param_));
@ -1194,6 +1200,7 @@ int ObComplementMergeTask::add_build_hidden_table_sstable()
ObTablet *tablet = nullptr;
ObTabletHandle tablet_handle;
ObITable::TableKey hidden_table_key;
ObArray<uint64_t> column_ids;
SCN prepare_scn;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
@ -1218,11 +1225,14 @@ int ObComplementMergeTask::add_build_hidden_table_sstable()
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(ObDDLUtil::get_tenant_schema_column_ids(MTL_ID(), param_->hidden_table_schema_->get_table_id(), column_ids))) {
LOG_WARN("fail to get tenant schema column ids", K(ret), K(param_));
} else if (OB_FAIL(context_->data_sstable_redo_writer_.write_prepare_log(hidden_table_key,
param_->hidden_table_schema_->get_table_id(),
param_->execution_id_,
param_->task_id_,
prepare_scn))) {
prepare_scn,
column_ids))) {
if (OB_TASK_EXPIRED == ret) {
LOG_INFO("ddl task expired", K(ret), K(hidden_table_key), KPC(param_));
} else {
@ -1234,20 +1244,26 @@ int ObComplementMergeTask::add_build_hidden_table_sstable()
const ObLSID &ls_id = param_->ls_id_;
const ObTabletID &tablet_id = tablet->get_tablet_meta().tablet_id_;
const SCN &ddl_start_scn = static_cast<ObComplementDataDag *>(get_dag())->get_context().data_sstable_redo_writer_.get_start_scn();
const uint64_t table_id = param_->hidden_table_schema_->get_table_id();
const int64_t ddl_task_id = param_->task_id_;
if (OB_FAIL(tablet->get_ddl_kv_mgr(ddl_kv_mgr_handle))) {
LOG_WARN("get ddl kv manager failed", K(ret));
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_prepare(ddl_start_scn,
prepare_scn,
param_->hidden_table_schema_->get_table_id(),
param_->task_id_))) {
LOG_WARN("commit ddl log failed", K(ret), K(ls_id), K(tablet_id), K(prepare_scn), K(hidden_table_key),
column_ids,
table_id,
ddl_task_id))) {
LOG_WARN("commit ddl log failed", K(ret), K(ls_id), K(tablet_id), K(prepare_scn), K(hidden_table_key), K(column_ids),
K(ddl_start_scn), "new_ddl_start_scn", ddl_kv_mgr_handle.get_obj()->get_start_scn());
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->wait_ddl_commit(ddl_start_scn, prepare_scn))) {
LOG_WARN("wait ddl commit failed", K(ret), K(ls_id), K(tablet_id), K(hidden_table_key),
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->wait_ddl_commit(ddl_start_scn, prepare_scn, table_id, ddl_task_id, column_ids))) {
LOG_WARN("wait ddl commit failed", K(ret), K(ls_id), K(tablet_id), K(hidden_table_key), K(column_ids),
K(ddl_start_scn), "new_ddl_start_scn", ddl_kv_mgr_handle.get_obj()->get_start_scn());
} else if (OB_FAIL(context_->data_sstable_redo_writer_.write_commit_log(hidden_table_key,
prepare_scn))) {
LOG_WARN("fail write ddl commit log", K(ret), K(hidden_table_key));
prepare_scn,
table_id,
ddl_task_id,
column_ids))) {
LOG_WARN("fail write ddl commit log", K(ret), K(hidden_table_key), K(column_ids));
}
}
return ret;

View File

@ -305,48 +305,67 @@ int ObDDLRedoLog::init(const blocksstable::ObDDLMacroBlockRedoInfo &redo_info)
OB_SERIALIZE_MEMBER(ObDDLRedoLog, redo_info_);
ObDDLPrepareLog::ObDDLPrepareLog()
: table_key_(), start_scn_(SCN::min_scn())
: table_key_(), start_scn_(SCN::min_scn()), table_id_(0), ddl_task_id_(0), column_ids_()
{
}
int ObDDLPrepareLog::init(const ObITable::TableKey &table_key,
const SCN &start_scn)
const SCN &start_scn,
const uint64_t table_id,
const int64_t ddl_task_id,
const ObIArray<uint64_t> &column_ids)
{
int ret = OB_SUCCESS;
if (!table_key.is_valid() || !start_scn.is_valid_and_not_min()) {
if (!table_key.is_valid() || !start_scn.is_valid_and_not_min() || table_id <= 0 || ddl_task_id <= 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(table_key), K(start_scn));
LOG_WARN("invalid argument", K(ret), K(table_key), K(start_scn), K(table_id), K(ddl_task_id));
} else if (OB_FAIL(column_ids_.assign(column_ids))) {
LOG_WARN("columns ids assigned fail.", K(ret), K(column_ids));
} else {
table_key_ = table_key;
start_scn_ = start_scn;
table_id_ = table_id;
ddl_task_id_ = ddl_task_id;
}
return ret;
}
OB_SERIALIZE_MEMBER(ObDDLPrepareLog, table_key_, start_scn_);
OB_SERIALIZE_MEMBER(ObDDLPrepareLog, table_key_, start_scn_, table_id_, ddl_task_id_, column_ids_);
ObDDLCommitLog::ObDDLCommitLog()
: table_key_(), start_scn_(SCN::min_scn()), prepare_scn_(SCN::min_scn())
: table_key_(), start_scn_(SCN::min_scn()), prepare_scn_(SCN::min_scn()),
table_id_(0), ddl_task_id_(0), column_ids_()
{
}
int ObDDLCommitLog::init(const ObITable::TableKey &table_key,
const SCN &start_scn,
const SCN &prepare_scn)
const SCN &prepare_scn,
const uint64_t table_id,
const int64_t ddl_task_id,
const ObIArray<uint64_t> &column_ids)
{
int ret = OB_SUCCESS;
if (!table_key.is_valid() || !start_scn.is_valid_and_not_min() || !prepare_scn.is_valid_and_not_min()) {
if (!table_key.is_valid() ||
!start_scn.is_valid_and_not_min() ||
!prepare_scn.is_valid_and_not_min() ||
table_id <= 0 ||
ddl_task_id <= 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(table_key), K(start_scn), K(prepare_scn));
LOG_WARN("invalid argument", K(ret), K(table_key), K(start_scn), K(prepare_scn), K(table_id), K(ddl_task_id));
} else if (OB_FAIL(column_ids_.assign(column_ids))) {
LOG_WARN("columns ids assigned fail.", K(ret), K(column_ids));
} else {
table_key_ = table_key;
start_scn_ = start_scn;
prepare_scn_ = prepare_scn;
table_id_ = table_id;
ddl_task_id_ = ddl_task_id;
}
return ret;
}
OB_SERIALIZE_MEMBER(ObDDLCommitLog, table_key_, start_scn_, prepare_scn_);
OB_SERIALIZE_MEMBER(ObDDLCommitLog, table_key_, start_scn_, prepare_scn_, table_id_, ddl_task_id_, column_ids_);
ObTabletSchemaVersionChangeLog::ObTabletSchemaVersionChangeLog()
: tablet_id_(), schema_version_(-1)

View File

@ -178,14 +178,24 @@ public:
ObDDLPrepareLog();
~ObDDLPrepareLog() = default;
int init(const ObITable::TableKey &table_key,
const share::SCN &start_scn);
bool is_valid() const { return table_key_.is_valid() && start_scn_.is_valid(); }
const share::SCN &start_scn,
const uint64_t table_id,
const int64_t ddl_task_id,
const ObIArray<uint64_t> &column_ids);
bool is_valid() const { return table_key_.is_valid() && start_scn_.is_valid() &&
table_id_ > 0 && ddl_task_id_ > 0 && column_ids_.count() > 0; }
ObITable::TableKey get_table_key() const { return table_key_; }
share::SCN get_start_scn() const { return start_scn_; }
uint64_t get_table_id() const { return table_id_; }
int64_t get_ddl_task_id() const { return ddl_task_id_; }
const ObSArray<uint64_t> &get_ddl_column_ids() const { return column_ids_; }
TO_STRING_KV(K_(table_key), K_(start_scn));
private:
ObITable::TableKey table_key_;
share::SCN start_scn_;
uint64_t table_id_;
int64_t ddl_task_id_;
ObSArray<uint64_t> column_ids_;
};
class ObDDLCommitLog final
@ -196,16 +206,31 @@ public:
~ObDDLCommitLog() = default;
int init(const ObITable::TableKey &table_key,
const share::SCN &start_scn,
const share::SCN &prepare_scn);
bool is_valid() const { return table_key_.is_valid() && start_scn_.is_valid() && prepare_scn_.is_valid(); }
const share::SCN &prepare_scn,
const uint64_t table_id,
const int64_t ddl_task_id,
const ObIArray<uint64_t> &column_ids);
bool is_valid() const {
return table_key_.is_valid() &&
start_scn_.is_valid() &&
prepare_scn_.is_valid() &&
table_id_ > 0 &&
ddl_task_id_ > 0 &&
column_ids_.count() > 0;}
ObITable::TableKey get_table_key() const { return table_key_; }
share::SCN get_start_scn() const { return start_scn_; }
share::SCN get_prepare_scn() const { return prepare_scn_; }
uint64_t get_table_id() const { return table_id_; }
int64_t get_ddl_task_id() const { return ddl_task_id_; }
const ObSArray<uint64_t> &get_ddl_column_ids() const { return column_ids_; }
TO_STRING_KV(K_(table_key), K_(start_scn), K_(prepare_scn));
private:
ObITable::TableKey table_key_;
share::SCN start_scn_;
share::SCN prepare_scn_;
uint64_t table_id_; // used for report ddl checksum
int64_t ddl_task_id_; // used for report ddl checksum
ObSArray<uint64_t> column_ids_;
};
class ObTabletSchemaVersionChangeLog final

View File

@ -409,7 +409,8 @@ int ObDDLTableMergeTask::process()
merge_param_.table_id_,
merge_param_.execution_id_,
merge_param_.ddl_task_id_,
sstable->get_meta().get_col_checksum()))) {
sstable->get_meta().get_col_checksum(),
merge_param_.column_ids_))) {
LOG_WARN("report ddl column checksum failed", K(ret), K(merge_param_));
} else if (OB_FAIL(GCTX.ob_service_->submit_tablet_update_task(tenant_id, merge_param_.ls_id_, merge_param_.tablet_id_))) {
LOG_WARN("fail to submit tablet update task", K(ret), K(tenant_id), K(merge_param_));
@ -768,11 +769,11 @@ int ObTabletDDLUtil::report_ddl_checksum(const share::ObLSID &ls_id,
const uint64_t table_id,
const int64_t execution_id,
const int64_t ddl_task_id,
const ObIArray<int64_t> &column_checksums)
const ObIArray<int64_t> &column_checksums,
const ObIArray<uint64_t> &column_ids)
{
int ret = OB_SUCCESS;
ObMySQLProxy *sql_proxy = GCTX.sql_proxy_;
ObMultiVersionSchemaService *schema_service = GCTX.schema_service_;
ObSchemaGetterGuard schema_guard;
const ObTableSchema *table_schema = nullptr;
const uint64_t tenant_id = MTL_ID();
@ -780,22 +781,12 @@ int ObTabletDDLUtil::report_ddl_checksum(const share::ObLSID &ls_id,
|| !is_valid_id(table_id) || 0 == table_id || execution_id < 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(ls_id), K(tablet_id), K(table_id), K(execution_id));
} else if (!is_valid_tenant_id(tenant_id) || OB_ISNULL(sql_proxy) || OB_ISNULL(schema_service)) {
} else if (!is_valid_tenant_id(tenant_id) || OB_ISNULL(sql_proxy) || column_ids.count() <= 0) {
ret = OB_ERR_SYS;
LOG_WARN("ls service or sql proxy is null", K(ret), K(tenant_id), KP(sql_proxy), KP(schema_service));
} else if (OB_FAIL(schema_service->get_tenant_schema_guard(tenant_id, schema_guard))) {
LOG_WARN("get tenant schema guard failed", K(ret), K(tenant_id));
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) {
LOG_WARN("get table schema failed", K(ret), K(tenant_id), K(table_id));
} else if (OB_ISNULL(table_schema)) {
ret = OB_TABLE_NOT_EXIST;
LOG_INFO("table not exit", K(ret), K(tenant_id), K(table_id));
LOG_WARN("ls service or sql proxy is null", K(ret), K(tenant_id), KP(sql_proxy), K(column_ids));
} else {
ObArray<ObColDesc> column_ids;
ObArray<ObDDLChecksumItem> ddl_checksum_items;
if (OB_FAIL(table_schema->get_multi_version_column_descs(column_ids))) {
LOG_WARN("fail to get column ids", K(ret), K(ls_id), K(tablet_id));
} else if (OB_UNLIKELY(column_checksums.count() != column_ids.count())) {
if (OB_UNLIKELY(column_checksums.count() != column_ids.count())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect error, column checksums count didn't equal to column ids count", K(ret),
K(ls_id), K(tablet_id), K(column_checksums.count()), K(column_ids.count()));
@ -806,7 +797,7 @@ int ObTabletDDLUtil::report_ddl_checksum(const share::ObLSID &ls_id,
item.tenant_id_ = tenant_id;
item.table_id_ = table_id;
item.ddl_task_id_ = ddl_task_id;
item.column_id_ = column_ids.at(i).col_id_;
item.column_id_ = column_ids.at(i);
item.task_id_ = tablet_id.id();
item.checksum_ = column_checksums.at(i);
#ifdef ERRSIM

View File

@ -47,14 +47,16 @@ public:
start_scn_(share::SCN::min_scn()),
table_id_(0),
execution_id_(-1),
ddl_task_id_(0)
ddl_task_id_(0),
column_ids_()
{ }
bool is_valid() const
{
return ls_id_.is_valid() && tablet_id_.is_valid() && start_scn_.is_valid_and_not_min();
return ls_id_.is_valid() && tablet_id_.is_valid() && start_scn_.is_valid_and_not_min() && column_ids_.count() > 0;
}
virtual ~ObDDLTableMergeDagParam() = default;
TO_STRING_KV(K_(ls_id), K_(tablet_id), K_(rec_scn), K_(is_commit), K_(start_scn), K_(table_id), K_(execution_id), K_(ddl_task_id));
TO_STRING_KV(K_(ls_id), K_(tablet_id), K_(rec_scn), K_(is_commit), K_(start_scn),
K_(table_id), K_(execution_id), K_(ddl_task_id), K_(column_ids));
public:
share::ObLSID ls_id_;
ObTabletID tablet_id_;
@ -64,6 +66,7 @@ public:
uint64_t table_id_; // used for report ddl checksum
int64_t execution_id_; // used for report ddl checksum
int64_t ddl_task_id_; // used for report ddl checksum
ObArray<uint64_t> column_ids_;
};
class ObDDLTableMergeDag : public share::ObIDag
@ -172,7 +175,8 @@ public:
const uint64_t table_id,
const int64_t execution_id,
const int64_t ddl_task_id,
const ObIArray<int64_t> &column_checksums);
const ObIArray<int64_t> &column_checksums,
const ObIArray<uint64_t> &column_ids);
static int check_and_get_major_sstable(const share::ObLSID &ls_id,
const ObTabletID &tablet_id,
const blocksstable::ObSSTable *&latest_major_sstable);

View File

@ -181,7 +181,11 @@ int ObDDLRedoLogReplayer::replay_prepare(const ObDDLPrepareLog &log, const SCN &
LOG_WARN("need replay but tablet handle is invalid", K(ret), K(need_replay), K(tablet_handle), K(log), K(scn));
} else if (OB_FAIL(tablet_handle.get_obj()->get_ddl_kv_mgr(ddl_kv_mgr_handle))) {
LOG_WARN("get ddl kv mgr failed", K(ret), K(log), K(scn));
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_prepare(log.get_start_scn(), scn))) {
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_prepare(log.get_start_scn(),
scn,
log.get_ddl_column_ids(),
log.get_table_id(),
log.get_ddl_task_id()))) {
LOG_WARN("replay ddl prepare log failed", K(ret), K(log), K(scn));
} else {
LOG_INFO("replay ddl prepare log success", K(ret), K(log), K(scn));
@ -219,7 +223,12 @@ int ObDDLRedoLogReplayer::replay_commit(const ObDDLCommitLog &log, const SCN &sc
LOG_WARN("need replay but tablet handle is invalid", K(ret), K(need_replay), K(tablet_handle), K(log), K(scn));
} else if (OB_FAIL(tablet_handle.get_obj()->get_ddl_kv_mgr(ddl_kv_mgr_handle))) {
LOG_WARN("get ddl kv mgr failed", K(ret), K(log), K(scn));
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_commit(log.get_start_scn(), log.get_prepare_scn(), true/*is_replay*/))) {
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_commit(log.get_start_scn(),
log.get_prepare_scn(),
true/*is_replay*/,
log.get_table_id(),
log.get_ddl_task_id(),
log.get_ddl_column_ids()))) {
if (OB_EAGAIN != ret) {
LOG_WARN("replay ddl commit log failed", K(ret), K(log), K(scn));
}

View File

@ -1061,7 +1061,8 @@ int ObDDLSSTableRedoWriter::write_prepare_log(const ObITable::TableKey &table_ke
const int64_t table_id,
const int64_t execution_id,
const int64_t ddl_task_id,
SCN &prepare_scn)
SCN &prepare_scn,
const ObIArray<uint64_t> &column_ids)
{
int ret = OB_SUCCESS;
@ -1081,10 +1082,10 @@ int ObDDLSSTableRedoWriter::write_prepare_log(const ObITable::TableKey &table_ke
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("ObDDLSSTableRedoWriter has not been inited", K(ret));
} else if (OB_UNLIKELY(!table_key.is_valid() || !start_scn_.is_valid_and_not_min())) {
} else if (OB_UNLIKELY(!table_key.is_valid() || !start_scn_.is_valid_and_not_min() || column_ids.count() < 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), K(table_key), K(start_scn_));
} else if (OB_FAIL(log.init(table_key, get_start_scn()))) {
LOG_WARN("invalid arguments", K(ret), K(table_key), K(start_scn_), K(column_ids));
} else if (OB_FAIL(log.init(table_key, get_start_scn(), table_id, ddl_task_id, column_ids))) {
LOG_WARN("fail to init DDLCommitLog", K(ret), K(table_key), K(start_scn_));
} else if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, ls_handle, ObLSGetMod::DDL_MOD))) {
LOG_WARN("get ls failed", K(ret), K(ls_id_));
@ -1125,7 +1126,10 @@ int ObDDLSSTableRedoWriter::write_prepare_log(const ObITable::TableKey &table_ke
}
int ObDDLSSTableRedoWriter::write_commit_log(const ObITable::TableKey &table_key,
const SCN &prepare_scn)
const SCN &prepare_scn,
const uint64_t table_id,
const int64_t ddl_task_id,
const ObIArray<uint64_t> &column_ids)
{
int ret = OB_SUCCESS;
ObLSHandle ls_handle;
@ -1135,10 +1139,11 @@ int ObDDLSSTableRedoWriter::write_commit_log(const ObITable::TableKey &table_key
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("ObDDLSSTableRedoWriter has not been inited", K(ret));
} else if (OB_UNLIKELY(!table_key.is_valid() || !start_scn_.is_valid_and_not_min() || !prepare_scn.is_valid_and_not_min())) {
} else if (OB_UNLIKELY(!table_key.is_valid() || !start_scn_.is_valid_and_not_min() ||
!prepare_scn.is_valid_and_not_min() || column_ids.count() < 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), K(table_key), K(start_scn_), K(prepare_scn));
} else if (OB_FAIL(log.init(table_key, get_start_scn(), prepare_scn))) {
LOG_WARN("invalid arguments", K(ret), K(table_key), K(start_scn_), K(prepare_scn), K(column_ids));
} else if (OB_FAIL(log.init(table_key, get_start_scn(), prepare_scn, table_id, ddl_task_id, column_ids))) {
LOG_WARN("fail to init DDLCommitLog", K(ret), K(table_key), K(start_scn_), K(prepare_scn));
} else if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, ls_handle, ObLSGetMod::DDL_MOD))) {
LOG_WARN("get ls failed", K(ret), K(ls_id_));
@ -1162,7 +1167,7 @@ int ObDDLSSTableRedoWriter::write_commit_log(const ObITable::TableKey &table_key
ObSrvRpcProxy *srv_rpc_proxy = GCTX.srv_rpc_proxy_;
obrpc::ObRpcRemoteWriteDDLCommitLogArg arg;
obrpc::Int64 log_ns;
if (OB_FAIL(arg.init(MTL_ID(), leader_ls_id_, table_key, get_start_scn(), prepare_scn))) {
if (OB_FAIL(arg.init(MTL_ID(), leader_ls_id_, table_key, get_start_scn(), prepare_scn, table_id, ddl_task_id))) {
LOG_WARN("fail to init ObRpcRemoteWriteDDLCommitLogArg", K(ret));
} else if (OB_ISNULL(srv_rpc_proxy)) {
ret = OB_ERR_SYS;

View File

@ -256,9 +256,13 @@ public:
const int64_t table_id,
const int64_t execution_id,
const int64_t ddl_task_id,
share::SCN &prepare_scn);
share::SCN &prepare_scn,
const ObIArray<uint64_t> &column_ids);
int write_commit_log(const ObITable::TableKey &table_key,
const share::SCN &prepare_scn);
const share::SCN &prepare_scn,
const uint64_t table_id,
const int64_t ddl_task_id,
const ObIArray<uint64_t> &column_ids);
OB_INLINE void set_start_scn(const share::SCN &start_scn) { start_scn_.atomic_set(start_scn); }
OB_INLINE share::SCN get_start_scn() const { return start_scn_.atomic_get(); }
private:

View File

@ -655,6 +655,7 @@ int ObSSTableInsertTabletContext::create_sstable_with_clog(
share::schema::ObMultiVersionSchemaService *schema_service = nullptr;
const share::schema::ObTableSchema *table_schema = nullptr;
const uint64_t tenant_id = MTL_ID();
ObArray<uint64_t> column_ids;
SCN prepare_scn;
ObSchemaGetterGuard schema_guard;
if (OB_UNLIKELY(!table_key.is_valid())) {
@ -676,11 +677,14 @@ int ObSSTableInsertTabletContext::create_sstable_with_clog(
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(ObDDLUtil::get_tenant_schema_column_ids(tenant_id, table_schema->get_table_id(), column_ids))) {
LOG_WARN("fail to get tenant schema column ids", K(ret), K(tenant_id), K(build_param_));
} else if (OB_FAIL(data_sstable_redo_writer_.write_prepare_log(table_key,
table_schema->get_table_id(),
build_param_.execution_id_,
build_param_.ddl_task_id_,
prepare_scn))) {
prepare_scn,
column_ids))) {
if (OB_TASK_EXPIRED == ret) {
LOG_INFO("ddl task expired", K(ret), K(table_key), K(build_param_));
} else {
@ -694,30 +698,38 @@ int ObSSTableInsertTabletContext::create_sstable_with_clog(
const ObLSID &ls_id = ls->get_ls_id();
const ObTabletID &tablet_id = tablet->get_tablet_meta().tablet_id_;
const SCN &ddl_start_scn = data_sstable_redo_writer_.get_start_scn();
const uint64_t table_id = table_schema->get_table_id();
const int64_t ddl_task_id = build_param_.ddl_task_id_;
if (OB_FAIL(ObDDLUtil::ddl_get_tablet(ls_handle_, tablet_id, tablet_handle))) {
LOG_WARN("get tablet failed", K(ret));
} else if (OB_FAIL(tablet_handle.get_obj()->get_ddl_kv_mgr(ddl_kv_mgr_handle))) {
LOG_WARN("get ddl kv manager failed", K(ret), K(ls_id), K(tablet_id));
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_prepare(ddl_start_scn,
prepare_scn,
table_schema->get_table_id(),
build_param_.ddl_task_id_))) {
column_ids,
table_id,
ddl_task_id))) {
if (OB_TASK_EXPIRED == ret) {
LOG_INFO("ddl task expired", K(ret), K(ls_id), K(tablet_id),
LOG_INFO("ddl task expired", K(ret), K(ls_id), K(tablet_id), K(column_ids),
K(ddl_start_scn), "new_ddl_start_scn", ddl_kv_mgr_handle.get_obj()->get_start_scn());
} else {
LOG_WARN("failed to do ddl kv prepare", K(ret), K(ddl_start_scn), K(prepare_scn), K(build_param_));
LOG_WARN("failed to do ddl kv prepare", K(ret), K(ddl_start_scn), K(prepare_scn), K(build_param_), K(column_ids));
}
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->wait_ddl_commit(ddl_start_scn, prepare_scn))) {
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->wait_ddl_commit(ddl_start_scn, prepare_scn, table_id, ddl_task_id, column_ids))) {
if (OB_TASK_EXPIRED == ret) {
LOG_INFO("ddl task expired, but return success", K(ret), K(ls_id), K(tablet_id),
K(ddl_start_scn), "new_ddl_start_scn", ddl_kv_mgr_handle.get_obj()->get_start_scn());
K(ddl_start_scn), "new_ddl_start_scn",
ddl_kv_mgr_handle.get_obj()->get_start_scn(), K(build_param_),
K(table_id), K(ddl_task_id), K(column_ids));
} else {
LOG_WARN("failed to wait ddl kv commit", K(ret), K(ddl_start_scn), K(build_param_));
LOG_WARN("failed to wait ddl kv commit", K(ret), K(ddl_start_scn), K(build_param_), K(column_ids));
}
} else if (OB_FAIL(data_sstable_redo_writer_.write_commit_log(table_key,
prepare_scn))) {
LOG_WARN("fail write ddl commit log", K(ret), K(table_key));
prepare_scn,
table_id,
ddl_task_id,
column_ids))) {
LOG_WARN("fail write ddl commit log", K(ret), K(table_key), K(table_id), K(ddl_task_id), K(column_ids));
}
}
return ret;

View File

@ -148,10 +148,12 @@ int ObTabletDDLKvMgr::ddl_start(const ObITable::TableKey &table_key,
return ret;
}
int ObTabletDDLKvMgr::ddl_prepare(const SCN &start_scn,
const SCN &prepare_scn,
const uint64_t table_id,
const int64_t ddl_task_id)
int ObTabletDDLKvMgr::ddl_prepare(
const SCN &start_scn,
const SCN &prepare_scn,
const ObIArray<uint64_t> &column_ids,
const uint64_t table_id,
const int64_t ddl_task_id)
{
int ret = OB_SUCCESS;
ObDDLKVHandle kv_handle;
@ -180,6 +182,9 @@ int ObTabletDDLKvMgr::ddl_prepare(const SCN &start_scn,
param.execution_id_ = execution_id_;
param.ddl_task_id_ = ddl_task_id_;
const int64_t start_ts = ObTimeUtility::fast_current_time();
if (OB_FAIL(param.column_ids_.assign(column_ids))) {
LOG_WARN("fail to assign columns ids", K(ret), K(column_ids));
}
while (OB_SUCC(ret) && is_started()) {
if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_ddl_table_merge_dag(param))) {
if (OB_SIZE_OVERFLOW != ret && OB_EAGAIN != ret) {
@ -202,7 +207,13 @@ int ObTabletDDLKvMgr::ddl_prepare(const SCN &start_scn,
return ret;
}
int ObTabletDDLKvMgr::ddl_commit(const SCN &start_scn, const SCN &prepare_scn, const bool is_replay)
int ObTabletDDLKvMgr::ddl_commit(
const SCN &start_scn,
const SCN &prepare_scn,
const bool is_replay,
const uint64_t table_id,
const int64_t ddl_task_id,
const ObIArray<uint64_t> &column_ids)
{
int ret = OB_SUCCESS;
ObLSHandle ls_handle;
@ -217,6 +228,9 @@ int ObTabletDDLKvMgr::ddl_commit(const SCN &start_scn, const SCN &prepare_scn, c
} else if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, ls_handle, ObLSGetMod::DDL_MOD))) {
LOG_WARN("failed to get log stream", K(ret), K(ls_id_));
} else {
table_id_ = table_id;
ddl_task_id_ = ddl_task_id;
ObDDLTableMergeDagParam param;
param.ls_id_ = ls_id_;
param.tablet_id_ = tablet_id_;
@ -226,8 +240,9 @@ int ObTabletDDLKvMgr::ddl_commit(const SCN &start_scn, const SCN &prepare_scn, c
param.table_id_ = table_id_;
param.execution_id_ = execution_id_;
param.ddl_task_id_ = ddl_task_id_;
// retry submit dag in case of the previous dag failed
if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_ddl_table_merge_dag(param))) {
if (OB_FAIL(param.column_ids_.assign(column_ids))) {
LOG_WARN("fail to assign columns ids", K(ret), K(column_ids));
} else if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_ddl_table_merge_dag(param))) { // retry submit dag in case of the previous dag failed
if (OB_SIZE_OVERFLOW == ret || OB_EAGAIN == ret) {
ret = OB_EAGAIN;
} else {
@ -250,15 +265,21 @@ int ObTabletDDLKvMgr::ddl_commit(const SCN &start_scn, const SCN &prepare_scn, c
return ret;
}
int ObTabletDDLKvMgr::wait_ddl_commit(const SCN &start_scn, const SCN &prepare_scn)
int ObTabletDDLKvMgr::wait_ddl_commit(
const SCN &start_scn,
const SCN &prepare_scn,
const uint64_t table_id,
const int64_t ddl_task_id,
const ObIArray<uint64_t> &column_ids)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret), K(is_inited_));
} else if (OB_UNLIKELY(!start_scn.is_valid_and_not_min() || !prepare_scn.is_valid_and_not_min())) {
} else if (OB_UNLIKELY(!start_scn.is_valid_and_not_min() || !prepare_scn.is_valid_and_not_min() ||
table_id <= 0 || ddl_task_id <= 0 || column_ids.count() <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(start_scn), K(prepare_scn));
LOG_WARN("invalid argument", K(ret), K(start_scn), K(prepare_scn), K(table_id), K(ddl_task_id), K(column_ids));
} else if (!is_started()) {
ret = OB_STATE_NOT_MATCH;
LOG_WARN("ddl not started", K(ret));
@ -268,12 +289,12 @@ int ObTabletDDLKvMgr::wait_ddl_commit(const SCN &start_scn, const SCN &prepare_s
} else {
const int64_t wait_start_ts = ObTimeUtility::fast_current_time();
while (OB_SUCC(ret)) {
if (OB_FAIL(ddl_commit(start_scn, prepare_scn, false/*is_replay*/))) {
if (OB_FAIL(ddl_commit(start_scn, prepare_scn, false/*is_replay*/, table_id, ddl_task_id, column_ids))) {
if (OB_EAGAIN == ret) {
ob_usleep(10L * 1000L);
ret = OB_SUCCESS; // retry
} else {
LOG_WARN("commit ddl log failed", K(ret), K(start_scn), K(prepare_scn), K(ls_id_), K(tablet_id_));
LOG_WARN("commit ddl log failed", K(ret), K(start_scn), K(prepare_scn), K(ls_id_), K(tablet_id_), K(column_ids));
}
} else {
break;

View File

@ -39,9 +39,9 @@ public:
~ObTabletDDLKvMgr();
int init(const share::ObLSID &ls_id, const common::ObTabletID &tablet_id); // init before memtable mgr
int ddl_start(const ObITable::TableKey &table_key, const share::SCN &start_scn, const int64_t cluster_version, const int64_t execution_id, const share::SCN &checkpoint_scn);
int ddl_prepare(const share::SCN &start_scn, const share::SCN &commit_scn, const uint64_t table_id = 0, const int64_t ddl_task_id = 0); // schedule build a major sstable
int ddl_commit(const share::SCN &start_scn, const share::SCN &prepare_scn, const bool is_replay); // try wait build major sstable
int wait_ddl_commit(const share::SCN &start_scn, const share::SCN &prepare_scn);
int ddl_prepare(const share::SCN &start_scn, const share::SCN &commit_scn, const ObIArray<uint64_t> &column_ids, const uint64_t table_id = 0, const int64_t ddl_task_id = 0); // schedule build a major sstable
int ddl_commit(const share::SCN &start_scn, const share::SCN &prepare_scn, const bool is_replay, const uint64_t table_id, const int64_t ddl_task_id, const ObIArray<uint64_t> &column_ids); // try wait build major sstable
int wait_ddl_commit(const share::SCN &start_scn, const share::SCN &prepare_scn, const uint64_t table_id, const int64_t ddl_task_id, const ObIArray<uint64_t> &column_ids);
int get_ddl_param(ObTabletDDLParam &ddl_param);
int get_or_create_ddl_kv(const share::SCN &scn, ObDDLKVHandle &kv_handle); // used in active ddl kv guard
int get_freezed_ddl_kv(const share::SCN &freeze_scn, ObDDLKVHandle &kv_handle); // locate ddl kv with exeact freeze log ts