mirror of
https://github.com/discourse/discourse.git
synced 2025-05-22 22:43:33 +08:00
FIX: distributed cache leak and potential infinite loop
This commit is contained in:
@ -12,6 +12,10 @@ class DistributedCache
|
|||||||
|
|
||||||
attr_reader :key
|
attr_reader :key
|
||||||
|
|
||||||
|
def self.subscribers
|
||||||
|
@subscribers
|
||||||
|
end
|
||||||
|
|
||||||
def self.process_message(message)
|
def self.process_message(message)
|
||||||
i = @subscribers.length-1
|
i = @subscribers.length-1
|
||||||
|
|
||||||
@ -22,7 +26,10 @@ class DistributedCache
|
|||||||
current = @subscribers[i]
|
current = @subscribers[i]
|
||||||
|
|
||||||
next if payload["origin"] == current.object_id
|
next if payload["origin"] == current.object_id
|
||||||
|
next if current.key != payload["hash_key"]
|
||||||
|
|
||||||
hash = current.hash(message.site_id)
|
hash = current.hash(message.site_id)
|
||||||
|
|
||||||
case payload["op"]
|
case payload["op"]
|
||||||
when "set" then hash[payload["key"]] = payload["value"]
|
when "set" then hash[payload["key"]] = payload["value"]
|
||||||
when "delete" then hash.delete(payload["key"])
|
when "delete" then hash.delete(payload["key"])
|
||||||
@ -31,8 +38,9 @@ class DistributedCache
|
|||||||
|
|
||||||
rescue WeakRef::RefError
|
rescue WeakRef::RefError
|
||||||
@subscribers.delete_at(i)
|
@subscribers.delete_at(i)
|
||||||
|
ensure
|
||||||
|
i -= 1
|
||||||
end
|
end
|
||||||
i -= 1
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -55,6 +63,7 @@ class DistributedCache
|
|||||||
|
|
||||||
def self.publish(hash, message)
|
def self.publish(hash, message)
|
||||||
message[:origin] = hash.object_id
|
message[:origin] = hash.object_id
|
||||||
|
message[:hash_key] = hash.key
|
||||||
MessageBus.publish(channel_name, message, {user_ids: [-1]})
|
MessageBus.publish(channel_name, message, {user_ids: [-1]})
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -11,6 +11,37 @@ describe DistributedCache do
|
|||||||
DistributedCache.new("test")
|
DistributedCache.new("test")
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def add_throw_away_cache
|
||||||
|
c = DistributedCache.new("test")
|
||||||
|
c["foofoo"] = "bar"
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'correctly clears up caches' do
|
||||||
|
start = DistributedCache.subscribers.length
|
||||||
|
|
||||||
|
add_throw_away_cache
|
||||||
|
GC.start
|
||||||
|
cache1["foofoo"] = "bar1"
|
||||||
|
wait_for do
|
||||||
|
cache2["foofoo"] == "bar1"
|
||||||
|
end
|
||||||
|
|
||||||
|
DistributedCache.subscribers.length.should == start
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'does not leak state across caches' do
|
||||||
|
c2 = DistributedCache.new("test1")
|
||||||
|
c3 = DistributedCache.new("test1")
|
||||||
|
c2["hi"] = "hi"
|
||||||
|
wait_for do
|
||||||
|
c3["hi"] == "hi"
|
||||||
|
end
|
||||||
|
|
||||||
|
Thread.pass
|
||||||
|
cache1["hi"].should == nil
|
||||||
|
|
||||||
|
end
|
||||||
|
|
||||||
it 'allows coerces symbol keys to strings' do
|
it 'allows coerces symbol keys to strings' do
|
||||||
cache1[:key] = "test"
|
cache1[:key] = "test"
|
||||||
cache1["key"].should == "test"
|
cache1["key"].should == "test"
|
||||||
|
Reference in New Issue
Block a user