diff --git a/app/models/optimized_image.rb b/app/models/optimized_image.rb
index ec1827e36c6..2041a1669b8 100644
--- a/app/models/optimized_image.rb
+++ b/app/models/optimized_image.rb
@@ -192,7 +192,9 @@ class OptimizedImage < ActiveRecord::Base
       extension = File.extname(opts[:filename] || ext_path || path)[1..-1]
     end
 
-    raise Discourse::InvalidAccess if !extension || !extension.match?(IM_DECODERS)
+    if !extension || !extension.match?(IM_DECODERS)
+      raise Discourse::InvalidAccess.new("Unsupported extension: #{extension}")
+    end
 
     "#{extension}:#{path}"
   end
diff --git a/script/bulk_import/uploads_importer.rb b/script/bulk_import/uploads_importer.rb
new file mode 100644
index 00000000000..99e1cd42cca
--- /dev/null
+++ b/script/bulk_import/uploads_importer.rb
@@ -0,0 +1,666 @@
+# frozen_string_literal: true
+puts "Loading application..."
+require_relative "../../config/environment"
+
+require "etc"
+require "sqlite3"
+require "colored2"
+
+# hack so that OptimizedImage.lock believes it's running in a Sidekiq job
+module Sidekiq
+  def self.server?
+    true
+  end
+end
+
+module BulkImport
+  class UploadsImporter
+    TRANSACTION_SIZE = 1000
+    QUEUE_SIZE = 1000
+
+    def initialize(settings_path)
+      @settings = YAML.load_file(settings_path, symbolize_names: true)
+      @settings[:path_replacements] ||= []
+
+      @root_paths = @settings[:root_paths]
+      @output_db = create_connection(@settings[:output_db_path])
+
+      initialize_output_db
+      configure_site_settings
+    end
+
+    def run
+      # disable logging for EXIFR which is used by ImageOptim
+      EXIFR.logger = Logger.new(nil)
+
+      if @settings[:fix_missing]
+        @source_db = create_connection(@settings[:output_db_path])
+
+        puts "Fixing missing uploads..."
+        fix_missing
+      else
+        @source_db = create_connection(@settings[:source_db_path])
+
+        puts "Uploading uploads..."
+        upload_files
+
+        puts "", "Creating optimized images..."
+        create_optimized_images if @settings[:create_optimized_images]
+      end
+      puts ""
+    ensure
+      close
+    end
+
+    def upload_files
+      queue = SizedQueue.new(QUEUE_SIZE)
+      consumer_threads = []
+
+      if @settings[:delete_missing_uploads]
+        puts "Deleting missing uploads from output database..."
+        @output_db.execute(<<~SQL)
+          DELETE FROM uploads
+          WHERE upload IS NULL
+        SQL
+      end
+
+      output_existing_ids = Set.new
+      query("SELECT id FROM uploads", @output_db).tap do |result_set|
+        result_set.each { |row| output_existing_ids << row["id"] }
+        result_set.close
+      end
+
+      source_existing_ids = Set.new
+      query("SELECT id FROM uploads", @source_db).tap do |result_set|
+        result_set.each { |row| source_existing_ids << row["id"] }
+        result_set.close
+      end
+
+      if (surplus_upload_ids = output_existing_ids - source_existing_ids).any?
+        if @settings[:delete_surplus_uploads]
+          puts "Deleting #{surplus_upload_ids.size} uploads from output database..."
+
+          surplus_upload_ids.each_slice(TRANSACTION_SIZE) do |ids|
+            placeholders = (["?"] * ids.size).join(",")
+            @output_db.execute(<<~SQL, ids)
+              DELETE FROM uploads
+              WHERE id IN (#{placeholders})
+            SQL
+          end
+
+          output_existing_ids -= surplus_upload_ids
+        else
+          puts "Found #{surplus_upload_ids.size} surplus uploads in output database. " \
+                 "Run with `delete_surplus_uploads: true` to delete them."
+        end
+
+        surplus_upload_ids = nil
+      end
+
+      max_count = (source_existing_ids - output_existing_ids).size
+      source_existing_ids = nil
+      puts "Found #{output_existing_ids.size} existing uploads. #{max_count} are missing."
+
+      producer_thread =
+        Thread.new do
+          query("SELECT * FROM uploads", @source_db).tap do |result_set|
+            result_set.each { |row| queue << row unless output_existing_ids.include?(row["id"]) }
+            result_set.close
+          end
+        end
+
+      status_queue = SizedQueue.new(QUEUE_SIZE)
+      status_thread =
+        Thread.new do
+          error_count = 0
+          skipped_count = 0
+          current_count = 0
+
+          while !(params = status_queue.pop).nil?
+            begin
+              if params.delete(:skipped) == true
+                skipped_count += 1
+              elsif (error_message = params.delete(:error)) || params[:upload].nil?
+                error_count += 1
+                puts "", "Failed to create upload: #{params[:id]} (#{error_message})", ""
+              end
+
+              @output_db.execute(<<~SQL, params)
+                INSERT INTO uploads (id, upload, markdown, skip_reason)
+                VALUES (:id, :upload, :markdown, :skip_reason)
+              SQL
+            rescue StandardError => e
+              puts "", "Failed to insert upload: #{params[:id]} (#{e.message})", ""
+              error_count += 1
+            end
+
+            current_count += 1
+            error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"
+
+            print "\r%7d / %7d (%s, %d skipped)" %
+                    [current_count, max_count, error_count_text, skipped_count]
+          end
+        end
+
+      (Etc.nprocessors * @settings[:thread_count_factor]).to_i.times do |index|
+        consumer_threads << Thread.new do
+          Thread.current.name = "worker-#{index}"
+
+          store = Discourse.store
+
+          while (row = queue.pop)
+            begin
+              data_file = nil
+              path = nil
+
+              if row["data"].present?
+                data_file = Tempfile.new("discourse-upload", binmode: true)
+                data_file.write(row["data"])
+                data_file.rewind
+                path = data_file.path
+              else
+                relative_path = row["relative_path"]
+                file_exists = false
+
+                @root_paths.each do |root_path|
+                  path = File.join(root_path, relative_path, row["filename"])
+                  break if (file_exists = File.exist?(path))
+
+                  @settings[:path_replacements].each do |from, to|
+                    path = File.join(root_path, relative_path.sub(from, to), row["filename"])
+                    break if (file_exists = File.exist?(path))
+                  end
+
+                  # stop here, otherwise a path found via a replacement would be
+                  # overwritten by the next root path
+                  break if file_exists
+                end
+
+                if !file_exists
+                  status_queue << {
+                    id: row["id"],
+                    upload: nil,
+                    skipped: true,
+                    skip_reason: "file not found",
+                  }
+                  next
+                end
+              end
+
+              retry_count = 0
+
+              loop do
+                error_message = nil
+                upload =
+                  copy_to_tempfile(path) do |file|
+                    begin
+                      UploadCreator.new(file, row["filename"], type: row["type"]).create_for(
+                        Discourse::SYSTEM_USER_ID,
+                      )
+                    rescue StandardError => e
+                      error_message = e.message
+                      nil
+                    end
+                  end
+
+                if (upload_okay = upload.present? && upload.persisted? && upload.errors.blank?)
+                  upload_path = store.get_path_for_upload(upload)
+
+                  file_exists =
+                    if store.external?
+                      store.object_from_path(upload_path).exists?
+                    else
+                      File.exist?(File.join(store.public_dir, upload_path))
+                    end
+
+                  unless file_exists
+                    upload.destroy
+                    upload = nil
+                    upload_okay = false
+                  end
+                end
+
+                if upload_okay
+                  status_queue << {
+                    id: row["id"],
+                    upload: upload.attributes.to_json,
+                    markdown: UploadMarkdown.new(upload).to_markdown,
+                    skip_reason: nil,
+                  }
+                  break
+                elsif retry_count >= 3
+                  error_message ||= upload&.errors&.full_messages&.join(", ") || "unknown error"
+                  status_queue << {
+                    id: row["id"],
+                    upload: nil,
+                    markdown: nil,
+                    error: "too many retries: #{error_message}",
+                    skip_reason: "too many retries",
+                  }
+                  break
+                end
+
+                retry_count += 1
+                sleep 0.25 * retry_count
+              end
+            rescue StandardError => e
+              status_queue << {
+                id: row["id"],
+                upload: nil,
+                markdown: nil,
+                error: e.message,
+                skip_reason: "error",
+              }
+            ensure
+              data_file&.close!
+            end
+          end
+        end
+      end
+
+      producer_thread.join
+      queue.close
+      consumer_threads.each(&:join)
+      status_queue.close
+      status_thread.join
+    end
+
+    def fix_missing
+      queue = SizedQueue.new(QUEUE_SIZE)
+      consumer_threads = []
+
+      max_count =
+        @source_db.get_first_value("SELECT COUNT(*) FROM uploads WHERE upload IS NOT NULL")
+
+      producer_thread =
+        Thread.new do
+          query(
+            "SELECT id, upload FROM uploads WHERE upload IS NOT NULL ORDER BY rowid DESC",
+            @source_db,
+          ).tap do |result_set|
+            result_set.each { |row| queue << row }
+            result_set.close
+          end
+        end
+
+      status_queue = SizedQueue.new(QUEUE_SIZE)
+      status_thread =
+        Thread.new do
+          error_count = 0
+          current_count = 0
+          missing_count = 0
+
+          while !(result = status_queue.pop).nil?
+            current_count += 1
+
+            case result[:status]
+            when :ok
+              # ignore
+            when :error
+              error_count += 1
+              puts "Error in #{result[:id]}"
+            when :missing
+              missing_count += 1
+              puts "Missing #{result[:id]}"
+
+              @output_db.execute("DELETE FROM uploads WHERE id = ?", result[:id])
+              Upload.delete_by(id: result[:upload_id])
+            end
+
+            error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"
+
+            print "\r%7d / %7d (%s, %d missing)" %
+                    [current_count, max_count, error_count_text, missing_count]
+          end
+        end
+
+      store = Discourse.store
+
+      (Etc.nprocessors * @settings[:thread_count_factor] * 2).to_i.times do |index|
+        consumer_threads << Thread.new do
+          Thread.current.name = "worker-#{index}"
+          fake_upload = OpenStruct.new(url: "")
+
+          while (row = queue.pop)
+            begin
+              upload = JSON.parse(row["upload"])
+              fake_upload.url = upload["url"]
+              path = store.get_path_for_upload(fake_upload)
+
+              file_exists =
+                if store.external?
+                  store.object_from_path(path).exists?
+                else
+                  File.exist?(File.join(store.public_dir, path))
+                end
+
+              if file_exists
+                status_queue << { id: row["id"], upload_id: upload["id"], status: :ok }
+              else
+                status_queue << { id: row["id"], upload_id: upload["id"], status: :missing }
+              end
+            rescue StandardError => e
+              puts e.message
+              status_queue << { id: row["id"], upload_id: upload["id"], status: :error }
+            end
+          end
+        end
+      end
+
+      producer_thread.join
+      queue.close
+      consumer_threads.each(&:join)
+      status_queue.close
+      status_thread.join
+    end
+
+    def create_optimized_images
+      init_threads = []
+      optimized_upload_ids = Set.new
+      post_upload_ids = Set.new
+      avatar_upload_ids = Set.new
+      max_count = 0
+
+      init_threads << Thread.new do
+        query("SELECT id FROM optimized_images", @output_db).tap do |result_set|
+          result_set.each { |row| optimized_upload_ids << row["id"] }
+          result_set.close
+        end
+      end
+
+      init_threads << Thread.new do
+        sql = <<~SQL
+          SELECT upload_ids
+          FROM posts
+          WHERE upload_ids IS NOT NULL
+        SQL
+        query(sql, @source_db).tap do |result_set|
+          result_set.each do |row|
+            JSON.parse(row["upload_ids"]).each { |id| post_upload_ids << id }
+          end
+          result_set.close
+        end
+      end
+
+      init_threads << Thread.new do
+        sql = <<~SQL
+          SELECT avatar_upload_id
+          FROM users
+          WHERE avatar_upload_id IS NOT NULL
+        SQL
+        query(sql, @source_db).tap do |result_set|
+          result_set.each { |row| avatar_upload_ids << row["avatar_upload_id"] }
+          result_set.close
+        end
+      end
+
+      init_threads << Thread.new do
+        max_count =
+          @output_db.get_first_value("SELECT COUNT(*) FROM uploads WHERE upload IS NOT NULL")
+      end
+
+      init_threads.each(&:join)
+
+      status_queue = SizedQueue.new(QUEUE_SIZE)
+      status_thread =
+        Thread.new do
+          error_count = 0
+          current_count = 0
+          skipped_count = 0
+
+          while !(params = status_queue.pop).nil?
+            current_count += 1
+
+            case params.delete(:status)
+            when :ok
+              @output_db.execute(<<~SQL, params)
+                INSERT INTO optimized_images (id, optimized_images)
+                VALUES (:id, :optimized_images)
+              SQL
+            when :error
+              error_count += 1
+            when :skipped
+              skipped_count += 1
+            end
+
+            error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"
+
+            print "\r%7d / %7d (%s, %d skipped)" %
+                    [current_count, max_count, error_count_text, skipped_count]
+          end
+        end
+
+      queue = SizedQueue.new(QUEUE_SIZE)
+      consumer_threads = []
+
+      producer_thread =
+        Thread.new do
+          sql = <<~SQL
+            SELECT id AS upload_id, upload ->> 'sha1' AS upload_sha1, markdown
+            FROM uploads
+            WHERE upload IS NOT NULL
+            ORDER BY rowid
+          SQL
+
+          query(sql, @output_db).tap do |result_set|
+            result_set.each do |row|
+              upload_id = row["upload_id"]
+
+              if optimized_upload_ids.include?(upload_id) || !row["markdown"].start_with?("![")
+                status_queue << { id: row["upload_id"], status: :skipped }
+                next
+              end
+
+              if post_upload_ids.include?(upload_id)
+                row["type"] = "post"
+                queue << row
+              elsif avatar_upload_ids.include?(upload_id)
+                row["type"] = "avatar"
+                queue << row
+              else
+                status_queue << { id: row["upload_id"], status: :skipped }
+              end
+            end
+            result_set.close
+          end
+        end
+
+      avatar_sizes = Discourse.avatar_sizes
+      store = Discourse.store
+      remote_factor = store.external? ? 2 : 1
+
+      Jobs.run_immediately!
+
+      (Etc.nprocessors * @settings[:thread_count_factor] * remote_factor).to_i.times do |index|
+        consumer_threads << Thread.new do
+          Thread.current.name = "worker-#{index}"
+
+          post =
+            PostCreator.new(
+              Discourse.system_user,
+              raw: "Topic created by uploads_importer",
+              acting_user: Discourse.system_user,
+              skip_validations: true,
+              title: "Topic created by uploads_importer - #{SecureRandom.hex}",
+              archetype: Archetype.default,
+              category: Category.last.id,
+            ).create!
+
+          while (row = queue.pop)
+            retry_count = 0
+
+            loop do
+              upload = Upload.find_by(sha1: row["upload_sha1"])
+
+              optimized_images =
+                begin
+                  case row["type"]
+                  when "post"
+                    post.update_columns(baked_at: nil, cooked: "", raw: row["markdown"])
+                    post.reload
+                    post.rebake!
+                    OptimizedImage.where(upload_id: upload.id).to_a
+                  when "avatar"
+                    avatar_sizes.map { |size| OptimizedImage.create_for(upload, size, size) }
+                  end
+                rescue StandardError => e
+                  puts e.message
+                  puts e.backtrace
+                  nil
+                end
+
+              begin
+                if optimized_images.present?
+                  optimized_images.map! do |optimized_image|
+                    next unless optimized_image.present?
+                    optimized_image_path = store.get_path_for_optimized_image(optimized_image)
+
+                    file_exists =
+                      if store.external?
+                        store.object_from_path(optimized_image_path).exists?
+                      else
+                        File.exist?(File.join(store.public_dir, optimized_image_path))
+                      end
+
+                    unless file_exists
+                      optimized_image.destroy
+                      optimized_image = nil
+                    end
+
+                    optimized_image
+                  end
+                end
+              rescue StandardError
+                optimized_images = nil
+              end
+
+              optimized_images_okay =
+                !optimized_images.nil? && optimized_images.all?(&:present?) &&
+                  optimized_images.all?(&:persisted?) &&
+                  optimized_images.all? { |o| o.errors.blank?
} + + if optimized_images_okay + status_queue << { + id: row["upload_id"], + optimized_images: optimized_images.presence&.to_json, + status: :ok, + } + break + elsif retry_count >= 3 + status_queue << { id: row["upload_id"], status: :error } + break + end + + retry_count += 1 + sleep 0.25 * retry_count + end + end + end + end + + producer_thread.join + queue.close + consumer_threads.each(&:join) + status_queue.close + status_thread.join + end + + private + + def create_connection(path) + sqlite = SQLite3::Database.new(path, results_as_hash: true) + sqlite.busy_timeout = 60_000 # 60 seconds + sqlite.journal_mode = "WAL" + sqlite.synchronous = "off" + sqlite + end + + def query(sql, db) + db.prepare(sql).execute + end + + def initialize_output_db + @statement_counter = 0 + + @output_db.execute(<<~SQL) + CREATE TABLE IF NOT EXISTS uploads ( + id TEXT PRIMARY KEY NOT NULL, + upload JSON_TEXT, + markdown TEXT, + skip_reason TEXT + ) + SQL + + @output_db.execute(<<~SQL) + CREATE TABLE IF NOT EXISTS optimized_images ( + id TEXT PRIMARY KEY NOT NULL, + optimized_images JSON_TEXT + ) + SQL + end + + def insert(sql, bind_vars = []) + @output_db.transaction if @statement_counter == 0 + @output_db.execute(sql, bind_vars) + + if (@statement_counter += 1) > TRANSACTION_SIZE + @output_db.commit + @statement_counter = 0 + end + end + + def close + @source_db.close if @source_db + + if @output_db + @output_db.commit if @output_db.transaction_active? + @output_db.close + end + end + + def copy_to_tempfile(source_path) + extension = File.extname(source_path) + + Tempfile.open(["discourse-upload", extension]) do |tmpfile| + File.open(source_path, "rb") { |source_stream| IO.copy_stream(source_stream, tmpfile) } + tmpfile.rewind + yield(tmpfile) + end + end + + def configure_site_settings + settings = @settings[:site_settings] + + SiteSetting.clean_up_uploads = false + SiteSetting.authorized_extensions = settings[:authorized_extensions] + SiteSetting.max_attachment_size_kb = settings[:max_attachment_size_kb] + SiteSetting.max_image_size_kb = settings[:max_image_size_kb] + + if settings[:enable_s3_uploads] + SiteSetting.s3_access_key_id = settings[:s3_access_key_id] + SiteSetting.s3_secret_access_key = settings[:s3_secret_access_key] + SiteSetting.s3_upload_bucket = settings[:s3_upload_bucket] + SiteSetting.s3_region = settings[:s3_region] + SiteSetting.s3_cdn_url = settings[:s3_cdn_url] + SiteSetting.enable_s3_uploads = true + + raise "Failed to enable S3 uploads" if SiteSetting.enable_s3_uploads != true + + Tempfile.open("discourse-s3-test") do |tmpfile| + tmpfile.write("test") + tmpfile.rewind + + upload = + UploadCreator.new(tmpfile, "discourse-s3-test.txt").create_for( + Discourse::SYSTEM_USER_ID, + ) + + unless upload.present? && upload.persisted? && upload.errors.blank? 
&&
+                   upload.url.start_with?("//")
+            raise "Failed to upload to S3"
+          end
+
+          upload.destroy
+        end
+      end
+    end
+  end
+end
+
+# bundle exec ruby script/bulk_import/uploads_importer.rb /path/to/uploads_importer.yml
+BulkImport::UploadsImporter.new(ARGV.first).run
diff --git a/script/bulk_import/uploads_importer.yml b/script/bulk_import/uploads_importer.yml
new file mode 100644
index 00000000000..18070ede8ed
--- /dev/null
+++ b/script/bulk_import/uploads_importer.yml
@@ -0,0 +1,43 @@
+source_db_path: "/path/to/your/db.sqlite3"
+output_db_path: "/path/to/your/uploads.sqlite3"
+
+root_paths:
+  - "/path/to/your/files"
+  - "/path/to/more/files"
+
+# The number of threads to use for processing uploads is calculated as:
+#   thread_count = [number of cores] * [thread_count_factor]
+# For example, a machine with 8 cores and a factor of 1.5 uses 12 threads.
+# The thread count is doubled if uploads are stored on S3 because of the higher latency.
+thread_count_factor: 1.5
+
+# Delete uploads from the output database that are not found in the source database.
+delete_surplus_uploads: false
+
+# Delete uploads from the output database that do not have a Discourse upload record.
+delete_missing_uploads: false
+
+# Check if files are missing in the upload store and update the database accordingly.
+# Afterwards, set this back to false and re-run the script if you want to create new
+# uploads for the files that were detected as missing.
+fix_missing: false
+
+# Create optimized images for post uploads and avatars.
+create_optimized_images: false
+
+site_settings:
+  authorized_extensions: "*"
+  max_attachment_size_kb: 102_400
+  max_image_size_kb: 102_400
+
+  enable_s3_uploads: true
+  s3_upload_bucket: "your-bucket-name"
+  s3_region: "your-region"
+  s3_access_key_id: "your-access-key-id"
+  s3_secret_access_key: "your-secret-access-key"
+  s3_cdn_url: "https://your-cdn-url.com"
+
+# Sometimes a file can be found at one of many locations. Here's a list of transformations
+# that can be applied to the path to try to find the file. The first transformation that
+# results in a file being found will be used.
+path_replacements:
+#  - ["/foo/", "/bar"]
+#  - ["/foo/", "/bar/baz/"]
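
Note on how `root_paths` and `path_replacements` interact: in `upload_files`, each root path is tried with the unmodified `relative_path` first, and only then with each replacement applied in order; the first candidate that exists on disk wins. A minimal standalone sketch of that resolution order (the `resolve_upload_path` helper and the example values below are illustrative only, not part of the script):

```ruby
# Sketch of the lookup order implemented in upload_files: try each root path
# with the original relative path first, then with every configured
# path replacement applied to the relative path, in order.
def resolve_upload_path(root_paths, path_replacements, relative_path, filename)
  root_paths.each do |root_path|
    candidate = File.join(root_path, relative_path, filename)
    return candidate if File.exist?(candidate)

    path_replacements.each do |from, to|
      candidate = File.join(root_path, relative_path.sub(from, to), filename)
      return candidate if File.exist?(candidate)
    end
  end

  nil # the importer records such rows with skip_reason "file not found"
end

# Hypothetical usage with placeholder values from the YAML above:
resolve_upload_path(
  ["/path/to/your/files", "/path/to/more/files"],
  [["/foo/", "/bar/"]],
  "foo/2014/01",
  "image.png",
)
```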