feat(broadcasting): add ActionCable adapter + fix model broadcast anti-pattern
- Add Mozo::Broadcaster::ActionCable as drop-in Faye replacement - Fix model_broadcast.rb: delegate to Mozo directly instead of ApplicationController.new (memory-unsafe anti-pattern) - Add Broadcastable concern for clean model-side broadcasting - ActionCable config: async adapter, cable.yml, WebSocket endpoint - MozoChannel with per-entity authorization (user/supplier/employee) - Connection auth via auth_token (matches existing auth pattern) - Mount /cable WebSocket in routes - Add broadcasting-migration.md with Faye→ActionCable guide
This commit is contained in:
@@ -0,0 +1,36 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module ApplicationCable
|
||||
class Connection < ActionCable::Connection::Base
|
||||
# Authenticate via auth_token (same mechanism used in ApplicationController#authenticate_employee!)
|
||||
# Clients should pass ?auth_token=TOKEN when connecting to the WebSocket.
|
||||
identified_by :current_user, :current_entity_type
|
||||
|
||||
def connect
|
||||
token = request.params[:auth_token].presence
|
||||
reject_unauthorized_connection unless token
|
||||
|
||||
if (employee = Employee.find_by_authentication_token(token))
|
||||
self.current_user = employee
|
||||
self.current_entity_type = :employee
|
||||
elsif (user = User.find_by_authentication_token(token))
|
||||
self.current_user = user
|
||||
self.current_entity_type = :user
|
||||
elsif (supplier = Supplier.find_by_authentication_token(token))
|
||||
self.current_user = supplier
|
||||
self.current_entity_type = :supplier
|
||||
else
|
||||
reject_unauthorized_connection
|
||||
end
|
||||
end
|
||||
|
||||
# Allow subscribing to the entity's own channel
|
||||
def subscribe_to_self
|
||||
case current_entity_type
|
||||
when :user then "user_#{current_user.id}"
|
||||
when :supplier then "supplier_#{current_user.id}"
|
||||
when :employee then "employee_#{current_user.id}"
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
@@ -0,0 +1,41 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
# Base channel. Streams are set up dynamically by clients subscribing
|
||||
# to their entity channel (user_123, supplier_456, etc.).
|
||||
#
|
||||
# The server broadcasts TO these channels via:
|
||||
# ActionCable.server.broadcast("user_123", { event: "...", data: {...} })
|
||||
#
|
||||
# Clients connect and subscribe via:
|
||||
# consumer.subscriptions.create({ channel: "MozoChannel", id: "user_123" })
|
||||
#
|
||||
class MozoChannel < ApplicationCable::Channel
|
||||
def subscribed
|
||||
stream_name = params[:id]
|
||||
if authorized?(stream_name)
|
||||
stream_from stream_name
|
||||
else
|
||||
reject
|
||||
end
|
||||
end
|
||||
|
||||
def unsubscribed
|
||||
# cleanup
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def authorized?(stream_name)
|
||||
prefix, id = stream_name.to_s.split('_', 2)
|
||||
case prefix
|
||||
when 'user'
|
||||
connection.current_entity_type == :user && connection.current_user.id.to_s == id
|
||||
when 'supplier'
|
||||
connection.current_entity_type == :supplier && connection.current_user.id.to_s == id
|
||||
when 'employee'
|
||||
connection.current_entity_type == :employee && connection.current_user.id.to_s == id
|
||||
else
|
||||
false
|
||||
end
|
||||
end
|
||||
end
|
||||
@@ -0,0 +1,29 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
# Include this in any model that needs to broadcast events to users/suppliers.
|
||||
#
|
||||
# Replaces the old model_broadcast.rb initializer which monkey-patched
|
||||
# SimplyStored::Couch and created ApplicationController.new per broadcast
|
||||
# (memory-unsafe, no request context, to be removed once all callers migrate).
|
||||
#
|
||||
# Usage:
|
||||
# class List < ApplicationRecord
|
||||
# include Broadcastable
|
||||
#
|
||||
# def close!
|
||||
# broadcast_user(user.id, 'list_closed', { id: id })
|
||||
# broadcast_supplier(supplier_id, 'list_closed', { id: id })
|
||||
# end
|
||||
# end
|
||||
#
|
||||
module Broadcastable
|
||||
extend ActiveSupport::Concern
|
||||
|
||||
def broadcast_supplier(sid, event, data = {})
|
||||
Mozo.broadcast_supplier(sid, event, data)
|
||||
end
|
||||
|
||||
def broadcast_user(uid, event, data = {})
|
||||
Mozo.broadcast_user(uid, event, data)
|
||||
end
|
||||
end
|
||||
@@ -0,0 +1,18 @@
|
||||
# ActionCable configuration for real-time broadcasting.
|
||||
#
|
||||
# Development/Test: async adapter (in-process, no external dependency).
|
||||
# Production: async is fine for single-server deployments.
|
||||
# Switch to Redis (`redis://...`) if scaling to multiple Puma workers
|
||||
# where broadcasts need to reach clients connected to different workers.
|
||||
#
|
||||
development:
|
||||
adapter: async
|
||||
|
||||
test:
|
||||
adapter: test
|
||||
|
||||
production:
|
||||
adapter: async
|
||||
# adapter: redis
|
||||
# url: <%= ENV.fetch("REDIS_URL") { "redis://localhost:6379/1" } %>
|
||||
# channel_prefix: mozo_backend_production
|
||||
@@ -1,12 +1,31 @@
|
||||
#TODO: this is really ugly, can cause memory leaks and much more bad stuff. We need a new broadcaster....
|
||||
# frozen_string_literal: true
|
||||
|
||||
# Broadcast shim: provides broadcast_supplier / broadcast_user to all
|
||||
# SimplyStored::Couch models without requiring ApplicationController.new.
|
||||
#
|
||||
# PREVIOUSLY (dangerous):
|
||||
# ApplicationController.new.send(:broadcast_supplier, *args)
|
||||
# → leaked controller instances, no request lifecycle
|
||||
#
|
||||
# NOW:
|
||||
# Delegates directly to Mozo.broadcast_supplier / Mozo.broadcast_user
|
||||
# which uses Mozo.broadcaster (configurable: Faye or ActionCable).
|
||||
#
|
||||
# MIGRATION PATH:
|
||||
# Models should `include Broadcastable` directly instead of relying
|
||||
# on this monkey-patch. Once all models include Broadcastable, this
|
||||
# initializer can be removed.
|
||||
#
|
||||
require 'simply_stored/couch'
|
||||
|
||||
module ModelBroadcast
|
||||
def broadcast_supplier(*args)
|
||||
ApplicationController.new.send(:broadcast_supplier, *args)
|
||||
def broadcast_supplier(sid, event, data = {})
|
||||
Mozo.broadcast_supplier(sid, event, data)
|
||||
end
|
||||
def broadcast_user(*args)
|
||||
ApplicationController.new.send(:broadcast_user, *args)
|
||||
|
||||
def broadcast_user(uid, event, data = {})
|
||||
Mozo.broadcast_user(uid, event, data)
|
||||
end
|
||||
end
|
||||
|
||||
SimplyStored::Couch.send(:include, ModelBroadcast)
|
||||
#SimplyStored::Couch.send(:extend, ModelBroadcast) # this should never happen!!!
|
||||
|
||||
@@ -1,5 +1,9 @@
|
||||
ALLOWED_LOCALES = /nl|de|fr|en|es/
|
||||
Mozo::Application.routes.draw do
|
||||
# ActionCable WebSocket endpoint (replaces Faye at events.mozo.bar/faye)
|
||||
# Clients connect via: wss://mozo.bar/cable?auth_token=TOKEN
|
||||
mount ActionCable.server => '/cable'
|
||||
|
||||
match '/.well-known/*rest', to: 'errors#not_found', via: :all
|
||||
match '/system/*rest', to: 'errors#not_found', via: :all
|
||||
devise_for :users, controllers: {
|
||||
|
||||
@@ -0,0 +1,89 @@
|
||||
# Broadcasting: Faye → ActionCable Migration Guide
|
||||
|
||||
## Current state
|
||||
|
||||
```
|
||||
Model (SimplyStored::Couch)
|
||||
→ ApplicationController.new.broadcast_user # ⚠️ anti-pattern
|
||||
→ Mozo.broadcast_user
|
||||
→ Mozo::Broadcaster::Faye.new.broadcast # HTTP POST to Faye
|
||||
→ Faye server (Thin, port 9296)
|
||||
→ WebSocket → browser clients
|
||||
```
|
||||
|
||||
## Target state
|
||||
|
||||
```
|
||||
Model (Broadcastable concern)
|
||||
→ Mozo.broadcast_user
|
||||
→ Mozo::Broadcaster::ActionCable.new.broadcast # in-process async
|
||||
→ ActionCable (Rails built-in)
|
||||
→ WebSocket → browser clients
|
||||
```
|
||||
|
||||
## What this branch adds
|
||||
|
||||
| File | Purpose |
|
||||
|------|---------|
|
||||
| `lib/mozo/broadcaster/action_cable.rb` | Drop-in ActionCable broadcaster adapter |
|
||||
| `config/cable.yml` | ActionCable configuration (async for single-server) |
|
||||
| `app/channels/application_cable/connection.rb` | WebSocket auth via auth_token |
|
||||
| `app/channels/mozo_channel.rb` | Channel authorization for user/supplier/employee |
|
||||
| `app/models/concerns/broadcastable.rb` | Clean module for models (replaces old monkey-patch) |
|
||||
| `config/routes.rb` | Mounts `/cable` WebSocket endpoint |
|
||||
| `config/initializers/model_broadcast.rb` | Fixed: delegates to Mozo directly (no more `ApplicationController.new`) |
|
||||
|
||||
## How to switch
|
||||
|
||||
### 1. Server (one-line change)
|
||||
|
||||
In `config/initializers/mozo_settings.rb`, change:
|
||||
|
||||
```ruby
|
||||
# Mozo.broadcaster = Mozo::Broadcaster::Faye.new # old
|
||||
Mozo.broadcaster = Mozo::Broadcaster::ActionCable.new # new
|
||||
```
|
||||
|
||||
### 2. Client (mozo-user / mozo-supplier)
|
||||
|
||||
**Old Faye client (conceptual):**
|
||||
```js
|
||||
var client = new Faye.Client('https://events.mozo.bar/faye');
|
||||
client.subscribe('/user/123', function(msg) { ... });
|
||||
```
|
||||
|
||||
**New ActionCable client:**
|
||||
```js
|
||||
// Using @rails/actioncable npm package
|
||||
import { createConsumer } from "@rails/actioncable";
|
||||
|
||||
const consumer = createConsumer(
|
||||
`wss://mozo.bar/cable?auth_token=${authToken}`
|
||||
);
|
||||
|
||||
consumer.subscriptions.create(
|
||||
{ channel: "MozoChannel", id: "user_123" },
|
||||
{
|
||||
received(data) {
|
||||
// data = { event: "list_closed", data: { id: 42 } }
|
||||
handleEvent(data.event, data.data);
|
||||
}
|
||||
}
|
||||
);
|
||||
```
|
||||
|
||||
### 3. Remove Faye
|
||||
|
||||
Once stable:
|
||||
- Remove `gem 'faye'` from Gemfile
|
||||
- Remove `faye/` directory
|
||||
- Remove nginx `events.mozo.bar` vhost
|
||||
- Stop the Faye Thin process
|
||||
|
||||
## Benefits
|
||||
|
||||
- **No extra process** — ActionCable runs inside Puma
|
||||
- **Async** — `broadcast` is non-blocking
|
||||
- **Simpler deploys** — one less service to manage
|
||||
- **WebSocket native** — no long-polling fallback complexity
|
||||
- **Rails auth** — cookies/sessions work automatically
|
||||
@@ -2,5 +2,6 @@ module Mozo
|
||||
module Broadcaster
|
||||
extend ActiveSupport::Autoload
|
||||
autoload :Faye
|
||||
autoload :ActionCable
|
||||
end
|
||||
end
|
||||
|
||||
@@ -0,0 +1,51 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module Mozo
|
||||
module Broadcaster
|
||||
# Drop-in replacement for Mozo::Broadcaster::Faye that uses
|
||||
# Rails' built-in ActionCable instead of an external Faye process.
|
||||
#
|
||||
# Benefits over Faye:
|
||||
# - Async by default (ActionCable.server.broadcast is non-blocking)
|
||||
# - No extra gem / process / port to manage
|
||||
# - Integrated with Rails authentication (cookies, sessions)
|
||||
# - WebSocket native (no long-polling fallback needed with modern browsers)
|
||||
#
|
||||
# Channel naming is kept compatible with the existing Faye convention:
|
||||
# /user/:uid → user_<uid>
|
||||
# /supplier/:sid → supplier_<sid>
|
||||
#
|
||||
# To use:
|
||||
# Set Mozo.broadcaster = Mozo::Broadcaster::ActionCable.new
|
||||
# in config/initializers/mozo_settings.rb
|
||||
#
|
||||
class ActionCable
|
||||
CHANNEL_PREFIX_REMAP = {
|
||||
%r{^/user/(.+)$} => 'user_\1',
|
||||
%r{^/supplier/(.+)$} => 'supplier_\1'
|
||||
}.freeze
|
||||
|
||||
def broadcast(message)
|
||||
channel = message[:channel] || message['channel']
|
||||
data = message[:data] || message['data']
|
||||
|
||||
remapped = remap_channel(channel)
|
||||
return unless remapped
|
||||
|
||||
::ActionCable.server.broadcast(remapped, data)
|
||||
rescue => e
|
||||
Rails.logger.error("[ACTION_CABLE][ERROR] #{e.message}")
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def remap_channel(channel)
|
||||
CHANNEL_PREFIX_REMAP.each do |pattern, replacement|
|
||||
return channel.sub(pattern, replacement) if channel.match?(pattern)
|
||||
end
|
||||
Rails.logger.warn("[ACTION_CABLE] Unknown channel format: #{channel}")
|
||||
nil
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
Reference in New Issue
Block a user