Compare commits
5 Commits
master
...
pubsub-poo
Author | SHA1 | Date |
---|---|---|
![]() |
7b987d07d5 | |
![]() |
2b79976abd | |
![]() |
1ec72a582d | |
![]() |
9cad56cfee | |
![]() |
5cb6d2e02d |
|
@ -48,8 +48,8 @@ module ActionCable
|
||||||
include InternalChannel
|
include InternalChannel
|
||||||
include Authorization
|
include Authorization
|
||||||
|
|
||||||
attr_reader :server, :env
|
attr_reader :server, :env, :pubsub
|
||||||
delegate :worker_pool, :pubsub, to: :server
|
delegate :worker_pool, to: :server
|
||||||
|
|
||||||
attr_reader :logger
|
attr_reader :logger
|
||||||
|
|
||||||
|
@ -62,6 +62,8 @@ module ActionCable
|
||||||
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
|
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
|
||||||
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
|
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
|
||||||
|
|
||||||
|
@pubsub = server.pubsub_pool.sample
|
||||||
|
|
||||||
@started_at = Time.now
|
@started_at = Time.now
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -122,7 +124,6 @@ module ActionCable
|
||||||
transmit ActiveSupport::JSON.encode(identifier: '_ping', message: Time.now.to_i)
|
transmit ActiveSupport::JSON.encode(identifier: '_ping', message: Time.now.to_i)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
protected
|
protected
|
||||||
# The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc.
|
# The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc.
|
||||||
def request
|
def request
|
||||||
|
|
|
@ -47,14 +47,18 @@ module ActionCable
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
# The redis pubsub adapter used for all streams/broadcasting.
|
# The redis pubsub pool used for all streams/broadcasting.
|
||||||
def pubsub
|
def pubsub_pool
|
||||||
@pubsub ||= redis.pubsub
|
@pubsub_pool ||= redis_pool.map { |redis| redis.pubsub }
|
||||||
end
|
end
|
||||||
|
|
||||||
# The EventMachine Redis instance used by the pubsub adapter.
|
# The EventMachine Redis pool used by the pubsub adapter.
|
||||||
def redis
|
def redis_pool
|
||||||
@redis ||= EM::Hiredis.connect(config.redis[:url]).tap do |redis|
|
@redis_pool ||= Array.new(config.redis_pool_size) { initialize_redis }
|
||||||
|
end
|
||||||
|
|
||||||
|
def initialize_redis
|
||||||
|
EM::Hiredis.connect(config.redis[:url]).tap do |redis|
|
||||||
redis.on(:reconnect_failed) do
|
redis.on(:reconnect_failed) do
|
||||||
logger.info "[ActionCable] Redis reconnect failed."
|
logger.info "[ActionCable] Redis reconnect failed."
|
||||||
# logger.info "[ActionCable] Redis reconnected. Closing all the open connections."
|
# logger.info "[ActionCable] Redis reconnected. Closing all the open connections."
|
||||||
|
|
|
@ -6,7 +6,7 @@ module ActionCable
|
||||||
# in a Rails config initializer.
|
# in a Rails config initializer.
|
||||||
class Configuration
|
class Configuration
|
||||||
attr_accessor :logger, :log_tags
|
attr_accessor :logger, :log_tags
|
||||||
attr_accessor :connection_class, :worker_pool_size
|
attr_accessor :connection_class, :worker_pool_size, :redis_pool_size
|
||||||
attr_accessor :redis_path, :channels_path
|
attr_accessor :redis_path, :channels_path
|
||||||
attr_accessor :disable_request_forgery_protection, :allowed_request_origins
|
attr_accessor :disable_request_forgery_protection, :allowed_request_origins
|
||||||
|
|
||||||
|
@ -16,6 +16,7 @@ module ActionCable
|
||||||
|
|
||||||
@connection_class = ApplicationCable::Connection
|
@connection_class = ApplicationCable::Connection
|
||||||
@worker_pool_size = 100
|
@worker_pool_size = 100
|
||||||
|
@redis_pool_size = 20
|
||||||
|
|
||||||
@redis_path = Rails.root.join('config/redis/cable.yml')
|
@redis_path = Rails.root.join('config/redis/cable.yml')
|
||||||
@channels_path = Rails.root.join('app/channels')
|
@channels_path = Rails.root.join('app/channels')
|
||||||
|
|
|
@ -6,6 +6,7 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase
|
||||||
class Connection < ActionCable::Connection::Base
|
class Connection < ActionCable::Connection::Base
|
||||||
identified_by :current_user
|
identified_by :current_user
|
||||||
attr_reader :websocket
|
attr_reader :websocket
|
||||||
|
attr_writer :pubsub
|
||||||
|
|
||||||
public :process_internal_message
|
public :process_internal_message
|
||||||
|
|
||||||
|
@ -28,7 +29,7 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase
|
||||||
pubsub.expects(:unsubscribe_proc).with('action_cable/User#lifo', kind_of(Proc))
|
pubsub.expects(:unsubscribe_proc).with('action_cable/User#lifo', kind_of(Proc))
|
||||||
|
|
||||||
server = TestServer.new
|
server = TestServer.new
|
||||||
server.stubs(:pubsub).returns(pubsub)
|
server.stubs(:pubsub_pool).returns([ pubsub ])
|
||||||
|
|
||||||
open_connection server: server
|
open_connection server: server
|
||||||
close_connection
|
close_connection
|
||||||
|
@ -58,7 +59,7 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase
|
||||||
protected
|
protected
|
||||||
def open_connection_with_stubbed_pubsub
|
def open_connection_with_stubbed_pubsub
|
||||||
server = TestServer.new
|
server = TestServer.new
|
||||||
server.stubs(:pubsub).returns(stub_everything('pubsub'))
|
server.stubs(:pubsub_pool).returns([ stub_everything('pubsub') ])
|
||||||
|
|
||||||
open_connection server: server
|
open_connection server: server
|
||||||
end
|
end
|
||||||
|
|
|
@ -25,7 +25,7 @@ class ActionCable::Connection::StringIdentifierTest < ActionCable::TestCase
|
||||||
protected
|
protected
|
||||||
def open_connection_with_stubbed_pubsub
|
def open_connection_with_stubbed_pubsub
|
||||||
@server = TestServer.new
|
@server = TestServer.new
|
||||||
@server.stubs(:pubsub).returns(stub_everything('pubsub'))
|
@server.stubs(:pubsub_pool).returns([ stub_everything('pubsub') ])
|
||||||
|
|
||||||
open_connection
|
open_connection
|
||||||
end
|
end
|
||||||
|
|
|
@ -2,6 +2,7 @@ require 'stubs/user'
|
||||||
|
|
||||||
class TestConnection
|
class TestConnection
|
||||||
attr_reader :identifiers, :logger, :current_user, :transmissions
|
attr_reader :identifiers, :logger, :current_user, :transmissions
|
||||||
|
attr_accessor :pubsub
|
||||||
|
|
||||||
def initialize(user = User.new("lifo"))
|
def initialize(user = User.new("lifo"))
|
||||||
@identifiers = [ :current_user ]
|
@identifiers = [ :current_user ]
|
||||||
|
|
|
@ -4,10 +4,12 @@ class TestServer
|
||||||
include ActionCable::Server::Connections
|
include ActionCable::Server::Connections
|
||||||
|
|
||||||
attr_reader :logger, :config
|
attr_reader :logger, :config
|
||||||
|
attr_accessor :pubsub_pool
|
||||||
|
|
||||||
def initialize
|
def initialize
|
||||||
@logger = ActiveSupport::TaggedLogging.new ActiveSupport::Logger.new(StringIO.new)
|
@logger = ActiveSupport::TaggedLogging.new ActiveSupport::Logger.new(StringIO.new)
|
||||||
@config = OpenStruct.new(log_tags: [])
|
@config = OpenStruct.new(log_tags: [])
|
||||||
|
@pubsub_pool = []
|
||||||
end
|
end
|
||||||
|
|
||||||
def send_async
|
def send_async
|
||||||
|
|
Loading…
Reference in New Issue