mirror of
https://github.com/discourse/discourse.git
synced 2025-06-07 18:34:42 +08:00
DEV: Introduce bulk channel tracking publisher (#20838)
This commit introduces a Chat::Publisher and MessageBus endpoint that allows for updating a user's channel tracking state in bulk for multiple channels, rather than having to do it for one channel at a time. This also required an improvement to ChannelUnreadsQuery -- now multiple channel IDs can be passed to this to get the unread counts and mention counts for those channels for a user, also increasing efficiency rather than having to do a query for every individual channel. Followup to #20802
This commit is contained in:
@ -2,41 +2,42 @@
|
|||||||
|
|
||||||
module Chat
|
module Chat
|
||||||
class ChannelUnreadsQuery
|
class ChannelUnreadsQuery
|
||||||
def self.call(channel_id:, user_id:)
|
def self.call(channel_ids:, user_id:)
|
||||||
sql = <<~SQL
|
sql = <<~SQL
|
||||||
SELECT (
|
SELECT (
|
||||||
SELECT COUNT(*) AS unread_count
|
SELECT COUNT(*) AS unread_count
|
||||||
FROM chat_messages
|
FROM chat_messages
|
||||||
INNER JOIN chat_channels ON chat_channels.id = chat_messages.chat_channel_id
|
INNER JOIN chat_channels ON chat_channels.id = chat_messages.chat_channel_id
|
||||||
INNER JOIN user_chat_channel_memberships ON user_chat_channel_memberships.chat_channel_id = chat_channels.id
|
INNER JOIN user_chat_channel_memberships ON user_chat_channel_memberships.chat_channel_id = chat_channels.id
|
||||||
WHERE chat_channels.id = :channel_id
|
WHERE chat_channels.id = memberships.chat_channel_id
|
||||||
AND chat_messages.user_id != :user_id
|
AND chat_messages.user_id != :user_id
|
||||||
AND user_chat_channel_memberships.user_id = :user_id
|
AND user_chat_channel_memberships.user_id = :user_id
|
||||||
AND chat_messages.id > COALESCE(user_chat_channel_memberships.last_read_message_id, 0)
|
AND chat_messages.id > COALESCE(user_chat_channel_memberships.last_read_message_id, 0)
|
||||||
AND chat_messages.deleted_at IS NULL
|
AND chat_messages.deleted_at IS NULL
|
||||||
) AS unread_count,
|
) AS unread_count,
|
||||||
(
|
(
|
||||||
SELECT COUNT(*) AS mention_count
|
SELECT COUNT(*) AS mention_count
|
||||||
FROM notifications
|
FROM notifications
|
||||||
INNER JOIN user_chat_channel_memberships ON user_chat_channel_memberships.chat_channel_id = :channel_id
|
INNER JOIN user_chat_channel_memberships ON user_chat_channel_memberships.user_id = :user_id
|
||||||
AND user_chat_channel_memberships.user_id = :user_id
|
WHERE NOT read
|
||||||
WHERE NOT read
|
AND user_chat_channel_memberships.chat_channel_id = memberships.chat_channel_id
|
||||||
AND notifications.user_id = :user_id
|
AND notifications.user_id = :user_id
|
||||||
AND notifications.notification_type = :notification_type
|
AND notifications.notification_type = :notification_type
|
||||||
AND (data::json->>'chat_message_id')::bigint > COALESCE(user_chat_channel_memberships.last_read_message_id, 0)
|
AND (data::json->>'chat_message_id')::bigint > COALESCE(user_chat_channel_memberships.last_read_message_id, 0)
|
||||||
AND (data::json->>'chat_channel_id')::bigint = :channel_id
|
AND (data::json->>'chat_channel_id')::bigint = memberships.chat_channel_id
|
||||||
) AS mention_count;
|
) AS mention_count,
|
||||||
|
memberships.chat_channel_id AS channel_id
|
||||||
|
FROM user_chat_channel_memberships AS memberships
|
||||||
|
WHERE memberships.user_id = :user_id AND memberships.chat_channel_id IN (:channel_ids)
|
||||||
|
GROUP BY memberships.chat_channel_id
|
||||||
SQL
|
SQL
|
||||||
|
|
||||||
DB
|
DB.query(
|
||||||
.query(
|
sql,
|
||||||
sql,
|
channel_ids: channel_ids,
|
||||||
channel_id: channel_id,
|
user_id: user_id,
|
||||||
user_id: user_id,
|
notification_type: Notification.types[:chat_mention],
|
||||||
notification_type: Notification.types[:chat_mention],
|
)
|
||||||
)
|
|
||||||
.first
|
|
||||||
.to_h
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -54,13 +54,14 @@ module Chat
|
|||||||
end
|
end
|
||||||
|
|
||||||
def publish_user_tracking_state(guardian:, updated_memberships:, **)
|
def publish_user_tracking_state(guardian:, updated_memberships:, **)
|
||||||
updated_memberships.each do |membership|
|
data =
|
||||||
Chat::Publisher.publish_user_tracking_state(
|
updated_memberships.each_with_object({}) do |membership, data_hash|
|
||||||
guardian.user,
|
data_hash[membership.channel_id] = {
|
||||||
membership.channel_id,
|
last_read_message_id: membership.last_read_message_id,
|
||||||
membership.last_read_message_id,
|
membership_id: membership.membership_id,
|
||||||
)
|
}
|
||||||
end
|
end
|
||||||
|
Chat::Publisher.publish_bulk_user_tracking_state(guardian.user, data)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -129,8 +129,8 @@ module Chat
|
|||||||
end
|
end
|
||||||
|
|
||||||
def self.publish_user_tracking_state(user, chat_channel_id, chat_message_id)
|
def self.publish_user_tracking_state(user, chat_channel_id, chat_message_id)
|
||||||
data = { chat_channel_id: chat_channel_id, chat_message_id: chat_message_id }.merge(
|
data = { channel_id: chat_channel_id, last_read_message_id: chat_message_id }.merge(
|
||||||
Chat::ChannelUnreadsQuery.call(channel_id: chat_channel_id, user_id: user.id),
|
Chat::ChannelUnreadsQuery.call(channel_ids: [chat_channel_id], user_id: user.id).first.to_h,
|
||||||
)
|
)
|
||||||
|
|
||||||
MessageBus.publish(
|
MessageBus.publish(
|
||||||
@ -140,6 +140,30 @@ module Chat
|
|||||||
)
|
)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def self.bulk_user_tracking_state_message_bus_channel(user_id)
|
||||||
|
"/chat/bulk-user-tracking-state/#{user_id}"
|
||||||
|
end
|
||||||
|
|
||||||
|
def self.publish_bulk_user_tracking_state(user, channel_last_read_map)
|
||||||
|
unread_data =
|
||||||
|
Chat::ChannelUnreadsQuery.call(
|
||||||
|
channel_ids: channel_last_read_map.keys,
|
||||||
|
user_id: user.id,
|
||||||
|
).map(&:to_h)
|
||||||
|
|
||||||
|
channel_last_read_map.each do |key, value|
|
||||||
|
channel_last_read_map[key] = value.merge(
|
||||||
|
unread_data.find { |data| data[:channel_id] == key }.except(:channel_id),
|
||||||
|
)
|
||||||
|
end
|
||||||
|
|
||||||
|
MessageBus.publish(
|
||||||
|
self.bulk_user_tracking_state_message_bus_channel(user.id),
|
||||||
|
channel_last_read_map.as_json,
|
||||||
|
user_ids: [user.id],
|
||||||
|
)
|
||||||
|
end
|
||||||
|
|
||||||
def self.new_mentions_message_bus_channel(chat_channel_id)
|
def self.new_mentions_message_bus_channel(chat_channel_id)
|
||||||
"/chat/#{chat_channel_id}/new-mentions"
|
"/chat/#{chat_channel_id}/new-mentions"
|
||||||
end
|
end
|
||||||
|
@ -213,6 +213,11 @@ export default class ChatSubscriptionsManager extends Service {
|
|||||||
this._onUserTrackingStateUpdate,
|
this._onUserTrackingStateUpdate,
|
||||||
lastId
|
lastId
|
||||||
);
|
);
|
||||||
|
this.messageBus.subscribe(
|
||||||
|
`/chat/bulk-user-tracking-state/${this.currentUser.id}`,
|
||||||
|
this._onBulkUserTrackingStateUpdate,
|
||||||
|
lastId
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
_stopUserTrackingStateSubscription() {
|
_stopUserTrackingStateSubscription() {
|
||||||
@ -224,20 +229,38 @@ export default class ChatSubscriptionsManager extends Service {
|
|||||||
`/chat/user-tracking-state/${this.currentUser.id}`,
|
`/chat/user-tracking-state/${this.currentUser.id}`,
|
||||||
this._onUserTrackingStateUpdate
|
this._onUserTrackingStateUpdate
|
||||||
);
|
);
|
||||||
|
|
||||||
|
this.messageBus.unsubscribe(
|
||||||
|
`/chat/bulk-user-tracking-state/${this.currentUser.id}`,
|
||||||
|
this._onBulkUserTrackingStateUpdate
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@bind
|
||||||
|
_onBulkUserTrackingStateUpdate(busData) {
|
||||||
|
Object.keys(busData).forEach((channelId) => {
|
||||||
|
this._updateChannelTrackingData(channelId, busData[channelId]);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@bind
|
@bind
|
||||||
_onUserTrackingStateUpdate(busData) {
|
_onUserTrackingStateUpdate(busData) {
|
||||||
this.chatChannelsManager.find(busData.chat_channel_id).then((channel) => {
|
this._updateChannelTrackingData(busData.channel_id, busData);
|
||||||
|
}
|
||||||
|
|
||||||
|
@bind
|
||||||
|
_updateChannelTrackingData(channelId, trackingData) {
|
||||||
|
this.chatChannelsManager.find(channelId).then((channel) => {
|
||||||
if (
|
if (
|
||||||
!channel?.currentUserMembership?.last_read_message_id ||
|
!channel?.currentUserMembership?.last_read_message_id ||
|
||||||
parseInt(channel?.currentUserMembership?.last_read_message_id, 10) <=
|
parseInt(channel?.currentUserMembership?.last_read_message_id, 10) <=
|
||||||
busData.chat_message_id
|
trackingData.last_read_message_id
|
||||||
) {
|
) {
|
||||||
channel.currentUserMembership.last_read_message_id =
|
channel.currentUserMembership.last_read_message_id =
|
||||||
busData.chat_message_id;
|
trackingData.last_read_message_id;
|
||||||
channel.currentUserMembership.unread_count = busData.unread_count;
|
channel.currentUserMembership.unread_count = trackingData.unread_count;
|
||||||
channel.currentUserMembership.unread_mentions = busData.unread_mentions;
|
channel.currentUserMembership.unread_mentions =
|
||||||
|
trackingData.mention_count;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -13,39 +13,91 @@ describe Chat::ChannelUnreadsQuery do
|
|||||||
end
|
end
|
||||||
|
|
||||||
context "with unread message" do
|
context "with unread message" do
|
||||||
it "returns a correct unread count" do
|
before { Fabricate(:chat_message, chat_channel: channel_1) }
|
||||||
Fabricate(:chat_message, chat_channel: channel_1)
|
|
||||||
|
|
||||||
expect(described_class.call(channel_id: channel_1.id, user_id: current_user.id)).to eq(
|
it "returns a correct unread count" do
|
||||||
{ mention_count: 0, unread_count: 1 },
|
expect(
|
||||||
)
|
described_class.call(channel_ids: [channel_1.id], user_id: current_user.id).first.to_h,
|
||||||
|
).to eq({ mention_count: 0, unread_count: 1, channel_id: channel_1.id })
|
||||||
|
end
|
||||||
|
|
||||||
|
context "for multiple channels" do
|
||||||
|
fab!(:channel_2) { Fabricate(:category_channel) }
|
||||||
|
|
||||||
|
it "returns accurate counts" do
|
||||||
|
channel_2.add(current_user)
|
||||||
|
Fabricate(:chat_message, chat_channel: channel_2)
|
||||||
|
Fabricate(:chat_message, chat_channel: channel_2)
|
||||||
|
|
||||||
|
expect(
|
||||||
|
described_class.call(
|
||||||
|
channel_ids: [channel_1.id, channel_2.id],
|
||||||
|
user_id: current_user.id,
|
||||||
|
).map(&:to_h),
|
||||||
|
).to match_array(
|
||||||
|
[
|
||||||
|
{ mention_count: 0, unread_count: 1, channel_id: channel_1.id },
|
||||||
|
{ mention_count: 0, unread_count: 2, channel_id: channel_2.id },
|
||||||
|
],
|
||||||
|
)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
context "with unread mentions" do
|
context "with unread mentions" do
|
||||||
before { Jobs.run_immediately! }
|
before { Jobs.run_immediately! }
|
||||||
|
|
||||||
it "returns a correct unread mention" do
|
def create_mention(message, channel)
|
||||||
message = Fabricate(:chat_message)
|
|
||||||
notification =
|
notification =
|
||||||
Notification.create!(
|
Notification.create!(
|
||||||
notification_type: Notification.types[:chat_mention],
|
notification_type: Notification.types[:chat_mention],
|
||||||
user_id: current_user.id,
|
user_id: current_user.id,
|
||||||
data: { chat_message_id: message.id, chat_channel_id: channel_1.id }.to_json,
|
data: { chat_message_id: message.id, chat_channel_id: channel.id }.to_json,
|
||||||
)
|
)
|
||||||
Chat::Mention.create!(notification: notification, user: current_user, chat_message: message)
|
Chat::Mention.create!(notification: notification, user: current_user, chat_message: message)
|
||||||
|
end
|
||||||
|
|
||||||
expect(described_class.call(channel_id: channel_1.id, user_id: current_user.id)).to eq(
|
it "returns a correct unread mention" do
|
||||||
{ mention_count: 1, unread_count: 0 },
|
message = Fabricate(:chat_message, chat_channel: channel_1)
|
||||||
)
|
create_mention(message, channel_1)
|
||||||
|
|
||||||
|
expect(
|
||||||
|
described_class.call(channel_ids: [channel_1.id], user_id: current_user.id).first.to_h,
|
||||||
|
).to eq({ mention_count: 1, unread_count: 1, channel_id: channel_1.id })
|
||||||
|
end
|
||||||
|
|
||||||
|
context "for multiple channels" do
|
||||||
|
fab!(:channel_2) { Fabricate(:category_channel) }
|
||||||
|
|
||||||
|
it "returns accurate counts" do
|
||||||
|
message = Fabricate(:chat_message, chat_channel: channel_1)
|
||||||
|
create_mention(message, channel_1)
|
||||||
|
|
||||||
|
channel_2.add(current_user)
|
||||||
|
Fabricate(:chat_message, chat_channel: channel_2)
|
||||||
|
message_2 = Fabricate(:chat_message, chat_channel: channel_2)
|
||||||
|
create_mention(message_2, channel_2)
|
||||||
|
|
||||||
|
expect(
|
||||||
|
described_class.call(
|
||||||
|
channel_ids: [channel_1.id, channel_2.id],
|
||||||
|
user_id: current_user.id,
|
||||||
|
).map(&:to_h),
|
||||||
|
).to match_array(
|
||||||
|
[
|
||||||
|
{ mention_count: 1, unread_count: 1, channel_id: channel_1.id },
|
||||||
|
{ mention_count: 1, unread_count: 2, channel_id: channel_2.id },
|
||||||
|
],
|
||||||
|
)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
context "with nothing unread" do
|
context "with nothing unread" do
|
||||||
it "returns a correct state" do
|
it "returns a correct state" do
|
||||||
expect(described_class.call(channel_id: channel_1.id, user_id: current_user.id)).to eq(
|
expect(
|
||||||
{ mention_count: 0, unread_count: 0 },
|
described_class.call(channel_ids: [channel_1.id], user_id: current_user.id).first.to_h,
|
||||||
)
|
).to eq({ mention_count: 0, unread_count: 0, channel_id: channel_1.id })
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -129,11 +129,30 @@ RSpec.describe Chat::MarkAllUserChannelsRead do
|
|||||||
}.by(-2)
|
}.by(-2)
|
||||||
end
|
end
|
||||||
|
|
||||||
it "publishes tracking state for all affected channels" do
|
it "publishes tracking state in bulk for affected channels" do
|
||||||
messages = MessageBus.track_publish { result }
|
message =
|
||||||
expect(
|
messages.find { |m| m.channel == "/chat/bulk-user-tracking-state/#{current_user.id}" }
|
||||||
messages.select { |m| m.channel == "/chat/user-tracking-state/#{current_user.id}" }.count,
|
|
||||||
).to eq(3)
|
expect(message.data).to eq(
|
||||||
|
channel_1.id.to_s => {
|
||||||
|
"last_read_message_id" => message_2.id,
|
||||||
|
"membership_id" => membership_1.id,
|
||||||
|
"mention_count" => 0,
|
||||||
|
"unread_count" => 0,
|
||||||
|
},
|
||||||
|
channel_2.id.to_s => {
|
||||||
|
"last_read_message_id" => message_4.id,
|
||||||
|
"membership_id" => membership_2.id,
|
||||||
|
"mention_count" => 0,
|
||||||
|
"unread_count" => 0,
|
||||||
|
},
|
||||||
|
channel_3.id.to_s => {
|
||||||
|
"last_read_message_id" => message_6.id,
|
||||||
|
"membership_id" => membership_3.id,
|
||||||
|
"mention_count" => 0,
|
||||||
|
"unread_count" => 0,
|
||||||
|
},
|
||||||
|
)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
Reference in New Issue
Block a user