DEV: Adds a basic importer for the IntermediateDB

* It only imports users and emails so far
* It stores mapped IDs and usernames in a SQLite DB. In the future, we might want to copy those into the Discourse DB at the end of a migration.
* The importer is split into steps, which can mostly be configured with a simple DSL (see the sketch after this list)
* Data that needs to be shared between steps can be stored in an instance of the `SharedData` class
* Steps are automatically sorted via their defined dependencies before they are executed
* Common logic for finding unique names (username, group name) is extracted into a helper class
* Where possible, steps avoid loading already imported data (via the `mapped.ids` table)
* Steps select the `discourse_id` instead of the `original_id` of mapped IDs directly in SQL
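
For illustration, a minimal step written against this DSL could look like the following sketch. It is not part of this commit: the `badges` table, its columns, and a `MappingType::BADGES` constant are assumptions; `Steps::UserEmails` further down shows a real step.

# frozen_string_literal: true

module Migrations::Importer::Steps
  class Badges < ::Migrations::Importer::CopyStep
    depends_on :users
    store_mapped_ids true # assumes MappingType::BADGES has been defined

    table_name :badges # optional; defaults to the demodulized, underscored class name
    column_names %i[id name created_at updated_at]

    total_rows_query <<~SQL
      SELECT COUNT(*) FROM badges
    SQL

    rows_query <<~SQL
      SELECT * FROM badges ORDER BY ROWID
    SQL

    private

    # Returning nil skips the row; `super` assigns a new ID and timestamps.
    def transform_row(row)
      return nil if row[:name].blank?
      super
    end
  end
end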
Gerhard Schlager 2025-04-07 17:06:20 +02:00 committed by Gerhard Schlager
parent 7c6b116dfd
commit 251cac39af
20 changed files with 744 additions and 4 deletions

View File

@@ -0,0 +1,2 @@
intermediate_db: /shared/import/intermediate.db
mappings_db: /shared/import/mappings.db

View File

@@ -19,6 +19,11 @@ en:
default_step_title: "Converting %{type}"
max_progress_calculation: "Calculating items took %{duration}"
importer:
default_step_title: "Importing %{type}"
loading_required_data: "Loading required data..."
done: "Done. Total runtime: %{runtime}"
schema:
validator:
include_exclude_not_allowed: "Cannot use `include` and `exclude` together at %{path}"

View File

@@ -0,0 +1,7 @@
CREATE TABLE ids
(
original_id NUMERIC NOT NULL,
type INTEGER NOT NULL,
discourse_id NUMERIC NOT NULL,
PRIMARY KEY (original_id, type)
);

View File

@@ -0,0 +1,9 @@
CREATE TABLE usernames
(
discourse_user_id NUMERIC NOT NULL,
original_username TEXT NOT NULL,
discourse_username TEXT NOT NULL,
PRIMARY KEY (discourse_user_id)
);
CREATE INDEX usernames_original_username ON usernames (original_username);

View File

@@ -9,10 +9,9 @@ module Migrations::CLI
end
def execute
::Migrations.load_rails_environment
::Migrations.load_rails_environment(quiet: true)
puts "Importing into Discourse #{Discourse::VERSION::STRING}"
puts "Extralite SQLite version: #{Extralite.sqlite3_version}"
::Migrations::Importer.execute
end
end
end

View File

@@ -8,6 +8,7 @@ require "oj"
module Migrations
module Database
INTERMEDIATE_DB_SCHEMA_PATH = File.join(::Migrations.root_path, "db", "intermediate_db_schema")
MAPPINGS_DB_SCHEMA_PATH = File.join(::Migrations.root_path, "db", "mappings_db_schema")
UPLOADS_DB_SCHEMA_PATH = File.join(::Migrations.root_path, "db", "uploads_db_schema")
def self.migrate(db_path, migrations_path:)

View File

@@ -0,0 +1,13 @@
# frozen_string_literal: true
module Migrations
module Importer
def self.execute
config_path = File.join(::Migrations.root_path, "config", "importer.yml")
config = YAML.load_file(config_path, symbolize_names: true)
executor = Executor.new(config)
executor.start
end
end
end

View File

@@ -0,0 +1,166 @@
# frozen_string_literal: true
module Migrations::Importer
class CopyStep < Step
MappingType = ::Migrations::Importer::MappingType
NOW = "NOW()"
SYSTEM_USER_ID = Discourse::SYSTEM_USER_ID
INSERT_MAPPED_IDS_SQL = <<~SQL
INSERT INTO mapped.ids (original_id, type, discourse_id)
VALUES (?, ?, ?)
SQL
class << self
# stree-ignore
def table_name(value = (getter = true; nil))
return @table_name if getter
@table_name = value
end
# stree-ignore
def column_names(value = (getter = true; nil))
return @column_names if getter
@column_names = value
end
def timestamp_columns?
@timestamp_columns ||=
@column_names&.include?(:created_at) || @column_names&.include?(:updated_at)
end
def store_mapped_ids(value)
@store_mapped_ids = value
end
def store_mapped_ids?
!!@store_mapped_ids
end
# stree-ignore
def total_rows_query(query = (getter = true; nil), *parameters)
return [@total_rows_query, @total_rows_query_parameters] if getter
@total_rows_query = query
@total_rows_query_parameters = parameters
nil
end
# stree-ignore
def rows_query(query = (getter = true; nil), *parameters)
return [@rows_query, @rows_query_parameters] if getter
@rows_query = query
@rows_query_parameters = parameters
nil
end
end
def initialize(intermediate_db, discourse_db, shared_data)
super
@last_id = 0
@mapping_type = nil
end
def execute
super
with_progressbar(total_count) { copy_data }
nil
end
private
def copy_data
table_name = self.class.table_name || self.class.name&.demodulize&.underscore
column_names = self.class.column_names || @discourse_db.column_names(table_name)
skipped_rows = []
if self.class.store_mapped_ids?
@last_id = @discourse_db.last_id_of(table_name)
@mapping_type = find_mapping_type(table_name)
end
@discourse_db.copy_data(table_name, column_names, fetch_rows(skipped_rows)) do |inserted_rows|
after_commit_of_inserted_rows(inserted_rows)
if skipped_rows.any?
after_commit_of_skipped_rows(skipped_rows)
skipped_rows.clear
end
end
@discourse_db.fix_last_id_of(table_name) if self.class.store_mapped_ids?
@intermediate_db.commit_transaction
end
def fetch_rows(skipped_rows)
Enumerator.new do |enumerator|
query, parameters = self.class.rows_query
@intermediate_db.query(query, *parameters) do |row|
if (transformed_row = transform_row(row))
enumerator << transformed_row
@stats.reset
else
skipped_rows << row
@stats.reset(skip_count: 1)
end
update_progressbar
end
end
end
def after_commit_of_inserted_rows(rows)
return unless self.class.store_mapped_ids?
rows.each do |row|
@intermediate_db.insert(INSERT_MAPPED_IDS_SQL, [row[:original_id], @mapping_type, row[:id]])
end
nil
end
def after_commit_of_skipped_rows(rows)
return unless self.class.store_mapped_ids?
rows.each do |row|
if row[:id] && row[:original_id]
@intermediate_db.insert(
INSERT_MAPPED_IDS_SQL,
[row[:original_id], @mapping_type, row[:id]],
)
end
end
nil
end
def transform_row(row)
row[:id] = (@last_id += 1) if self.class.store_mapped_ids? && row[:id].nil?
if self.class.timestamp_columns?
row[:created_at] ||= NOW
row[:updated_at] = row[:created_at]
end
row
end
def find_mapping_type(table_name)
constant_name = table_name.to_s.upcase
if MappingType.const_defined?(constant_name)
MappingType.const_get(constant_name)
else
raise "MappingType::#{constant_name} is not defined"
end
end
def total_count
query, parameters = self.class.total_rows_query
@intermediate_db.count(query, *parameters)
end
end
end

View File

@@ -0,0 +1,115 @@
# frozen_string_literal: true
module Migrations::Importer
class DiscourseDB
COPY_BATCH_SIZE = 1_000
def initialize
@encoder = PG::TextEncoder::CopyRow.new
@connection = PG::Connection.new(database_configuration)
@connection.type_map_for_results = PG::BasicTypeMapForResults.new(@connection)
end
def copy_data(table_name, column_names, rows)
quoted_column_name_list = column_names.map { |c| quote_identifier(c) }.join(",")
sql = "COPY #{table_name} (#{quoted_column_name_list}) FROM STDIN"
rows.each_slice(COPY_BATCH_SIZE) do |sliced_rows|
# TODO Maybe add error handling and check whether all rows fail to insert or
# only some of them. Currently, if a single row fails to insert, an exception
# stops the whole import, which seems fine because ideally the import script
# should ensure all data is valid. We might need to see how this works out in
# actual migrations...
@connection.transaction do
@connection.copy_data(sql, @encoder) do
sliced_rows.each do |row|
data = column_names.map { |c| row[c] }
@connection.put_copy_data(data)
end
end
# give the caller a chance to do some work when a batch has been committed,
# for example, to store ID mappings
yield sliced_rows
end
end
nil
end
def last_id_of(table_name)
query = <<~SQL
SELECT COALESCE(MAX(id), 0)
FROM #{quote_identifier(table_name)}
WHERE id > 0
SQL
result = @connection.exec(query)
result.getvalue(0, 0)
end
def fix_last_id_of(table_name)
table_name = quote_identifier(table_name)
query = <<~SQL
SELECT SETVAL(PG_GET_SERIAL_SEQUENCE('#{table_name}', 'id'), MAX(id))
FROM #{table_name}
HAVING MAX(id) > 0
SQL
@connection.exec(query)
nil
end
def column_names(table_name)
query = <<~SQL
SELECT column_name
FROM information_schema.columns
WHERE table_name = $1
ORDER BY ordinal_position
SQL
result = @connection.exec_params(query, [table_name])
result.column_values(0).map(&:to_sym)
end
def query_array(sql, *params)
@connection.send_query_params(sql, params)
@connection.set_single_row_mode
Enumerator.new do |y|
while (result = @connection.get_result)
result.stream_each_row { |row| y.yield(row) }
result.clear
end
end
end
def close
@connection.finish
end
private
def database_configuration
db_config = ActiveRecord::Base.connection_db_config.configuration_hash
# credentials for PostgreSQL in CI environment
if Rails.env.test?
username = ENV["PGUSER"]
password = ENV["PGPASSWORD"]
end
{
host: db_config[:host],
port: db_config[:port],
username: db_config[:username] || username,
password: db_config[:password] || password,
dbname: db_config[:database],
}.compact
end
def quote_identifier(identifier)
PG::Connection.quote_ident(identifier.to_s)
end
end
end

View File

@@ -0,0 +1,63 @@
# frozen_string_literal: true
module Migrations::Importer
class Executor
def initialize(config)
@intermediate_db = ::Migrations::Database.connect(config[:intermediate_db])
@discourse_db = DiscourseDB.new
@shared_data = SharedData.new(@discourse_db)
attach_mappings_db(config[:mappings_db])
end
def start
runtime =
::Migrations::DateHelper.track_time do
execute_steps
ensure
cleanup
end
puts I18n.t("importer.done", runtime: ::Migrations::DateHelper.human_readable_time(runtime))
end
private
def attach_mappings_db(db_path)
# ::Migrations::Database.reset!(db_path)
::Migrations::Database.migrate(
db_path,
migrations_path: ::Migrations::Database::MAPPINGS_DB_SCHEMA_PATH,
)
@intermediate_db.execute("ATTACH DATABASE ? AS mapped", db_path)
end
def step_classes
steps_module = ::Migrations::Importer::Steps
classes =
steps_module
.constants
.map { |c| steps_module.const_get(c) }
.select { |klass| klass.is_a?(Class) && klass < ::Migrations::Importer::Step }
TopologicalSorter.sort(classes)
end
def execute_steps
max = step_classes.size
step_classes
.each
.with_index(1) do |step_class, index|
puts "#{step_class.title} [#{index}/#{max}]"
step = step_class.new(@intermediate_db, @discourse_db, @shared_data)
step.execute
puts ""
end
end
def cleanup
@intermediate_db.close
@discourse_db.close
end
end
end

View File

@@ -0,0 +1,7 @@
# frozen_string_literal: true
module Migrations::Importer
module MappingType
USERS = 1
end
end

View File

@@ -0,0 +1,51 @@
# frozen_string_literal: true
module Migrations::Importer
class SharedData
def initialize(discourse_db)
@discourse_db = discourse_db
end
def load_set(sql)
@discourse_db.query_array(sql).map(&:first).to_set
end
def load_mapping(sql)
rows = @discourse_db.query_array(sql)
if rows.first && rows.first.size > 2
rows.to_h { |key, *values| [key, values] } # key => array of remaining values
else
rows.to_h
end
end
def load(type)
case type
when :usernames
@existing_usernames_lower ||= load_set <<~SQL
SELECT username_lower
FROM users
SQL
when :group_names
@existing_group_names_lower ||= load_set <<~SQL
SELECT LOWER(name)
FROM groups
SQL
else
raise "Unknown type: #{type}"
end
end
def unload_shared_data(type)
case type
when :usernames
@existing_usernames_lower = nil
when :group_names
@existing_group_names_lower = nil
else
raise "Unknown type: #{type}"
end
end
end
end
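
As an aside (a sketch with made-up data, not part of the commit): `load_mapping` returns a plain key-to-value hash for two-column queries; with more columns, each key maps to an array of the remaining values.

# given a DiscourseDB instance
shared_data = Migrations::Importer::SharedData.new(discourse_db)

shared_data.load_mapping("SELECT id, username FROM users")
# => { 1 => "alice", 2 => "bob" }

shared_data.load_mapping("SELECT id, username, name FROM users")
# => { 1 => ["alice", "Alice A."], 2 => ["bob", "Bob B."] }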

View File

@@ -0,0 +1,116 @@
# frozen_string_literal: true
module Migrations::Importer
class Step
class << self
# stree-ignore
def title(value = (getter = true; nil))
if getter
return(
@title ||=
I18n.t(
"importer.default_step_title",
type: name&.demodulize&.underscore&.humanize(capitalize: false),
)
)
end
@title = value
end
def depends_on(*step_names)
steps_module = ::Migrations::Importer::Steps
classes =
step_names.map do |name|
name = name.to_s.camelize
klass = steps_module.const_get(name) if steps_module.const_defined?(name)
unless klass.is_a?(Class) && klass < ::Migrations::Importer::Step
raise NameError, "Class #{class_name} not found"
end
klass
end
@dependencies ||= []
@dependencies.concat(classes)
end
def dependencies
@dependencies || []
end
def requires_mapping(name, sql)
@required_mappings ||= {}
@required_mappings[name] = sql
end
def required_mappings
@required_mappings || {}
end
def requires_set(name, sql)
@required_sets ||= {}
@required_sets[name] = sql
end
def required_sets
@required_sets || {}
end
end
def initialize(intermediate_db, discourse_db, shared_data)
@intermediate_db = intermediate_db
@discourse_db = discourse_db
@shared_data = shared_data
@stats = StepStats.new(skip_count: 0, warning_count: 0, error_count: 0)
end
def execute
load_required_data
end
private
def load_required_data
required_mappings = self.class.required_mappings
required_sets = self.class.required_sets
return if required_mappings.blank? && required_sets.blank?
print " #{I18n.t("importer.loading_required_data")} "
runtime =
::Migrations::DateHelper.track_time do
required_mappings.each do |name, sql|
instance_variable_set("@#{name}", @shared_data.load_mapping(sql))
end
required_sets.each do |name, sql|
instance_variable_set("@#{name}", @shared_data.load_set(sql))
end
end
puts ::Migrations::DateHelper.human_readable_time(runtime) if runtime >= 1
end
def update_progressbar(increment_by: 1)
@progressbar.update(
increment_by:,
skip_count: @stats.skip_count,
warning_count: @stats.warning_count,
error_count: @stats.error_count,
)
end
def with_progressbar(max_progress)
::Migrations::ExtendedProgressBar
.new(max_progress:)
.run do |progressbar|
@progressbar = progressbar
yield
@progressbar = nil
end
end
end
end

View File

@@ -0,0 +1,12 @@
# frozen_string_literal: true
module Migrations::Importer
StepStats =
Struct.new(:skip_count, :warning_count, :error_count) do
def reset(skip_count: 0, warning_count: 0, error_count: 0)
self.skip_count = skip_count
self.warning_count = warning_count
self.error_count = error_count
end
end
end

View File

@@ -0,0 +1,39 @@
# frozen_string_literal: true
module Migrations::Importer::Steps
class UserEmails < ::Migrations::Importer::CopyStep
depends_on :users
requires_set :existing_user_ids, "SELECT DISTINCT user_id FROM user_emails"
column_names %i[user_id email primary created_at updated_at]
total_rows_query <<~SQL, MappingType::USERS
SELECT COUNT(*)
FROM users u
JOIN mapped.ids mu ON u.original_id = mu.original_id AND mu.type = ?
LEFT JOIN user_emails ue ON u.original_id = ue.user_id
SQL
rows_query <<~SQL, MappingType::USERS
SELECT mu.discourse_id AS user_id,
ue.email,
COALESCE(ue."primary", TRUE) AS "primary",
COALESCE(ue.created_at, u.created_at) AS created_at
FROM users u
JOIN mapped.ids mu ON u.original_id = mu.original_id AND mu.type = ?
LEFT JOIN user_emails ue ON u.original_id = ue.user_id
ORDER BY ue.ROWID
SQL
private
def transform_row(row)
return nil if @existing_user_ids.include?(row[:user_id])
row[:email] ||= "#{SecureRandom.hex}@email.invalid"
super
end
end
end

View File

@@ -60,9 +60,10 @@ module Migrations::Importer::Steps
JSON_GROUP_ARRAY(LOWER(ue.email)) AS emails
FROM users u
LEFT JOIN user_emails ue ON u.original_id = ue.user_id
LEFT JOIN mapped.ids amu ON u.approved_by_id IS NOT NULL AND u.approved_by_id = amu.original_id AND amu.type = ?1
LEFT JOIN user_suspensions us ON u.original_id = us.user_id AND us.suspended_at < DATETIME() AND
(us.suspended_till IS NULL OR us.suspended_till > DATETIME())
LEFT JOIN mapped.ids mu ON u.original_id = mu.original_id AND mu.type = ?
LEFT JOIN mapped.ids mu ON u.original_id = mu.original_id AND mu.type = ?1
WHERE mu.original_id IS NULL
GROUP BY u.original_id
ORDER BY u.ROWID

View File

@@ -0,0 +1,50 @@
# frozen_string_literal: true
module Migrations::Importer
class TopologicalSorter
def self.sort(classes)
new(classes).sort
end
def initialize(classes)
@classes = classes
@dependency_graph = build_dependency_graph
end
def sort
in_degree = Hash.new(0)
@dependency_graph.each_value { |edges| edges.each { |edge| in_degree[edge] += 1 } }
queue = @classes.reject { |cls| in_degree[cls] > 0 }
result = []
while queue.any?
node = queue.shift
result << node
@dependency_graph[node].each do |child|
in_degree[child] -= 1
queue << child if in_degree[child] == 0
end
end
raise "Circular dependency detected" if result.size < @classes.size
result
end
private
def build_dependency_graph
graph = Hash.new { |hash, key| hash[key] = [] }
@classes
.sort_by(&:to_s)
.each do |klass|
dependencies = klass.dependencies || []
dependencies.each { |dependency| graph[dependency] << klass }
graph[klass] ||= []
end
graph
end
end
end
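
The sorter is a straightforward Kahn's algorithm over the `depends_on` graph. A minimal illustration with stand-in classes (not part of the commit):

class A; def self.dependencies = []; end
class B; def self.dependencies = [A]; end
class C; def self.dependencies = [A, B]; end

Migrations::Importer::TopologicalSorter.sort([C, B, A])
# => [A, B, C] (dependencies always run before their dependents)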

View File

@@ -0,0 +1,84 @@
# frozen_string_literal: true
module Migrations::Importer
class UniqueNameFinder
MAX_USERNAME_LENGTH = 60
def initialize(shared_data)
@used_usernames_lower = shared_data.load(:usernames)
@used_group_names_lower = shared_data.load(:group_names)
@last_suffixes = {}
@fallback_username =
UserNameSuggester.sanitize_username(I18n.t("fallback_username")).presence ||
UserNameSuggester::LAST_RESORT_USERNAME
@fallback_group_name = "group"
end
def find_available_username(username, allow_reserved_username: false)
username, username_lower =
find_available_name(
username,
fallback_name: @fallback_username,
max_name_length: MAX_USERNAME_LENGTH,
allow_reserved_username:,
)
@used_usernames_lower.add(username_lower)
username
end
def find_available_group_name(group_name)
group_name, group_name_lower =
find_available_name(group_name, fallback_name: @fallback_group_name)
@used_group_names_lower.add(group_name_lower)
group_name
end
private
def name_available?(name, allow_reserved_username: false)
name_lower = name.downcase
return false if @used_usernames_lower.include?(name_lower)
return false if @used_group_names_lower.include?(name_lower)
return false if !allow_reserved_username && User.reserved_username?(name_lower)
true
end
def find_available_name(
name,
fallback_name:,
max_name_length: nil,
allow_reserved_username: false
)
name = name.unicode_normalize
name = UserNameSuggester.sanitize_username(name)
name = fallback_name.dup if name.blank?
name = UserNameSuggester.truncate(name, max_name_length) if max_name_length
if !name_available?(name, allow_reserved_username:)
# if the name ends with a number, then use an underscore before appending the suffix
suffix_separator = name.match?(/\d$/) ? "_" : ""
suffix = next_suffix(name).to_s
# TODO This needs better logic, because it's possible that the max username length is exceeded
name = +"#{name}#{suffix_separator}#{suffix}"
name.next! until name_available?(name, allow_reserved_username:)
end
[name, name.downcase]
end
def next_suffix(name)
name_lower = name.downcase
@last_suffixes.fetch(name_lower, 0) + 1
end
def store_last_suffix(name)
name_lower = name.downcase
@last_suffixes[$1] = $2.to_i if name_lower =~ /^(.+?)(\d+)$/
end
end
end
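
For illustration, a hypothetical session (assuming "sam" starts out free and a user "x1" already exists):

finder = ::Migrations::Importer::UniqueNameFinder.new(shared_data)

finder.find_available_username("sam") # => "sam"
finder.find_available_username("sam") # => "sam1" (numeric suffix appended)
finder.find_available_username("sam") # => "sam2" (String#next! skips the taken "sam1")
finder.find_available_username("x1")  # => "x1_1" (underscore separator after a trailing digit)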