From b42f28d4c36eddce1df039566f3615b0753a2dc6 Mon Sep 17 00:00:00 2001 From: Robin Ward Date: Tue, 14 Jun 2016 11:44:35 -0400 Subject: [PATCH] Improved mailing list import. Now uses a SQLite database to store messages rather than JSON files for performance and memory considerations. --- script/import_scripts/base.rb | 12 +- script/import_scripts/mbox.rb | 257 ++++++++++++++++++++++++---------- 2 files changed, 188 insertions(+), 81 deletions(-) diff --git a/script/import_scripts/base.rb b/script/import_scripts/base.rb index f4b378b4f3c..e7829a27013 100644 --- a/script/import_scripts/base.rb +++ b/script/import_scripts/base.rb @@ -198,10 +198,14 @@ class ImportScripts::Base def all_records_exist?(type, import_ids) return false if import_ids.empty? - Post.exec_sql('CREATE TEMP TABLE import_ids(val varchar(200) PRIMARY KEY)') + orig_conn = ActiveRecord::Base.connection + conn = orig_conn.raw_connection + + conn.exec('CREATE TEMP TABLE import_ids(val varchar(200) PRIMARY KEY)') import_id_clause = import_ids.map { |id| "('#{PG::Connection.escape_string(id.to_s)}')" }.join(",") - Post.exec_sql("INSERT INTO import_ids VALUES #{import_id_clause}") + + conn.exec("INSERT INTO import_ids VALUES #{import_id_clause}") existing = "#{type.to_s.classify}CustomField".constantize.where(name: 'import_id') existing = existing.joins('JOIN import_ids ON val = value') @@ -211,7 +215,7 @@ class ImportScripts::Base return true end ensure - Post.exec_sql('DROP TABLE import_ids') + conn.exec('DROP TABLE import_ids') end # Iterate through a list of user records to be imported. @@ -366,7 +370,7 @@ class ImportScripts::Base params[:parent_category_id] = top.id if top end - new_category = create_category(params, params[:id]) + create_category(params, params[:id]) created += 1 end diff --git a/script/import_scripts/mbox.rb b/script/import_scripts/mbox.rb index cf50d1aed91..978b30cc236 100755 --- a/script/import_scripts/mbox.rb +++ b/script/import_scripts/mbox.rb @@ -1,3 +1,4 @@ +require 'sqlite3' require File.expand_path(File.dirname(__FILE__) + "/base.rb") class ImportScripts::Mbox < ImportScripts::Base @@ -5,96 +6,169 @@ class ImportScripts::Mbox < ImportScripts::Base BATCH_SIZE = 1000 CATEGORY_ID = 6 - MBOX_DIR = "/tmp/mbox-input" - USER_INDEX_PATH = "#{MBOX_DIR}/user-index.json" - TOPIC_INDEX_PATH = "#{MBOX_DIR}/topic-index.json" - REPLY_INDEX_PATH = "#{MBOX_DIR}/replies-index.json" + MBOX_DIR = File.expand_path("~/import/site") + + # Remove to not split individual files + SPLIT_AT = /^From owner-/ def execute - create_indices + create_email_indices + create_user_indices + massage_indices import_users create_forum_topics import_replies end - def all_messages + def open_db + SQLite3::Database.new("#{MBOX_DIR}/index.db") + end - files = Dir["#{MBOX_DIR}/*/*"] + def all_messages + files = Dir["#{MBOX_DIR}/messages/*"] files.each_with_index do |f, idx| - raw = File.read(f) - mail = Mail.read_from_string(raw) - yield mail, f - print_status(idx, files.size) + if SPLIT_AT.present? + msg = "" + File.foreach(f).with_index do |line, line_num| + line = line.scrub + if line =~ SPLIT_AT + if !msg.empty? + mail = Mail.read_from_string(msg) + yield mail + print_status(idx, files.size) + msg = "" + end + end + msg << line + end + if !msg.empty? + mail = Mail.read_from_string(msg) + yield mail + print_status(idx, files.size) + msg = "" + end + else + raw = File.read(f) + mail = Mail.read_from_string(raw) + yield mail + print_status(idx, files.size) + end + end end - def create_indices - return if File.exist?(USER_INDEX_PATH) && File.exist?(TOPIC_INDEX_PATH) && File.exist?(REPLY_INDEX_PATH) - puts "", "creating indices" - users = {} + def massage_indices + db = open_db + db.execute "UPDATE emails SET reply_to = null WHERE reply_to = ''" - topics = [] + rows = db.execute "SELECT msg_id, title, reply_to FROM emails ORDER BY email_date ASC" - topic_lookup = {} - topic_titles = {} - replies = [] + msg_ids = {} + titles = {} + rows.each do |row| + msg_ids[row[0]] = true + titles[row[1]] = row[0] + end - all_messages do |mail, filename| - users[mail.from.first] = mail[:from].display_names.first - - msg_id = mail['Message-ID'].to_s - reply_to = mail['In-Reply-To'].to_s - title = clean_title(mail['Subject'].to_s) - date = Time.parse(mail['date'].to_s).to_i + # First, any replies where the parent doesn't exist should have that field cleared + not_found = [] + rows.each do |row| + msg_id, _, reply_to = row if reply_to.present? - topic = topic_lookup[reply_to] || reply_to - topic_lookup[msg_id] = topic - replies << {id: msg_id, topic: topic, file: filename, title: title, date: date} - else - topics << {id: msg_id, file: filename, title: title, date: date} - topic_titles[title] ||= msg_id + not_found << msg_id if msg_ids[reply_to].blank? end end - replies.sort! {|a, b| a[:date] <=> b[:date]} - topics.sort! {|a, b| a[:date] <=> b[:date]} - - # Replies without parents should be hoisted to topics - to_hoist = [] - replies.each do |r| - to_hoist << r if !topic_lookup[r[:topic]] + puts "#{not_found.size} records couldn't be associated with parents" + if not_found.present? + db.execute "UPDATE emails SET reply_to = NULL WHERE msg_id IN (#{not_found.map {|nf| "'#{nf}'"}.join(',')})" end - to_hoist.each do |h| - replies.delete(h) - topics << {id: h[:id], file: h[:file], title: h[:title], date: h[:date]} - topic_titles[h[:title]] ||= h[:id] + dupe_titles = db.execute "SELECT title, COUNT(*) FROM emails GROUP BY title HAVING count(*) > 1" + puts "#{dupe_titles.size} replies to wire up" + dupe_titles.each do |t| + title = t[0] + first = titles[title] + db.execute "UPDATE emails SET reply_to = ? WHERE title = ? and msg_id <> ?", [first, title, first] end - # Topics with duplicate replies should be replies - to_group = [] - topics.each do |t| - first = topic_titles[t[:title]] - to_group << t if first && first != t[:id] + ensure + db.close + end + + def create_email_indices + db = open_db + db.execute "DROP TABLE IF EXISTS emails" + db.execute <<-SQL + CREATE TABLE emails ( + msg_id VARCHAR(995) PRIMARY KEY, + from_email VARCHAR(255) NOT NULL, + from_name VARCHAR(255) NOT NULL, + title VARCHAR(255) NOT NULL, + reply_to VARCHAR(955) NULL, + email_date DATETIME NOT NULL, + message TEXT NOT NULL + ); + SQL + + db.execute "CREATE INDEX by_title ON emails (title)" + db.execute "CREATE INDEX by_email ON emails (from_email)" + + puts "", "creating indices" + + all_messages do |mail| + msg_id = mail['Message-ID'].to_s + + # Many ways to get a name + from = mail[:from] + from_name = nil + + from_email = nil + if mail.from.present? + from_email = mail.from.first + end + + display_names = from.try(:display_names) + if display_names.present? + from_name = display_names.first + end + + if from_name.blank? && from.to_s =~ /\(([^\)]+)\)/ + from_name = Regexp.last_match[1] + end + from_name = from.to_s if from_name.blank? + + title = clean_title(mail['Subject'].to_s) + reply_to = mail['In-Reply-To'].to_s + email_date = mail['date'].to_s + + db.execute "INSERT OR IGNORE INTO emails (msg_id, from_email, from_name, title, reply_to, email_date, message) + VALUES (?, ?, ?, ?, ?, ?, ?)", + [msg_id, from_email, from_name, title, reply_to, email_date, mail.to_s] end + ensure + db.close + end - to_group.each do |t| - topics.delete(t) - replies << {id: t[:id], topic: topic_titles[t[:title]], file: t[:file], title: t[:title], date: t[:date]} - end + def create_user_indices + db = open_db + db.execute "DROP TABLE IF EXISTS users" + db.execute <<-SQL + CREATE TABLE users ( + email VARCHAR(995) PRIMARY KEY, + name VARCHAR(255) NOT NULL + ); + SQL - replies.sort! {|a, b| a[:date] <=> b[:date]} - topics.sort! {|a, b| a[:date] <=> b[:date]} - - - File.write(USER_INDEX_PATH, {users: users}.to_json) - File.write(TOPIC_INDEX_PATH, {topics: topics}.to_json) - File.write(REPLY_INDEX_PATH, {replies: replies}.to_json) + db.execute "INSERT OR IGNORE INTO users (email, name) SELECT from_email, from_name FROM emails" + ensure + db.close end def clean_title(title) + title ||= "" #Strip mailing list name from subject title = title.gsub(/\[[^\]]+\]+/, '').strip @@ -125,24 +199,26 @@ class ImportScripts::Mbox < ImportScripts::Base def import_users puts "", "importing users" + db = open_db - all_users = ::JSON.parse(File.read(USER_INDEX_PATH))['users'] - user_keys = all_users.keys - total_count = user_keys.size + all_users = db.execute("SELECT name, email FROM users") + total_count = all_users.size batches(BATCH_SIZE) do |offset| - users = user_keys[offset..offset+BATCH_SIZE-1] + users = all_users[offset..offset+BATCH_SIZE-1] break if users.nil? - next if all_records_exist? :users, users + next if all_records_exist? :users, users.map {|u| u[1]} - create_users(users, total: total_count, offset: offset) do |email| + create_users(users, total: total_count, offset: offset) do |u| { - id: email, - email: email, - name: all_users[email] + id: u[1], + email: u[1], + name: u[0] } end end + ensure + db.close end def parse_email(msg) @@ -157,23 +233,33 @@ class ImportScripts::Mbox < ImportScripts::Base def create_forum_topics puts "", "creating forum topics" - all_topics = ::JSON.parse(File.read(TOPIC_INDEX_PATH))['topics'] + db = open_db + all_topics = db.execute("SELECT msg_id, + from_email, + from_name, + title, + email_date, + message + FROM emails + WHERE reply_to IS NULL") + topic_count = all_topics.size batches(BATCH_SIZE) do |offset| topics = all_topics[offset..offset+BATCH_SIZE-1] break if topics.nil? - next if all_records_exist? :posts, topics.map {|t| t['id']} + next if all_records_exist? :posts, topics.map {|t| t[0]} create_posts(topics, total: topic_count, offset: offset) do |t| - raw_email = File.read(t['file']) + raw_email = t[5] receiver = Email::Receiver.new(raw_email) mail = Mail.read_from_string(raw_email) mail.body selected = receiver.select_body next unless selected + selected = selected.join('') if selected.kind_of?(Array) raw = selected.force_encoding(selected.encoding).encode("UTF-8") @@ -195,7 +281,7 @@ class ImportScripts::Mbox < ImportScripts::Base end end - { id: t['id'], + { id: t[0], title: clean_title(title), user_id: user_id_from_imported_user_id(mail.from.first) || Discourse::SYSTEM_USER_ID, created_at: mail.date, @@ -204,34 +290,49 @@ class ImportScripts::Mbox < ImportScripts::Base cook_method: Post.cook_methods[:email] } end end + ensure + db.close end def import_replies puts "", "creating topic replies" - replies = ::JSON.parse(File.read(REPLY_INDEX_PATH))['replies'] + db = open_db + replies = db.execute("SELECT msg_id, + from_email, + from_name, + title, + email_date, + message, + reply_to + FROM emails + WHERE reply_to IS NOT NULL") + post_count = replies.size batches(BATCH_SIZE) do |offset| posts = replies[offset..offset+BATCH_SIZE-1] break if posts.nil? - next if all_records_exist? :posts, posts.map {|p| p['id']} + next if all_records_exist? :posts, posts.map {|p| p[0]} create_posts(posts, total: post_count, offset: offset) do |p| - parent_id = p['topic'] - id = p['id'] + parent_id = p[6] + id = p[0] topic = topic_lookup_from_imported_post_id(parent_id) topic_id = topic[:topic_id] if topic next unless topic_id - raw_email = File.read(p['file']) + raw_email = p[5] receiver = Email::Receiver.new(raw_email) mail = Mail.read_from_string(raw_email) mail.body selected = receiver.select_body + selected = selected.join('') if selected.kind_of?(Array) + next unless selected + raw = selected.force_encoding(selected.encoding).encode("UTF-8") # import the attachments @@ -258,6 +359,8 @@ class ImportScripts::Mbox < ImportScripts::Base cook_method: Post.cook_methods[:email] } end end + ensure + db.close end end