Compare commits
5 Commits
master
...
pubsub-poo
Author | SHA1 | Date |
---|---|---|
![]() |
7b987d07d5 | |
![]() |
2b79976abd | |
![]() |
1ec72a582d | |
![]() |
9cad56cfee | |
![]() |
5cb6d2e02d |
|
@ -48,8 +48,8 @@ module ActionCable
|
|||
include InternalChannel
|
||||
include Authorization
|
||||
|
||||
attr_reader :server, :env
|
||||
delegate :worker_pool, :pubsub, to: :server
|
||||
attr_reader :server, :env, :pubsub
|
||||
delegate :worker_pool, to: :server
|
||||
|
||||
attr_reader :logger
|
||||
|
||||
|
@ -62,6 +62,8 @@ module ActionCable
|
|||
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
|
||||
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
|
||||
|
||||
@pubsub = server.pubsub_pool.sample
|
||||
|
||||
@started_at = Time.now
|
||||
end
|
||||
|
||||
|
@ -122,7 +124,6 @@ module ActionCable
|
|||
transmit ActiveSupport::JSON.encode(identifier: '_ping', message: Time.now.to_i)
|
||||
end
|
||||
|
||||
|
||||
protected
|
||||
# The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc.
|
||||
def request
|
||||
|
|
|
@ -47,14 +47,18 @@ module ActionCable
|
|||
end
|
||||
end
|
||||
|
||||
# The redis pubsub adapter used for all streams/broadcasting.
|
||||
def pubsub
|
||||
@pubsub ||= redis.pubsub
|
||||
# The redis pubsub pool used for all streams/broadcasting.
|
||||
def pubsub_pool
|
||||
@pubsub_pool ||= redis_pool.map { |redis| redis.pubsub }
|
||||
end
|
||||
|
||||
# The EventMachine Redis instance used by the pubsub adapter.
|
||||
def redis
|
||||
@redis ||= EM::Hiredis.connect(config.redis[:url]).tap do |redis|
|
||||
# The EventMachine Redis pool used by the pubsub adapter.
|
||||
def redis_pool
|
||||
@redis_pool ||= Array.new(config.redis_pool_size) { initialize_redis }
|
||||
end
|
||||
|
||||
def initialize_redis
|
||||
EM::Hiredis.connect(config.redis[:url]).tap do |redis|
|
||||
redis.on(:reconnect_failed) do
|
||||
logger.info "[ActionCable] Redis reconnect failed."
|
||||
# logger.info "[ActionCable] Redis reconnected. Closing all the open connections."
|
||||
|
|
|
@ -6,7 +6,7 @@ module ActionCable
|
|||
# in a Rails config initializer.
|
||||
class Configuration
|
||||
attr_accessor :logger, :log_tags
|
||||
attr_accessor :connection_class, :worker_pool_size
|
||||
attr_accessor :connection_class, :worker_pool_size, :redis_pool_size
|
||||
attr_accessor :redis_path, :channels_path
|
||||
attr_accessor :disable_request_forgery_protection, :allowed_request_origins
|
||||
|
||||
|
@ -16,6 +16,7 @@ module ActionCable
|
|||
|
||||
@connection_class = ApplicationCable::Connection
|
||||
@worker_pool_size = 100
|
||||
@redis_pool_size = 20
|
||||
|
||||
@redis_path = Rails.root.join('config/redis/cable.yml')
|
||||
@channels_path = Rails.root.join('app/channels')
|
||||
|
|
|
@ -6,6 +6,7 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase
|
|||
class Connection < ActionCable::Connection::Base
|
||||
identified_by :current_user
|
||||
attr_reader :websocket
|
||||
attr_writer :pubsub
|
||||
|
||||
public :process_internal_message
|
||||
|
||||
|
@ -28,7 +29,7 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase
|
|||
pubsub.expects(:unsubscribe_proc).with('action_cable/User#lifo', kind_of(Proc))
|
||||
|
||||
server = TestServer.new
|
||||
server.stubs(:pubsub).returns(pubsub)
|
||||
server.stubs(:pubsub_pool).returns([ pubsub ])
|
||||
|
||||
open_connection server: server
|
||||
close_connection
|
||||
|
@ -58,7 +59,7 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase
|
|||
protected
|
||||
def open_connection_with_stubbed_pubsub
|
||||
server = TestServer.new
|
||||
server.stubs(:pubsub).returns(stub_everything('pubsub'))
|
||||
server.stubs(:pubsub_pool).returns([ stub_everything('pubsub') ])
|
||||
|
||||
open_connection server: server
|
||||
end
|
||||
|
|
|
@ -25,7 +25,7 @@ class ActionCable::Connection::StringIdentifierTest < ActionCable::TestCase
|
|||
protected
|
||||
def open_connection_with_stubbed_pubsub
|
||||
@server = TestServer.new
|
||||
@server.stubs(:pubsub).returns(stub_everything('pubsub'))
|
||||
@server.stubs(:pubsub_pool).returns([ stub_everything('pubsub') ])
|
||||
|
||||
open_connection
|
||||
end
|
||||
|
|
|
@ -2,6 +2,7 @@ require 'stubs/user'
|
|||
|
||||
class TestConnection
|
||||
attr_reader :identifiers, :logger, :current_user, :transmissions
|
||||
attr_accessor :pubsub
|
||||
|
||||
def initialize(user = User.new("lifo"))
|
||||
@identifiers = [ :current_user ]
|
||||
|
|
|
@ -4,10 +4,12 @@ class TestServer
|
|||
include ActionCable::Server::Connections
|
||||
|
||||
attr_reader :logger, :config
|
||||
attr_accessor :pubsub_pool
|
||||
|
||||
def initialize
|
||||
@logger = ActiveSupport::TaggedLogging.new ActiveSupport::Logger.new(StringIO.new)
|
||||
@config = OpenStruct.new(log_tags: [])
|
||||
@pubsub_pool = []
|
||||
end
|
||||
|
||||
def send_async
|
||||
|
|
Loading…
Reference in New Issue