|
|
|
|
@ -180,8 +180,11 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64
|
|
|
|
|
}
|
|
|
|
|
needAutoID := common.TableHasAutoRowID(m.tr.tableInfo.Core) || m.tr.tableInfo.Core.GetAutoIncrementColInfo() != nil || m.tr.tableInfo.Core.ContainsAutoRandomBits()
|
|
|
|
|
err = exec.Transact(ctx, "init table allocator base", func(ctx context.Context, tx *sql.Tx) error {
|
|
|
|
|
query := fmt.Sprintf("SELECT task_id, row_id_base, row_id_max, total_kvs_base, total_bytes_base, checksum_base, status from %s WHERE table_id = ? FOR UPDATE", m.tableName) // nolint:gosec
|
|
|
|
|
rows, err := tx.QueryContext(ctx, query, m.tr.tableInfo.ID)
|
|
|
|
|
rows, err := tx.QueryContext(
|
|
|
|
|
ctx,
|
|
|
|
|
fmt.Sprintf("SELECT task_id, row_id_base, row_id_max, total_kvs_base, total_bytes_base, checksum_base, status from %s WHERE table_id = ? FOR UPDATE", m.tableName),
|
|
|
|
|
m.tr.tableInfo.ID,
|
|
|
|
|
)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Trace(err)
|
|
|
|
|
}
|
|
|
|
|
@ -282,7 +285,7 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64
|
|
|
|
|
if needAutoID && newRowIDBase == 0 && newStatus < metaStatusRestoreStarted {
|
|
|
|
|
newStatus = metaStatusRestoreStarted
|
|
|
|
|
}
|
|
|
|
|
query = fmt.Sprintf("update %s set row_id_base = ?, row_id_max = ?, status = ? where table_id = ? and task_id = ?", m.tableName)
|
|
|
|
|
query := fmt.Sprintf("update %s set row_id_base = ?, row_id_max = ?, status = ? where table_id = ? and task_id = ?", m.tableName)
|
|
|
|
|
_, err := tx.ExecContext(ctx, query, newRowIDBase, newRowIDMax, newStatus.String(), m.tr.tableInfo.ID, m.taskID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Trace(err)
|
|
|
|
|
@ -381,9 +384,11 @@ func (m *dbTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, checks
|
|
|
|
|
needChecksum = true
|
|
|
|
|
needRemoteDupe = true
|
|
|
|
|
err = exec.Transact(ctx, "checksum pre-check", func(ctx context.Context, tx *sql.Tx) error {
|
|
|
|
|
// nolint:gosec
|
|
|
|
|
query := fmt.Sprintf("SELECT task_id, total_kvs_base, total_bytes_base, checksum_base, total_kvs, total_bytes, checksum, status, has_duplicates from %s WHERE table_id = ? FOR UPDATE", m.tableName)
|
|
|
|
|
rows, err := tx.QueryContext(ctx, query, m.tr.tableInfo.ID)
|
|
|
|
|
rows, err := tx.QueryContext(
|
|
|
|
|
ctx,
|
|
|
|
|
fmt.Sprintf("SELECT task_id, total_kvs_base, total_bytes_base, checksum_base, total_kvs, total_bytes, checksum, status, has_duplicates from %s WHERE table_id = ? FOR UPDATE", m.tableName),
|
|
|
|
|
m.tr.tableInfo.ID,
|
|
|
|
|
)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Annotate(err, "fetch task meta failed")
|
|
|
|
|
}
|
|
|
|
|
@ -449,7 +454,7 @@ func (m *dbTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, checks
|
|
|
|
|
return errors.Trace(rows.Err())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
query = fmt.Sprintf("update %s set total_kvs = ?, total_bytes = ?, checksum = ?, status = ?, has_duplicates = ? where table_id = ? and task_id = ?", m.tableName)
|
|
|
|
|
query := fmt.Sprintf("update %s set total_kvs = ?, total_bytes = ?, checksum = ?, status = ?, has_duplicates = ? where table_id = ? and task_id = ?", m.tableName)
|
|
|
|
|
_, err = tx.ExecContext(ctx, query, checksum.SumKVS(), checksum.SumSize(), checksum.Sum(), newStatus.String(), hasLocalDupes, m.tr.tableInfo.ID, m.taskID)
|
|
|
|
|
return errors.Annotate(err, "update local checksum failed")
|
|
|
|
|
})
|
|
|
|
|
@ -594,8 +599,10 @@ func (m *dbTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) {
|
|
|
|
|
// avoid override existing metadata if the meta is already inserted.
|
|
|
|
|
exist := false
|
|
|
|
|
err := exec.Transact(ctx, "check whether this task has started before", func(ctx context.Context, tx *sql.Tx) error {
|
|
|
|
|
query := fmt.Sprintf("SELECT task_id from %s WHERE task_id = %d", m.tableName, m.taskID) // nolint:gosec
|
|
|
|
|
rows, err := tx.QueryContext(ctx, query)
|
|
|
|
|
rows, err := tx.QueryContext(ctx,
|
|
|
|
|
fmt.Sprintf("SELECT task_id from %s WHERE task_id = ?", m.tableName),
|
|
|
|
|
m.taskID,
|
|
|
|
|
)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Annotate(err, "fetch task meta failed")
|
|
|
|
|
}
|
|
|
|
|
@ -636,8 +643,10 @@ func (m *dbTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(t
|
|
|
|
|
return errors.Annotate(err, "enable pessimistic transaction failed")
|
|
|
|
|
}
|
|
|
|
|
return exec.Transact(ctx, "check tasks exclusively", func(ctx context.Context, tx *sql.Tx) error {
|
|
|
|
|
query := fmt.Sprintf("SELECT task_id, pd_cfgs, status, state, source_bytes, cluster_avail from %s FOR UPDATE", m.tableName) // nolint:gosec
|
|
|
|
|
rows, err := tx.QueryContext(ctx, query)
|
|
|
|
|
rows, err := tx.QueryContext(
|
|
|
|
|
ctx,
|
|
|
|
|
fmt.Sprintf("SELECT task_id, pd_cfgs, status, state, source_bytes, cluster_avail from %s FOR UPDATE", m.tableName),
|
|
|
|
|
)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Annotate(err, "fetch task metas failed")
|
|
|
|
|
}
|
|
|
|
|
@ -696,8 +705,10 @@ func (m *dbTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.U
|
|
|
|
|
paused := false
|
|
|
|
|
var pausedCfg storedCfgs
|
|
|
|
|
err = exec.Transact(ctx, "check and pause schedulers", func(ctx context.Context, tx *sql.Tx) error {
|
|
|
|
|
query := fmt.Sprintf("SELECT task_id, pd_cfgs, status, state from %s FOR UPDATE", m.tableName) // nolint:gosec
|
|
|
|
|
rows, err := tx.QueryContext(ctx, query)
|
|
|
|
|
rows, err := tx.QueryContext(
|
|
|
|
|
ctx,
|
|
|
|
|
fmt.Sprintf("SELECT task_id, pd_cfgs, status, state from %s FOR UPDATE", m.tableName),
|
|
|
|
|
)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Annotate(err, "fetch task meta failed")
|
|
|
|
|
}
|
|
|
|
|
@ -770,7 +781,7 @@ func (m *dbTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.U
|
|
|
|
|
return errors.Trace(err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
query = fmt.Sprintf("update %s set pd_cfgs = ?, status = ? where task_id = ?", m.tableName)
|
|
|
|
|
query := fmt.Sprintf("update %s set pd_cfgs = ?, status = ? where task_id = ?", m.tableName)
|
|
|
|
|
_, err = tx.ExecContext(ctx, query, string(jsonByts), taskMetaStatusScheduleSet.String(), m.taskID)
|
|
|
|
|
|
|
|
|
|
return errors.Annotate(err, "update task pd configs failed")
|
|
|
|
|
@ -822,8 +833,7 @@ func (m *dbTaskMetaMgr) CheckAndFinishRestore(ctx context.Context, finished bool
|
|
|
|
|
switchBack := true
|
|
|
|
|
allFinished := finished
|
|
|
|
|
err = exec.Transact(ctx, "check and finish schedulers", func(ctx context.Context, tx *sql.Tx) error {
|
|
|
|
|
query := fmt.Sprintf("SELECT task_id, status, state from %s FOR UPDATE", m.tableName) // nolint:gosec
|
|
|
|
|
rows, err := tx.QueryContext(ctx, query)
|
|
|
|
|
rows, err := tx.QueryContext(ctx, fmt.Sprintf("SELECT task_id, status, state from %s FOR UPDATE", m.tableName))
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Annotate(err, "fetch task meta failed")
|
|
|
|
|
}
|
|
|
|
|
@ -883,7 +893,7 @@ func (m *dbTaskMetaMgr) CheckAndFinishRestore(ctx context.Context, finished bool
|
|
|
|
|
newStatus = taskMetaStatusSwitchSkipped
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
query = fmt.Sprintf("update %s set status = ?, state = ? where task_id = ?", m.tableName)
|
|
|
|
|
query := fmt.Sprintf("update %s set status = ?, state = ? where task_id = ?", m.tableName)
|
|
|
|
|
if _, err = tx.ExecContext(ctx, query, newStatus.String(), newState, m.taskID); err != nil {
|
|
|
|
|
return errors.Trace(err)
|
|
|
|
|
}
|
|
|
|
|
|