Compare commits

...

5 Commits

Author SHA1 Message Date
Pratik Naik 7b987d07d5 Fix the test mocks 2015-10-20 17:36:53 -05:00
Pratik Naik 2b79976abd Merge branch 'master' into pubsub-pool
Conflicts:
	lib/action_cable/channel/streams.rb
	test/channel/stream_test.rb
	test/connection/identifier_test.rb
	test/connection/string_identifier_test.rb
	test/test_helper.rb
2015-10-20 17:34:28 -05:00
Pratik Naik 1ec72a582d Merge branch 'master' into pubsub-pool 2015-10-15 12:43:07 -05:00
Pratik Naik 9cad56cfee Log the time it takes to subscribe to a redis channel 2015-10-15 12:40:45 -05:00
Pratik Naik 5cb6d2e02d Use a pool of pubsub connections instead of just one connection 2015-10-15 11:35:47 -05:00
7 changed files with 23 additions and 13 deletions

View File

@ -48,8 +48,8 @@ module ActionCable
include InternalChannel
include Authorization
attr_reader :server, :env
delegate :worker_pool, :pubsub, to: :server
attr_reader :server, :env, :pubsub
delegate :worker_pool, to: :server
attr_reader :logger
@ -62,6 +62,8 @@ module ActionCable
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
@pubsub = server.pubsub_pool.sample
@started_at = Time.now
end
@ -122,7 +124,6 @@ module ActionCable
transmit ActiveSupport::JSON.encode(identifier: '_ping', message: Time.now.to_i)
end
protected
# The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc.
def request

View File

@ -47,14 +47,18 @@ module ActionCable
end
end
# The redis pubsub adapter used for all streams/broadcasting.
def pubsub
@pubsub ||= redis.pubsub
# The redis pubsub pool used for all streams/broadcasting.
def pubsub_pool
@pubsub_pool ||= redis_pool.map { |redis| redis.pubsub }
end
# The EventMachine Redis instance used by the pubsub adapter.
def redis
@redis ||= EM::Hiredis.connect(config.redis[:url]).tap do |redis|
# The EventMachine Redis pool used by the pubsub adapter.
def redis_pool
@redis_pool ||= Array.new(config.redis_pool_size) { initialize_redis }
end
def initialize_redis
EM::Hiredis.connect(config.redis[:url]).tap do |redis|
redis.on(:reconnect_failed) do
logger.info "[ActionCable] Redis reconnect failed."
# logger.info "[ActionCable] Redis reconnected. Closing all the open connections."

View File

@ -6,7 +6,7 @@ module ActionCable
# in a Rails config initializer.
class Configuration
attr_accessor :logger, :log_tags
attr_accessor :connection_class, :worker_pool_size
attr_accessor :connection_class, :worker_pool_size, :redis_pool_size
attr_accessor :redis_path, :channels_path
attr_accessor :disable_request_forgery_protection, :allowed_request_origins
@ -16,6 +16,7 @@ module ActionCable
@connection_class = ApplicationCable::Connection
@worker_pool_size = 100
@redis_pool_size = 20
@redis_path = Rails.root.join('config/redis/cable.yml')
@channels_path = Rails.root.join('app/channels')

View File

@ -6,6 +6,7 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase
class Connection < ActionCable::Connection::Base
identified_by :current_user
attr_reader :websocket
attr_writer :pubsub
public :process_internal_message
@ -28,7 +29,7 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase
pubsub.expects(:unsubscribe_proc).with('action_cable/User#lifo', kind_of(Proc))
server = TestServer.new
server.stubs(:pubsub).returns(pubsub)
server.stubs(:pubsub_pool).returns([ pubsub ])
open_connection server: server
close_connection
@ -58,7 +59,7 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase
protected
def open_connection_with_stubbed_pubsub
server = TestServer.new
server.stubs(:pubsub).returns(stub_everything('pubsub'))
server.stubs(:pubsub_pool).returns([ stub_everything('pubsub') ])
open_connection server: server
end

View File

@ -25,7 +25,7 @@ class ActionCable::Connection::StringIdentifierTest < ActionCable::TestCase
protected
def open_connection_with_stubbed_pubsub
@server = TestServer.new
@server.stubs(:pubsub).returns(stub_everything('pubsub'))
@server.stubs(:pubsub_pool).returns([ stub_everything('pubsub') ])
open_connection
end

View File

@ -2,6 +2,7 @@ require 'stubs/user'
class TestConnection
attr_reader :identifiers, :logger, :current_user, :transmissions
attr_accessor :pubsub
def initialize(user = User.new("lifo"))
@identifiers = [ :current_user ]

View File

@ -4,10 +4,12 @@ class TestServer
include ActionCable::Server::Connections
attr_reader :logger, :config
attr_accessor :pubsub_pool
def initialize
@logger = ActiveSupport::TaggedLogging.new ActiveSupport::Logger.new(StringIO.new)
@config = OpenStruct.new(log_tags: [])
@pubsub_pool = []
end
def send_async