diff --git a/plugins/chat/app/models/chat/message.rb b/plugins/chat/app/models/chat/message.rb index f82a2ec1ce6..b8c18249e41 100644 --- a/plugins/chat/app/models/chat/message.rb +++ b/plugins/chat/app/models/chat/message.rb @@ -312,11 +312,11 @@ module Chat end def thread_reply? - in_thread? && !is_thread_om? + in_thread? && !thread_om? end - def is_thread_om? - self.thread.original_message_id == self.id + def thread_om? + in_thread? && self.thread.original_message_id == self.id end private diff --git a/plugins/chat/app/serializers/chat/thread_serializer.rb b/plugins/chat/app/serializers/chat/thread_serializer.rb index 0408387c0f2..d2f37bd72f5 100644 --- a/plugins/chat/app/serializers/chat/thread_serializer.rb +++ b/plugins/chat/app/serializers/chat/thread_serializer.rb @@ -5,6 +5,22 @@ module Chat has_one :original_message_user, serializer: BasicUserWithStatusSerializer, embed: :objects has_one :original_message, serializer: Chat::ThreadOriginalMessageSerializer, embed: :objects - attributes :id, :title, :status + attributes :id, :title, :status, :channel_id, :meta + + def initialize(object, opts) + super(object, opts) + @opts = opts + end + + def meta + { message_bus_last_ids: { thread_message_bus_last_id: thread_message_bus_last_id } } + end + + private + + def thread_message_bus_last_id + @opts[:thread_message_bus_last_id] || + MessageBus.last_id(Chat::Publisher.thread_message_bus_channel(object.channel_id, object.id)) + end end end diff --git a/plugins/chat/app/services/chat/publisher.rb b/plugins/chat/app/services/chat/publisher.rb index f2964828d92..5aa87e9ad05 100644 --- a/plugins/chat/app/services/chat/publisher.rb +++ b/plugins/chat/app/services/chat/publisher.rb @@ -10,8 +10,27 @@ module Chat "/chat/#{chat_channel_id}" end + def self.thread_message_bus_channel(chat_channel_id, thread_id) + "#{root_message_bus_channel(chat_channel_id)}/thread/#{thread_id}" + end + + def self.calculate_publish_targets(channel, message) + targets = + if message.thread_om? + [ + root_message_bus_channel(channel.id), + thread_message_bus_channel(channel.id, message.thread_id), + ] + elsif message.thread_reply? + [thread_message_bus_channel(channel.id, message.thread_id)] + else + [root_message_bus_channel(channel.id)] + end + targets + end + def self.publish_new!(chat_channel, chat_message, staged_id) - return if chat_message.thread_reply? + message_bus_targets = calculate_publish_targets(chat_channel, chat_message) content = Chat::MessageSerializer.new( @@ -22,19 +41,38 @@ module Chat content[:staged_id] = staged_id permissions = permissions(chat_channel) - MessageBus.publish(root_message_bus_channel(chat_channel.id), content.as_json, permissions) + message_bus_targets.each do |message_bus_channel| + MessageBus.publish(message_bus_channel, content.as_json, permissions) + end - MessageBus.publish( - self.new_messages_message_bus_channel(chat_channel.id), - { - channel_id: chat_channel.id, - message_id: chat_message.id, - user_id: chat_message.user.id, - username: chat_message.user.username, - thread_id: chat_message.thread_id, - }, - permissions, - ) + if chat_message.thread_reply? + MessageBus.publish( + root_message_bus_channel(chat_channel.id), + { + type: :update_thread_original_message, + original_message_id: chat_message.thread.original_message_id, + action: :increment_reply_count, + }.as_json, + permissions, + ) + end + + # NOTE: This means that the read count is only updated in the client + # for new messages in the main channel stream, maybe in future we want to + # do this for thread messages as well? + if !chat_message.thread_reply? + MessageBus.publish( + self.new_messages_message_bus_channel(chat_channel.id), + { + channel_id: chat_channel.id, + message_id: chat_message.id, + user_id: chat_message.user.id, + username: chat_message.user.username, + thread_id: chat_message.thread_id, + }, + permissions, + ) + end end def self.publish_thread_created!(chat_channel, chat_message) @@ -50,7 +88,7 @@ module Chat end def self.publish_processed!(chat_message) - return if chat_message.thread_reply? + message_bus_targets = calculate_publish_targets(chat_channel, chat_message) chat_channel = chat_message.chat_channel content = { @@ -60,15 +98,14 @@ module Chat cooked: chat_message.cooked, }, } - MessageBus.publish( - root_message_bus_channel(chat_channel.id), - content.as_json, - permissions(chat_channel), - ) + + message_bus_targets.each do |message_bus_channel| + MessageBus.publish(message_bus_channel, content.as_json, permissions(chat_channel)) + end end def self.publish_edit!(chat_channel, chat_message) - return if chat_message.thread_reply? + message_bus_targets = calculate_publish_targets(chat_channel, chat_message) content = Chat::MessageSerializer.new( @@ -76,15 +113,14 @@ module Chat { scope: anonymous_guardian, root: :chat_message }, ).as_json content[:type] = :edit - MessageBus.publish( - root_message_bus_channel(chat_channel.id), - content.as_json, - permissions(chat_channel), - ) + + message_bus_targets.each do |message_bus_channel| + MessageBus.publish(message_bus_channel, content.as_json, permissions(chat_channel)) + end end def self.publish_refresh!(chat_channel, chat_message) - return if chat_message.thread_reply? + message_bus_targets = calculate_publish_targets(chat_channel, chat_message) content = Chat::MessageSerializer.new( @@ -92,15 +128,14 @@ module Chat { scope: anonymous_guardian, root: :chat_message }, ).as_json content[:type] = :refresh - MessageBus.publish( - root_message_bus_channel(chat_channel.id), - content.as_json, - permissions(chat_channel), - ) + + message_bus_targets.each do |message_bus_channel| + MessageBus.publish(message_bus_channel, content.as_json, permissions(chat_channel)) + end end def self.publish_reaction!(chat_channel, chat_message, action, user, emoji) - return if chat_message.thread_reply? + message_bus_targets = calculate_publish_targets(chat_channel, chat_message) content = { action: action, @@ -109,11 +144,10 @@ module Chat type: :reaction, chat_message_id: chat_message.id, } - MessageBus.publish( - root_message_bus_channel(chat_channel.id), - content.as_json, - permissions(chat_channel), - ) + + message_bus_targets.each do |message_bus_channel| + MessageBus.publish(message_bus_channel, content.as_json, permissions(chat_channel)) + end end def self.publish_presence!(chat_channel, user, typ) @@ -121,16 +155,20 @@ module Chat end def self.publish_delete!(chat_channel, chat_message) - return if chat_message.thread_reply? + message_bus_targets = calculate_publish_targets(chat_channel, chat_message) - MessageBus.publish( - root_message_bus_channel(chat_channel.id), - { type: "delete", deleted_id: chat_message.id, deleted_at: chat_message.deleted_at }, - permissions(chat_channel), - ) + message_bus_targets.each do |message_bus_channel| + MessageBus.publish( + message_bus_channel, + { type: "delete", deleted_id: chat_message.id, deleted_at: chat_message.deleted_at }, + permissions(chat_channel), + ) + end end def self.publish_bulk_delete!(chat_channel, deleted_message_ids) + # TODO (martin) Handle sending this through for all the threads that + # may contain the deleted messages as well. MessageBus.publish( root_message_bus_channel(chat_channel.id), { typ: "bulk_delete", deleted_ids: deleted_message_ids, deleted_at: Time.zone.now }, @@ -139,7 +177,7 @@ module Chat end def self.publish_restore!(chat_channel, chat_message) - return if chat_message.thread_reply? + message_bus_targets = calculate_publish_targets(chat_channel, chat_message) content = Chat::MessageSerializer.new( @@ -147,33 +185,36 @@ module Chat { scope: anonymous_guardian, root: :chat_message }, ).as_json content[:type] = :restore - MessageBus.publish( - root_message_bus_channel(chat_channel.id), - content.as_json, - permissions(chat_channel), - ) + + message_bus_targets.each do |message_bus_channel| + MessageBus.publish(message_bus_channel, content.as_json, permissions(chat_channel)) + end end def self.publish_flag!(chat_message, user, reviewable, score) - return if chat_message.thread_reply? + message_bus_targets = calculate_publish_targets(chat_message.chat_channel, chat_message) - # Publish to user who created flag - MessageBus.publish( - "/chat/#{chat_message.chat_channel_id}", - { - type: "self_flagged", - user_flag_status: score.status_for_database, - chat_message_id: chat_message.id, - }.as_json, - user_ids: [user.id], - ) + message_bus_targets.each do |message_bus_channel| + # Publish to user who created flag + MessageBus.publish( + message_bus_channel, + { + type: "self_flagged", + user_flag_status: score.status_for_database, + chat_message_id: chat_message.id, + }.as_json, + user_ids: [user.id], + ) + end - # Publish flag with link to reviewable to staff - MessageBus.publish( - "/chat/#{chat_message.chat_channel_id}", - { type: "flag", chat_message_id: chat_message.id, reviewable_id: reviewable.id }.as_json, - group_ids: [Group::AUTO_GROUPS[:staff]], - ) + message_bus_targets.each do |message_bus_channel| + # Publish flag with link to reviewable to staff + MessageBus.publish( + message_bus_channel, + { type: "flag", chat_message_id: chat_message.id, reviewable_id: reviewable.id }.as_json, + group_ids: [Group::AUTO_GROUPS[:staff]], + ) + end end def self.user_tracking_state_message_bus_channel(user_id) diff --git a/plugins/chat/assets/javascripts/discourse/components/chat-live-pane.js b/plugins/chat/assets/javascripts/discourse/components/chat-live-pane.js index bb1408d99f8..37b91e116b9 100644 --- a/plugins/chat/assets/javascripts/discourse/components/chat-live-pane.js +++ b/plugins/chat/assets/javascripts/discourse/components/chat-live-pane.js @@ -4,7 +4,10 @@ import ChatMessage from "discourse/plugins/chat/discourse/models/chat-message"; import ChatMessageDraft from "discourse/plugins/chat/discourse/models/chat-message-draft"; import Component from "@glimmer/component"; import { bind, debounce } from "discourse-common/utils/decorators"; -import EmberObject, { action } from "@ember/object"; +import { action } from "@ember/object"; +// TODO (martin) Remove this when the handleSentMessage logic inside chatChannelPaneSubscriptionsManager +// is moved over from this file completely. +import { handleStagedMessage } from "discourse/plugins/chat/discourse/services/chat-pane-base-subscriptions-manager"; import { ajax } from "discourse/lib/ajax"; import { popupAjaxError } from "discourse/lib/ajax-error"; import { cancel, schedule, throttle } from "@ember/runloop"; @@ -34,6 +37,7 @@ export default class ChatLivePane extends Component { @service chatStateManager; @service chatChannelComposer; @service chatChannelPane; + @service chatChannelPaneSubscriptionsManager; @service chatApi; @service currentUser; @service appEvents; @@ -108,7 +112,7 @@ export default class ChatLivePane extends Component { } this.loadMessages(); - this._subscribeToUpdates(this.args.channel?.id); + this._subscribeToUpdates(this.args.channel); } @action @@ -209,8 +213,8 @@ export default class ChatLivePane extends Component { const loadingMoreKey = `loadingMore${capitalize(direction)}`; const canLoadMore = loadingPast - ? this.args.channel.messagesManager.canLoadMorePast - : this.args.channel.messagesManager.canLoadMoreFuture; + ? this.#messagesManager.canLoadMorePast + : this.#messagesManager.canLoadMoreFuture; if ( !canLoadMore || @@ -261,7 +265,7 @@ export default class ChatLivePane extends Component { } this.args.channel.details = meta; - this.args.channel.messagesManager.addMessages(messages); + this.#messagesManager.addMessages(messages); // Edge case for IOS to avoid blank screens // and/or scrolling to bottom losing track of scroll position @@ -508,9 +512,9 @@ export default class ChatLivePane extends Component { } removeMessage(msgData) { - const message = this.args.channel.messagesManager.findMessage(msgData.id); + const message = this.#messagesManager.findMessage(msgData.id); if (message) { - this.args.channel.messagesManager.removeMessage(message); + this.#messagesManager.removeMessage(message); } } @@ -520,72 +524,6 @@ export default class ChatLivePane extends Component { case "sent": this.handleSentMessage(data); break; - case "processed": - this.handleProcessedMessage(data); - break; - case "edit": - this.handleEditMessage(data); - break; - case "refresh": - this.handleRefreshMessage(data); - break; - case "delete": - this.handleDeleteMessage(data); - break; - case "bulk_delete": - this.handleBulkDeleteMessage(data); - break; - case "reaction": - this.handleReactionMessage(data); - break; - case "restore": - this.handleRestoreMessage(data); - break; - case "mention_warning": - this.handleMentionWarning(data); - break; - case "self_flagged": - this.handleSelfFlaggedMessage(data); - break; - case "flag": - this.handleFlaggedMessage(data); - break; - case "thread_created": - this.handleThreadCreated(data); - break; - } - } - - handleThreadCreated(data) { - const message = this.args.channel.messagesManager.findMessage( - data.chat_message.id - ); - if (message) { - message.threadId = data.chat_message.thread_id; - message.threadReplyCount = 1; - } - } - - _handleStagedMessage(stagedMessage, data) { - stagedMessage.error = null; - stagedMessage.id = data.chat_message.id; - stagedMessage.staged = false; - stagedMessage.excerpt = data.chat_message.excerpt; - stagedMessage.threadId = data.chat_message.thread_id; - stagedMessage.channelId = data.chat_message.chat_channel_id; - stagedMessage.createdAt = data.chat_message.created_at; - - const inReplyToMsg = this.args.channel.messagesManager.findMessage( - data.chat_message.in_reply_to?.id - ); - if (inReplyToMsg && !inReplyToMsg.threadId) { - inReplyToMsg.threadId = data.chat_message.thread_id; - } - - // some markdown is cooked differently on the server-side, e.g. - // quotes, avatar images etc. - if (data.chat_message?.cooked !== stagedMessage.cooked) { - stagedMessage.cooked = data.chat_message.cooked; } } @@ -595,139 +533,30 @@ export default class ChatLivePane extends Component { } if (data.chat_message.user.id === this.currentUser.id && data.staged_id) { - const stagedMessage = this.args.channel.messagesManager.findStagedMessage( - data.staged_id - ); + const stagedMessage = handleStagedMessage(this.#messagesManager, data); if (stagedMessage) { - return this._handleStagedMessage(stagedMessage, data); + return; } } - if (this.args.channel.messagesManager.canLoadMoreFuture) { + if (this.#messagesManager.canLoadMoreFuture) { // If we can load more messages, we just notice the user of new messages this.hasNewMessages = true; } else if (this.#isTowardsBottom()) { // If we are at the bottom, we append the message and scroll to it const message = ChatMessage.create(this.args.channel, data.chat_message); - this.args.channel.messagesManager.addMessages([message]); + this.#messagesManager.addMessages([message]); this.scrollToLatestMessage(); this.updateLastReadMessage(); } else { // If we are almost at the bottom, we append the message and notice the user const message = ChatMessage.create(this.args.channel, data.chat_message); - this.args.channel.messagesManager.addMessages([message]); + this.#messagesManager.addMessages([message]); this.hasNewMessages = true; } } - handleProcessedMessage(data) { - const message = this.args.channel.messagesManager.findMessage( - data.chat_message.id - ); - if (message) { - message.cooked = data.chat_message.cooked; - this.scrollToLatestMessage(); - } - } - - handleRefreshMessage(data) { - const message = this.args.channel.messagesManager.findMessage( - data.chat_message.id - ); - if (message) { - message.incrementVersion(); - } - } - - handleEditMessage(data) { - const message = this.args.channel.messagesManager.findMessage( - data.chat_message.id - ); - if (message) { - message.message = data.chat_message.message; - message.cooked = data.chat_message.cooked; - message.excerpt = data.chat_message.excerpt; - message.uploads = cloneJSON(data.chat_message.uploads || []); - message.edited = true; - message.incrementVersion(); - } - } - - handleBulkDeleteMessage(data) { - data.deleted_ids.forEach((deletedId) => { - this.handleDeleteMessage({ - deleted_id: deletedId, - deleted_at: data.deleted_at, - }); - }); - } - - handleDeleteMessage(data) { - const deletedId = data.deleted_id; - const targetMsg = this.args.channel.messagesManager.findMessage(deletedId); - - if (!targetMsg) { - return; - } - - if (this.currentUser.staff || this.currentUser.id === targetMsg.user.id) { - targetMsg.deletedAt = data.deleted_at; - targetMsg.expanded = false; - } else { - this.args.channel.messagesManager.removeMessage(targetMsg); - } - } - - handleReactionMessage(data) { - const message = this.args.channel.messagesManager.findMessage( - data.chat_message_id - ); - if (message) { - message.react(data.emoji, data.action, data.user, this.currentUser.id); - } - } - - handleRestoreMessage(data) { - const message = this.args.channel.messagesManager.findMessage( - data.chat_message.id - ); - if (message) { - message.deletedAt = null; - } else { - this.args.channel.messagesManager.addMessages([ - ChatMessage.create(this.args.channel, data.chat_message), - ]); - } - } - - handleMentionWarning(data) { - const message = this.args.channel.messagesManager.findMessage( - data.chat_message_id - ); - if (message) { - message.mentionWarning = EmberObject.create(data); - } - } - - handleSelfFlaggedMessage(data) { - const message = this.args.channel.messagesManager.findMessage( - data.chat_message_id - ); - if (message) { - message.userFlagStatus = data.user_flag_status; - } - } - - handleFlaggedMessage(data) { - const message = this.args.channel.messagesManager.findMessage( - data.chat_message_id - ); - if (message) { - message.reviewableId = data.reviewable_id; - } - } - // TODO (martin) Maybe change this to public, since its referred to by // livePanel.linkedComponent at the moment. get _selfDeleted() { @@ -788,13 +617,13 @@ export default class ChatLivePane extends Component { if (stagedMessage.inReplyTo) { if (!this.args.channel.threadingEnabled) { - this.args.channel.messagesManager.addMessages([stagedMessage]); + this.#messagesManager.addMessages([stagedMessage]); } } else { - this.args.channel.messagesManager.addMessages([stagedMessage]); + this.#messagesManager.addMessages([stagedMessage]); } - if (!this.args.channel.messagesManager.canLoadMoreFuture) { + if (!this.#messagesManager.canLoadMoreFuture) { this.scrollToLatestMessage(); } @@ -844,8 +673,7 @@ export default class ChatLivePane extends Component { } _onSendError(id, error) { - const stagedMessage = - this.args.channel.messagesManager.findStagedMessage(id); + const stagedMessage = this.#messagesManager.findStagedMessage(id); if (stagedMessage) { if (error.jqXHR?.responseJSON?.errors?.length) { // only network errors are retryable @@ -910,20 +738,22 @@ export default class ChatLivePane extends Component { return; } + this.chatChannelPaneSubscriptionsManager.unsubscribe(); this.messageBus.unsubscribe(`/chat/${channelId}`, this.onMessage); } - _subscribeToUpdates(channelId) { - if (!channelId) { + _subscribeToUpdates(channel) { + if (!channel) { return; } - this._unsubscribeToUpdates(channelId); + this._unsubscribeToUpdates(channel.id); this.messageBus.subscribe( - `/chat/${channelId}`, + `/chat/${channel.id}`, this.onMessage, - this.args.channel.channelMessageBusLastId + channel.channelMessageBusLastId ); + this.chatChannelPaneSubscriptionsManager.subscribe(channel); } @bind diff --git a/plugins/chat/assets/javascripts/discourse/components/chat-thread.hbs b/plugins/chat/assets/javascripts/discourse/components/chat-thread.hbs index baaa67e3f9c..1c00da944af 100644 --- a/plugins/chat/assets/javascripts/discourse/components/chat-thread.hbs +++ b/plugins/chat/assets/javascripts/discourse/components/chat-thread.hbs @@ -1,14 +1,17 @@