Ruby heartbeats (#654)

* :heartbeat option

* Make heartbeat global configuration

* Document heartbeat

* Bump version to 1.0.0.beta2
This commit is contained in:
Ismael Celis 2025-02-12 15:21:19 +00:00 committed by GitHub
parent 5c890ecf09
commit defedfa44d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 152 additions and 15 deletions

View File

@ -170,7 +170,10 @@ datastar.on_client_connect do
end
```
This callback's behaviour depends on the configured [heartbeat](#heartbeat)
#### `on_server_disconnect`
Register server-side code to run when the connection is closed by the server.
Ie when the served is done streaming without errors.
@ -188,15 +191,64 @@ datastar.on_error do |exception|
Sentry.notify(exception)
end
```
Note that this callback can be registered globally, too.
Note that this callback can be [configured globally](#global-configuration), too.
### heartbeat
By default, streaming responses (using the `#stream` block) launch a background thread/fiber to periodically check the connection.
This is because the browser could have disconnected during a long-lived, idle connection (for example waiting on an event bus).
The default heartbeat is 3 seconds, and it will close the connection and trigger [on_client_disconnect](#on_client_disconnect) callbacks if the client has disconnected.
In cases where a streaming block doesn't need a heartbeat and you want to save precious threads (for example a regular ticker update, ie non-idle), you can disable the heartbeat:
```ruby
datastar = Datastar.new(request:, response:, view_context:, heartbeat: false)
datastar.stream do |sse|
100.times do |i|
sleep 1
sse.merge_signals count: i
end
end
```
You can also set it to a different number (in seconds)
```ruby
heartbeat: 0.5
```
#### Manual connection check
If you want to check connection status on your own, you can disable the heartbeat and use `sse.check_connection!`, which will close the connection and trigger callbacks if the client is disconnected.
```ruby
datastar = Datastar.new(request:, response:, view_context:, heartbeat: false)
datastar.stream do |sse|
# The event bus implementaton will check connection status when idle
# by calling #check_connection! on it
EventBus.subscribe('channel', sse) do |event|
sse.merge_signals eventName: event.name
end
end
```
### Global configuration
```ruby
Datastar.configure do |config|
# Global on_error callback
# Can be overriden on specific instances
config.on_error do |exception|
Sentry.notify(exception)
end
# Global heartbeat interval (or false, to disable)
# Can be overriden on specific instances
config.heartbeat = 0.3
end
```
@ -274,7 +326,7 @@ From this library's root, run the bundled-in test Rack app:
bundle puma examples/test.ru
```
Now run the test bash scripts in the `test` directory in this repo.
Now run the test bash scripts in the `sdk/test` directory in this repo.
```bash
./test-all.sh http://localhost:9292

View File

@ -32,13 +32,15 @@ module Datastar
class Configuration
NOOP_CALLBACK = ->(_error) {}
RACK_FINALIZE = ->(_view_context, response) { response.finish }
DEFAULT_HEARTBEAT = 3
attr_accessor :executor, :error_callback, :finalize
attr_accessor :executor, :error_callback, :finalize, :heartbeat
def initialize
@executor = ThreadExecutor.new
@error_callback = NOOP_CALLBACK
@finalize = RACK_FINALIZE
@heartbeat = DEFAULT_HEARTBEAT
end
def on_error(callable = nil, &block)

View File

@ -35,13 +35,15 @@ module Datastar
# @option executor [Object] the executor object to use for managing threads and queues
# @option error_callback [Proc] the callback to call when an error occurs
# @option finalize [Proc] the callback to call when the response is finalized
# @option heartbeat [Integer, nil, FalseClass] the heartbeat interval in seconds
def initialize(
request:,
response: nil,
view_context: nil,
executor: Datastar.config.executor,
error_callback: Datastar.config.error_callback,
finalize: Datastar.config.finalize
finalize: Datastar.config.finalize,
heartbeat: Datastar.config.heartbeat
)
@on_connect = []
@on_client_disconnect = []
@ -61,6 +63,10 @@ module Datastar
@response.headers['X-Accel-Buffering'] = 'no'
@response.delete_header 'Content-Length'
@executor.prepare(@response)
raise ArgumentError, ':heartbeat must be a number' if heartbeat && !heartbeat.is_a?(Numeric)
@heartbeat = heartbeat
@heartbeat_on = false
end
# Check if the request accepts SSE responses
@ -124,7 +130,7 @@ module Datastar
# @param fragments [String, #call(view_context: Object) => Object] the HTML fragment or object
# @param options [Hash] the options to send with the message
def merge_fragments(fragments, options = BLANK_OPTIONS)
stream do |sse|
stream_no_heartbeat do |sse|
sse.merge_fragments(fragments, options)
end
end
@ -138,7 +144,7 @@ module Datastar
# @param selector [String] a CSS selector for the fragment to remove
# @param options [Hash] the options to send with the message
def remove_fragments(selector, options = BLANK_OPTIONS)
stream do |sse|
stream_no_heartbeat do |sse|
sse.remove_fragments(selector, options)
end
end
@ -152,7 +158,7 @@ module Datastar
# @param signals [Hash] signals to merge
# @param options [Hash] the options to send with the message
def merge_signals(signals, options = BLANK_OPTIONS)
stream do |sse|
stream_no_heartbeat do |sse|
sse.merge_signals(signals, options)
end
end
@ -166,7 +172,7 @@ module Datastar
# @param paths [Array<String>] object paths to the signals to remove
# @param options [Hash] the options to send with the message
def remove_signals(paths, options = BLANK_OPTIONS)
stream do |sse|
stream_no_heartbeat do |sse|
sse.remove_signals(paths, options)
end
end
@ -180,7 +186,7 @@ module Datastar
# @param script [String] the script to execute
# @param options [Hash] the options to send with the message
def execute_script(script, options = BLANK_OPTIONS)
stream do |sse|
stream_no_heartbeat do |sse|
sse.execute_script(script, options)
end
end
@ -190,7 +196,7 @@ module Datastar
#
# @param url [String] the URL or path to redirect to
def redirect(url)
stream do |sse|
stream_no_heartbeat do |sse|
sse.redirect(url)
end
end
@ -237,6 +243,15 @@ module Datastar
def stream(streamer = nil, &block)
streamer ||= block
@streamers << streamer
if @heartbeat && !@heartbeat_on
@heartbeat_on = true
@streamers << proc do |sse|
while true
sleep @heartbeat
sse.check_connection!
end
end
end
body = if @streamers.size == 1
stream_one(streamer)
@ -250,6 +265,14 @@ module Datastar
private
def stream_no_heartbeat(&block)
was = @heartbeat
@heartbeat = false
stream(&block).tap do
@heartbeat = was
end
end
# Produce a response body for a single stream
# In this case, the SSE generator can write directly to the socket
#
@ -300,11 +323,12 @@ module Datastar
handling_errors(conn_generator, socket) do
done_count = 0
threads_size = @heartbeat_on ? threads.size - 1 : threads.size
while (data = @queue.pop)
if data == :done
done_count += 1
@queue << nil if done_count == threads.size
@queue << nil if done_count == threads_size
elsif data.is_a?(Exception)
raise data
else

View File

@ -39,6 +39,13 @@ module Datastar
@view_context = view_context
end
# Sometimes we'll want to run periodic checks to ensure the connection is still alive
# ie. the browser hasn't disconnected
# For example when idle listening on an event bus.
def check_connection!
@stream << MSG_END
end
def merge_fragments(fragments, options = BLANK_OPTIONS)
# Support Phlex components
# And Rails' #render_in interface

View File

@ -1,5 +1,5 @@
# frozen_string_literal: true
module Datastar
VERSION = '1.0.0.beta.1'
VERSION = '1.0.0.beta.2'
end

View File

@ -331,12 +331,12 @@ RSpec.describe Datastar::Dispatcher do
describe '#stream' do
it 'writes multiple events to socket' do
socket = TestSocket.new
dispatcher.stream do |sse|
sse.merge_fragments %(<div id="foo">\n<span>hello</span>\n</div>\n)
sse.merge_signals(foo: 'bar')
end
socket = TestSocket.new
dispatcher.response.body.call(socket)
expect(socket.open).to be(false)
expect(socket.lines.size).to eq(2)
@ -373,6 +373,44 @@ RSpec.describe Datastar::Dispatcher do
end
end
specify ':heartbeat enabled' do
dispatcher = Datastar.new(request:, response:, heartbeat: 0.001)
connected = true
block_called = false
dispatcher.on_client_disconnect { |conn| connected = false }
socket = TestSocket.new
allow(socket).to receive(:<<).with("\n\n").and_raise(Errno::EPIPE, 'Socket closed')
dispatcher.stream do |sse|
sleep 10
block_called = true
end
dispatcher.response.body.call(socket)
expect(connected).to be(false)
expect(block_called).to be(false)
end
specify ':heartbeat disabled' do
dispatcher = Datastar.new(request:, response:, heartbeat: false)
connected = true
block_called = false
dispatcher.on_client_disconnect { |conn| connected = false }
socket = TestSocket.new
allow(socket).to receive(:<<).with("\n\n").and_raise(Errno::EPIPE, 'Socket closed')
dispatcher.stream do |sse|
sleep 0.001
block_called = true
end
dispatcher.response.body.call(socket)
expect(connected).to be(true)
expect(block_called).to be(true)
end
specify '#signals' do
request = build_request(
%(/events),
@ -400,8 +438,6 @@ RSpec.describe Datastar::Dispatcher do
sse.merge_signals(foo: 'bar')
end
socket = TestSocket.new
# allow(socket).to receive(:<<).and_raise(Errno::EPIPE, 'Socket closed')
#
dispatcher.response.body.call(socket)
expect(connected).to be(true)
end
@ -422,6 +458,22 @@ RSpec.describe Datastar::Dispatcher do
expect(events).to eq([true, false])
end
specify '#check_connection triggers #on_client_disconnect' do
events = []
dispatcher
.on_connect { |conn| events << true }
.on_client_disconnect { |conn| events << false }
dispatcher.stream do |sse|
sse.check_connection!
end
socket = TestSocket.new
allow(socket).to receive(:<<).with("\n\n").and_raise(Errno::EPIPE, 'Socket closed')
dispatcher.response.body.call(socket)
expect(events).to eq([true, false])
end
specify '#on_server_disconnect' do
events = []
dispatcher