Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broadcast to others #191

Merged
merged 2 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## master

- Add support for broadcast options (e.g., `exclude_socket`). ([@palkan][])

- Add `batch_broadcasts` option to automatically batch broadcasts for code wrapped in Rails executor. ([@palkan][])

## 1.4.1 (2023-09-27)
Expand Down
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ if File.exist?(local_gemfile)
else
gem 'actioncable', '~> 7.0'
gem 'activerecord'
gem 'activejob'
end

gem 'sqlite3', '~> 1.3'
40 changes: 38 additions & 2 deletions docs/getting_started.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Getting Started with AnyCable on Rails

AnyCable initially was designed for Rails applications only.
AnyCable can be used as a drop-in replacement for the Action Cable server in Rails applications. It supports most Action Cable features (see [Compatibility](./compatibility.md) for more) and can be used with any Action Cable client. Moreover, AnyCable brings additional power-ups for your real-time features, such as [streams history support](../guides/reliable_streams.md) and API extensions (see [below](#action-cable-extensions)).

> See also the [demo](https://github.com/anycable/anycable_rails_demo/pull/2) of migrating from Action Cable to AnyCable.

Expand Down Expand Up @@ -95,7 +95,7 @@ Or you can use the environment variables (or anything else supported by [anyway_

AnyCable supports publishing [broadcast messages in batches](../ruby/broadcast_adapters.md#batching) (to reduce the number of round-trips and ensure delivery order). You can enable automatic batching of broadcasts by setting `ANYCABLE_BROADCAST_BATCHING=true` (or `broadcast_batching: true` in the config file).

Auto-batching uses [Rails executor](https://guides.rubyonrails.org/threading_and_code_execution.html#executor) under the hood, so broadcasts are aggregated within Rails _units of work_, such as HTTP requests, background jobs, etc.
Auto-batching uses [Rails executor]() under the hood, so broadcasts are aggregated within Rails _units of work_, such as HTTP requests, background jobs, etc.

### Server installation

Expand Down Expand Up @@ -183,6 +183,42 @@ AnyCable automatically integrates with Rails 7+ error reporting interface (`Rail

For earlier Rails versions, see [docs](../ruby/exceptions.md).

## Action Cable extensions

### Broadcast to others

AnyCable provides a functionality to deliver broadcasts to all clients except from the one initiated the action (e.g., when you need to broadcast a message to all users in a chat room except the one who sent the message).

> **NOTE:** This feature is not available in Action Cable. It relies on [Action Cable protocol extensions](../misc/action_cable_protocol.md) currently only supported by AnyCable.

To do so, you need to obtain a unique socket identifier. For example, using [AnyCable JS client](https://github.com/anycable/anycable-client), you can access it via the `cable.sessionId` property.

Then, you must attach this identifier to HTTP request as a `X-Socket-ID` header value. AnyCable Rails uses this value to populate the `AnyCable::Rails.current_socket_id` value. If this value is set, you can implement broadcasting to other using one of the following methods:

- Calling `ActionCable.server.broadcast stream, data, to_others: true`
- Calling `MyChannel.broadcast_to stream, data, to_others: true`

Finally, if you perform broadcasts indirectly, you can wrap the code with `AnyCable::Rails.broadcasting_to_others` to enable this feature. For example, when using Turbo Streams:

```ruby
AnyCable::Rails.broadcasting_to_others do
Turbo::StreamsChannel.broadcast_remove_to workspace, target: item
end
```

You can also pass socket ID explicitly (if obtained from another source):

```ruby
AnyCable::Rails.broadcasting_to_others(socket_id: my_socket_id) do
# ...
end

# or
ActionCable.server.broadcast stream, data, exclude_socket: my_socket_id
````

**IMPORTANT:** AnyCable Rails automatically pass the current socket ID to Active Job, so you can use `broadcast ..., to_others: true` in your background jobs without any additional configuration.

## Development and test

AnyCable is [compatible](compatibility.md) with the original Action Cable implementation; thus you can continue using Action Cable for development and tests.
Expand Down
1 change: 1 addition & 0 deletions gemfiles/rails51.gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ source "https://rubygems.org"

gem "actioncable", "~> 5.1"
gem "activerecord"
gem "activejob"
gem "rspec-rails", "~> 4.0.0"
gem "sqlite3", "~> 1.3"

Expand Down
1 change: 1 addition & 0 deletions gemfiles/rails6.gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ source "https://rubygems.org"

gem "actioncable", "~> 6.0"
gem "activerecord"
gem "activejob"
gem "rspec-rails", "~> 4.0.0"
gem "sqlite3", "~> 1.4"

Expand Down
1 change: 1 addition & 0 deletions gemfiles/rails60.gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ source "https://rubygems.org"

gem "actioncable", "~> 6.0.0"
gem "activerecord"
gem "activejob"
gem "rspec-rails", "~> 4.0.0"
gem "sqlite3", "~> 1.4"

Expand Down
1 change: 1 addition & 0 deletions gemfiles/rails7.gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ source "https://rubygems.org"

gem "actioncable", "~> 7.0"
gem "activerecord"
gem "activejob"
gem "rspec-rails", "~> 4.0.0"
gem "sqlite3", "~> 1.4"

Expand Down
7 changes: 5 additions & 2 deletions lib/action_cable/subscription_adapter/any_cable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ class AnyCable < Base
def initialize(*)
end

def broadcast(channel, payload)
::AnyCable.broadcast(channel, payload)
def broadcast(channel, payload, **options)
options.merge!(::AnyCable::Rails.current_broadcast_options || {})
to_others = options.delete(:to_others)
options[:exclude_socket] ||= ::AnyCable::Rails.current_socket_id if to_others
::AnyCable.broadcast(channel, payload, **options.compact)
end

def subscribe(*)
Expand Down
31 changes: 29 additions & 2 deletions lib/anycable/rails.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
require "anycable/rails/rack"

require "globalid"
require "active_support/core_ext/module/attribute_accessors_per_thread"

module AnyCable
# Rails handler for AnyCable
Expand All @@ -14,6 +15,9 @@ module Rails

ADAPTER_ALIASES = %w[any_cable anycable].freeze

thread_mattr_accessor :current_socket_id
thread_mattr_accessor :current_broadcast_options

class << self
def enabled?
adapter = ::ActionCable.server.config.cable&.fetch("adapter", nil)
Expand All @@ -24,6 +28,29 @@ def compatible_adapter?(adapter)
ADAPTER_ALIASES.include?(adapter)
end

def with_socket_id(socket_id)
old_socket_id, self.current_socket_id = current_socket_id, socket_id
yield
ensure
self.current_socket_id = old_socket_id
end

def with_broadcast_options(**options)
old_options = current_broadcast_options
self.current_broadcast_options = options.reverse_merge(old_options || {})
yield
ensure
self.current_broadcast_options = old_options
end

def broadcasting_to_others(socket_id: nil, &block)
if socket_id
with_socket_id(socket_id) { with_broadcast_options(to_others: true, &block) }
else
with_broadcast_options(to_others: true, &block)
end
end

# Serialize connection/channel state variable to string
# using GlobalID where possible or JSON (if json: true)
def serialize(obj, json: false)
Expand All @@ -50,9 +77,9 @@ def deserialize(str, json: false)
end

module Extension
def broadcast(channel, payload)
def broadcast(...)
super
::AnyCable.broadcast(channel, payload)
::AnyCable.broadcast(...)
end
end

Expand Down
23 changes: 23 additions & 0 deletions lib/anycable/rails/action_cable_ext/broadcast_options.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# frozen_string_literal: true

require "action_cable"

ActionCable::Server::Base.prepend(Module.new do
def broadcast(channel, payload, **options)
return super if options.empty?

AnyCable::Rails.with_broadcast_options(**options) do
super(channel, payload)
end
end
end)

ActionCable::Channel::Base.singleton_class.prepend(Module.new do
def broadcast_to(target, payload, **options)
return super if options.empty?

AnyCable::Rails.with_broadcast_options(**options) do
super(target, payload)
end
end
end)
3 changes: 2 additions & 1 deletion lib/anycable/rails/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
persistent_session_enabled: false,
embedded: false,
http_rpc_mount_path: nil,
batch_broadcasts: false
batch_broadcasts: false,
socket_id_header: "X-Socket-ID"
)
AnyCable::Config.ignore_options :access_logs_disabled, :persistent_session_enabled
8 changes: 4 additions & 4 deletions lib/anycable/rails/middlewares/executor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@ def initialize(executor)
end

def call(method, message, metadata)
sid = metadata["sid"]

if ::Rails.respond_to?(:error)
executor.wrap do
sid = metadata["sid"]

::Rails.error.record(context: {method: method, payload: message.to_h, sid: sid}) do
yield
Rails.with_socket_id(sid) { yield }
end
end
else
executor.wrap { yield }
executor.wrap { Rails.with_socket_id(sid) { yield } }
end
end
end
Expand Down
13 changes: 13 additions & 0 deletions lib/anycable/rails/railtie.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require "anycable/rails/action_cable_ext/connection"
require "anycable/rails/action_cable_ext/channel"
require "anycable/rails/action_cable_ext/remote_connections"
require "anycable/rails/action_cable_ext/broadcast_options"

require "anycable/rails/channel_state"
require "anycable/rails/connection_factory"
Expand Down Expand Up @@ -70,6 +71,18 @@ class Railtie < ::Rails::Railtie # :nodoc:
end
end

initializer "anycable.socket_id_tracking" do
ActiveSupport.on_load(:action_controller) do
require "anycable/rails/socket_id_tracking"
include AnyCable::Rails::SocketIdTrackingController
end

ActiveSupport.on_load(:active_job) do
require "anycable/rails/socket_id_tracking"
include AnyCable::Rails::SocketIdTrackingJob
end
end

initializer "anycable.connection_factory", after: "action_cable.set_configs" do |app|
ActiveSupport.on_load(:action_cable) do
app.config.to_prepare do
Expand Down
42 changes: 42 additions & 0 deletions lib/anycable/rails/socket_id_tracking.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# frozen_string_literal: true

module AnyCable
module Rails
module SocketIdTrackingController
extend ActiveSupport::Concern

included do
around_action :anycable_tracking_socket_id
end

private

def anycable_tracking_socket_id(&block)
Rails.with_socket_id(request.headers[AnyCable.config.socket_id_header], &block)
end
end

module SocketIdTrackingJob
extend ActiveSupport::Concern

attr_accessor :cable_socket_id

def serialize
return super unless Rails.current_socket_id

super.merge("cable_socket_id" => Rails.current_socket_id)
end

def deserialize(job_data)
super
self.cable_socket_id = job_data["cable_socket_id"]
end

included do
around_perform do |job, block|
Rails.with_socket_id(job.cable_socket_id, &block)
end
end
end
end
end
3 changes: 2 additions & 1 deletion spec/dummy/app/controllers/broadcasts_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ class BroadcastsController < ApplicationController
around_action :maybe_disable_auto_batching

def create
options = params[:to_others] ? {to_others: true} : {}
params[:count].to_i.times do |num|
ActionCable.server.broadcast "test", {count: num + 1}
ActionCable.server.broadcast "test", {count: num + 1}, **options
end

head :created
Expand Down
7 changes: 7 additions & 0 deletions spec/dummy/app/jobs/broadcast_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# frozen_string_literal: true

class BroadcastJob < ActiveJob::Base
def perform(stream, data, to_others = false)
ActionCable.server.broadcast(stream, data, to_others: to_others)
end
end
1 change: 1 addition & 0 deletions spec/dummy/config/application.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require_relative "boot"
require "action_controller/railtie"
require "action_cable/engine"
require "active_job/railtie"
require "global_id/railtie"
require "active_record/railtie"

Expand Down
Loading
Loading