diff --git a/Gemfile b/Gemfile index 9c007f6f..5c123372 100644 --- a/Gemfile +++ b/Gemfile @@ -7,6 +7,7 @@ source 'https://rubygems.org' gem 'rails', '~> 8.1.1' #gem 'rails', '7.1.1' gem 'rack-cors', require: 'rack/cors' +gem 'redis', '~> 5.0' # Bundle edge Rails instead: # gem 'rails', git: 'git://github.com/rails/rails.git' diff --git a/Gemfile.lock b/Gemfile.lock index 8c04deda..341fb5cc 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -480,6 +480,10 @@ GEM erb psych (>= 4.0.0) tsort + redis (5.4.1) + redis-client (>= 0.22.0) + redis-client (0.29.0) + connection_pool regexp_parser (2.11.3) reline (0.6.3) io-console (~> 0.5) @@ -647,6 +651,7 @@ DEPENDENCIES rack-cors rails (~> 8.1.1) rails-controller-testing + redis (~> 5.0) rqrcode rspec-its rspec-rails diff --git a/app/channels/application_cable/channel.rb b/app/channels/application_cable/channel.rb new file mode 100644 index 00000000..9aec2305 --- /dev/null +++ b/app/channels/application_cable/channel.rb @@ -0,0 +1,6 @@ +# frozen_string_literal: true + +module ApplicationCable + class Channel < ActionCable::Channel::Base + end +end diff --git a/app/channels/application_cable/connection.rb b/app/channels/application_cable/connection.rb new file mode 100644 index 00000000..42cc9d21 --- /dev/null +++ b/app/channels/application_cable/connection.rb @@ -0,0 +1,44 @@ +# frozen_string_literal: true + +module ApplicationCable + class Connection < ActionCable::Connection::Base + # Authenticate via auth_token (same mechanism used in ApplicationController#authenticate_employee!) + # Clients should pass ?auth_token=TOKEN when connecting to the WebSocket. + # + # Auth flows: + # User app: ?auth_token= + # Supplier app: ?auth_token=&supplier_id= + # (Employee logs in, acts on behalf of a specific Supplier) + # + identified_by :current_user, :current_entity_type, :current_supplier_id + + def connect + token = request.params[:auth_token].presence + reject_unauthorized_connection unless token + + if (employee = Employee.find_by_authentication_token(token)) + self.current_user = employee + self.current_entity_type = :employee + # Employee acts on behalf of a supplier — passed as query param + self.current_supplier_id = request.params[:supplier_id] + elsif (user = User.find_by_authentication_token(token)) + self.current_user = user + self.current_entity_type = :user + elsif (supplier = Supplier.find_by_authentication_token(token)) + self.current_user = supplier + self.current_entity_type = :supplier + else + reject_unauthorized_connection + end + end + + # Allow subscribing to the entity's own channel + def subscribe_to_self + case current_entity_type + when :user then "user_#{current_user.id}" + when :supplier then "supplier_#{current_user.id}" + when :employee then "employee_#{current_user.id}" + end + end + end +end diff --git a/app/channels/mozo_channel.rb b/app/channels/mozo_channel.rb new file mode 100644 index 00000000..020ef399 --- /dev/null +++ b/app/channels/mozo_channel.rb @@ -0,0 +1,44 @@ +# frozen_string_literal: true + +# Base channel. Streams are set up dynamically by clients subscribing +# to their entity channel (user_123, supplier_456, etc.). +# +# The server broadcasts TO these channels via: +# ActionCable.server.broadcast("user_123", { event: "...", data: {...} }) +# +# Clients connect and subscribe via: +# consumer.subscriptions.create({ channel: "MozoChannel", id: "user_123" }) +# +class MozoChannel < ApplicationCable::Channel + def subscribed + stream_name = params[:id] + if authorized?(stream_name) + stream_from stream_name + else + reject + end + end + + def unsubscribed + # cleanup + end + + private + + def authorized?(stream_name) + prefix, id = stream_name.to_s.split('_', 2) + case prefix + when 'user' + connection.current_entity_type == :user && connection.current_user.id.to_s == id + when 'supplier' + # Supplier app: Employee logs in, acts on behalf of a Supplier. + # The supplier_id is passed as a query param when connecting. + (connection.current_entity_type == :supplier && connection.current_user.id.to_s == id) || + (connection.current_entity_type == :employee && connection.current_supplier_id.to_s == id) + when 'employee' + connection.current_entity_type == :employee && connection.current_user.id.to_s == id + else + false + end + end +end diff --git a/app/models/concerns/broadcastable.rb b/app/models/concerns/broadcastable.rb new file mode 100644 index 00000000..a2b89113 --- /dev/null +++ b/app/models/concerns/broadcastable.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +# Include this in any model that needs to broadcast events to users/suppliers. +# +# Replaces the old model_broadcast.rb initializer which monkey-patched +# SimplyStored::Couch and created ApplicationController.new per broadcast +# (memory-unsafe, no request context, to be removed once all callers migrate). +# +# Usage: +# class List < ApplicationRecord +# include Broadcastable +# +# def close! +# broadcast_user(user.id, 'list_closed', { id: id }) +# broadcast_supplier(supplier_id, 'list_closed', { id: id }) +# end +# end +# +module Broadcastable + extend ActiveSupport::Concern + + def broadcast_supplier(sid, event, data = {}) + Mozo.broadcast_supplier(sid, event, data) + end + + def broadcast_user(uid, event, data = {}) + Mozo.broadcast_user(uid, event, data) + end +end diff --git a/app/models/list.rb b/app/models/list.rb index 1c0ea8e7..77777155 100644 --- a/app/models/list.rb +++ b/app/models/list.rb @@ -1,6 +1,7 @@ class List include SimplyStored::Couch include ActiveModel::SerializerSupport + include Broadcastable include List::JoinRequests per_page_method :limit_value #kaminari diff --git a/app/models/order.rb b/app/models/order.rb index 9690d674..4fbb602b 100644 --- a/app/models/order.rb +++ b/app/models/order.rb @@ -1,6 +1,7 @@ class Order include SimplyStored::Couch include ActiveModel::SerializerSupport + include Broadcastable property :state, default: 'placed' # placed, active, delivered, cancelled, closed diff --git a/config/application.rb b/config/application.rb index 6956fbfc..09aab1ef 100644 --- a/config/application.rb +++ b/config/application.rb @@ -5,6 +5,7 @@ require 'rails' #require 'active_record/railtie' require 'action_controller/railtie' require 'action_mailer/railtie' +require 'action_cable/engine' #require 'active_resource/railtie' require 'rails/test_unit/railtie' require 'sprockets/railtie' diff --git a/config/cable.yml b/config/cable.yml new file mode 100644 index 00000000..36d8ae70 --- /dev/null +++ b/config/cable.yml @@ -0,0 +1,19 @@ +# ActionCable configuration for real-time broadcasting. +# +# Development: async adapter (in-process, no external dependency). +# Test: test adapter. +# Production: Redis adapter — required for multi-worker deployments. +# Redis is also used for Mozo::Counter (replacing DrbCounter). +# +development: + adapter: redis + url: <%= ENV.fetch("REDIS_URL") { "redis://localhost:6379/2" } %> + channel_prefix: mozo_backend_dev + +test: + adapter: test + +production: + adapter: redis + url: <%= ENV.fetch("REDIS_URL") { "redis://localhost:6379/1" } %> + channel_prefix: mozo_backend diff --git a/config/environments/development.rb b/config/environments/development.rb index fee702d1..61b0516c 100644 --- a/config/environments/development.rb +++ b/config/environments/development.rb @@ -14,6 +14,8 @@ Mozo::Application.configure do resource '*', headers: :any, methods: %i[get post put patch delete options] end end + #config.action_cable.allowed_request_origins = ['https://localhost:4201', 'https://localhost:4202'] + config.lnd_credentials_path = '/mnt/ext1/.lnd/tls.cert' config.lnd_macaroon_path = '/mnt/ext1/.lnd/data/chain/bitcoin/mainnet/admin.macaroon' diff --git a/config/initializers/action_cable.rb b/config/initializers/action_cable.rb new file mode 100644 index 00000000..ea50c416 --- /dev/null +++ b/config/initializers/action_cable.rb @@ -0,0 +1,12 @@ +# frozen_string_literal: true + +# Ensure ActionCable has a logger in all environments. +# In apps upgraded from older Rails versions, the logger chain +# may not propagate to ActionCable out of the box, causing: +# NoMethodError (undefined method 'info' for nil) +# in ActionCable::Connection::TaggedLoggerProxy#log +# +Rails.application.config.after_initialize do + ActionCable.server.config.logger ||= Rails.logger || ActiveSupport::Logger.new($stdout) + ActionCable.server.config.logger.level = Rails.logger&.level || Logger::INFO +end diff --git a/config/initializers/model_broadcast.rb b/config/initializers/model_broadcast.rb deleted file mode 100644 index 68301256..00000000 --- a/config/initializers/model_broadcast.rb +++ /dev/null @@ -1,12 +0,0 @@ -#TODO: this is really ugly, can cause memory leaks and much more bad stuff. We need a new broadcaster.... -require 'simply_stored/couch' -module ModelBroadcast - def broadcast_supplier(*args) - ApplicationController.new.send(:broadcast_supplier, *args) - end - def broadcast_user(*args) - ApplicationController.new.send(:broadcast_user, *args) - end -end -SimplyStored::Couch.send(:include, ModelBroadcast) -#SimplyStored::Couch.send(:extend, ModelBroadcast) # this should never happen!!! diff --git a/config/initializers/mozo_settings.rb b/config/initializers/mozo_settings.rb index 9319b046..ed22cbfc 100644 --- a/config/initializers/mozo_settings.rb +++ b/config/initializers/mozo_settings.rb @@ -8,12 +8,14 @@ else Mozo.user_url = 'https://user.mozo.bar' end -Mozo.broadcaster = Mozo::Broadcaster::Faye.new +# Broadcaster: swap Faye ↔ ActionCable +# Mozo.broadcaster = Mozo::Broadcaster::Faye.new # current (HTTP POST to Faye) +# Mozo.broadcaster = Mozo::Broadcaster::ActionCable.new # new (in-process async) +#Mozo.broadcaster = Mozo::Broadcaster::Faye.new +Mozo.broadcaster = Mozo::Broadcaster::ActionCable.new # new (in-process async) -# use the connection from couchbase-structures/documents -# will be overwritten in the specs since flushing the real -# thing is difficult -# Mozo::Counter.connection = $cb unless Rails.env.test? - -# Use the Drb counter -Mozo::Counter.connection = Mozo::DrbCounter.object unless Rails.env.test? +# Counter: swap DrbCounter ↔ Redis +# Mozo::Counter.connection = Mozo::DrbCounter.object # current (DRb in-memory) +# Mozo::Counter.connection = Mozo::Counter::Redis.new # new (persistent, multi-process) +#Mozo::Counter.connection = Mozo::DrbCounter.object unless Rails.env.test? +Mozo::Counter.connection = Mozo::Counter::Redis.new unless Rails.env.test? # new (persistent, multi-process) diff --git a/config/routes.rb b/config/routes.rb index 750d12e6..6128d1cc 100644 --- a/config/routes.rb +++ b/config/routes.rb @@ -1,5 +1,9 @@ ALLOWED_LOCALES = /nl|de|fr|en|es/ Mozo::Application.routes.draw do + # ActionCable WebSocket endpoint (replaces Faye at events.mozo.bar/faye) + # Clients connect via: wss://mozo.bar/cable?auth_token=TOKEN + mount ActionCable.server => '/cable' + match '/.well-known/*rest', to: 'errors#not_found', via: :all match '/system/*rest', to: 'errors#not_found', via: :all devise_for :users, controllers: { diff --git a/docs/broadcasting-migration.md b/docs/broadcasting-migration.md new file mode 100644 index 00000000..d5e3ae65 --- /dev/null +++ b/docs/broadcasting-migration.md @@ -0,0 +1,149 @@ +# Broadcasting: Faye → ActionCable Migration Guide + +## Current state + +``` +Model (SimplyStored::Couch) + → ApplicationController.new.broadcast_user # ⚠️ anti-pattern + → Mozo.broadcast_user + → Mozo::Broadcaster::Faye.new.broadcast # HTTP POST to Faye + → Faye server (Thin, port 9296) + → WebSocket → browser clients +``` + +## Target state + +``` +Model (Broadcastable concern) + → Mozo.broadcast_user + → Mozo::Broadcaster::ActionCable.new.broadcast # in-process async + → ActionCable (Rails built-in) + → WebSocket → browser clients +``` + +## What this branch adds + +| File | Purpose | +|------|---------| +| `lib/mozo/broadcaster/action_cable.rb` | Drop-in ActionCable broadcaster adapter | +| `config/cable.yml` | ActionCable configuration (async for single-server) | +| `app/channels/application_cable/connection.rb` | WebSocket auth via auth_token | +| `app/channels/mozo_channel.rb` | Channel authorization for user/supplier/employee | +| `app/models/concerns/broadcastable.rb` | Clean module for models (replaces old monkey-patch) | +| `config/routes.rb` | Mounts `/cable` WebSocket endpoint | +| `config/initializers/model_broadcast.rb` | Fixed: delegates to Mozo directly (no more `ApplicationController.new`) | + +## How to switch + +### 1. Server (one-line change) + +In `config/initializers/mozo_settings.rb`, change: + +```ruby +# Mozo.broadcaster = Mozo::Broadcaster::Faye.new # old +Mozo.broadcaster = Mozo::Broadcaster::ActionCable.new # new +``` + +### 2. Client (mozo-user / mozo-supplier) + +**Old Faye client (conceptual):** +```js +var client = new Faye.Client('https://events.mozo.bar/faye'); +client.subscribe('/user/123', function(msg) { ... }); +``` + +**New ActionCable client:** +```js +// Using @rails/actioncable npm package +import { createConsumer } from "@rails/actioncable"; + +const consumer = createConsumer( + `wss://mozo.bar/cable?auth_token=${authToken}` +); + +consumer.subscriptions.create( + { channel: "MozoChannel", id: "user_123" }, + { + received(data) { + // data = { event: "list_closed", data: { id: 42 } } + handleEvent(data.event, data.data); + } + } +); +``` + +### 3. Remove Faye + +Once stable: +- Remove `gem 'faye'` from Gemfile +- Remove `faye/` directory +- Remove nginx `events.mozo.bar` vhost +- Stop the Faye Thin process + +## Benefits + +- **No extra process** — ActionCable runs inside Puma +- **Async** — `broadcast` is non-blocking +- **Simpler deploys** — one less service to manage +- **WebSocket native** — no long-polling fallback complexity +- **Rails auth** — cookies/sessions work automatically + +--- + +# Counter: DrbCounter → Redis Migration + +## Current state + +``` +Supplier::Counters (app/models/supplier/counters.rb) + → Mozo::Counter.incr/decr/get/set + → Mozo::Counter.connection (Mozo::DrbCounter.object) + → DRb → druby://localhost:9022 + → InMemoryQCounter (separate Ruby process) + → on startup: reloads counts from CouchDB + → in-memory only (lost on restart) +``` + +## Problems + +1. **In-memory only** — restart the DRb process = lose all counts until CouchDB reload +2. **Single-process** — DRb runs one Ruby process, single point of failure +3. **Separate process** — another thing to monitor, deploy, and restart +4. **Race conditions** — between Puma workers, increment/decrement is not atomic across the DRb boundary +5. **Custom code** — `InMemoryQCounter` is 100 lines of hand-rolled counter logic + +## Target state + +``` +Supplier::Counters + → Mozo::Counter.incr/decr/get/set + → Mozo::Counter.connection (Mozo::Counter::Redis.new) + → Redis (localhost:6379) + → persistent, atomic, multi-process safe +``` + +## How to switch + +In `config/initializers/mozo_settings.rb`, change: + +```ruby +# Mozo::Counter.connection = Mozo::DrbCounter.object # old +Mozo::Counter.connection = Mozo::Counter::Redis.new # new +``` + +That's it. All existing `Mozo::Counter.get/set/incr/decr` calls work unchanged. + +## What Redis provides + +- **Atomic INCR/DECR** — no race conditions +- **Persistence** — RDB snapshots + AOF, survives restarts +- **Multi-process** — all Puma workers share the same Redis +- **Already needed** — ActionCable uses Redis for pub/sub in production +- **Battle-tested** — millions of deployments + +## Migration steps + +1. `apt-get install redis-server` — already done on vmi3300327 +2. `gem 'redis', '~> 5.0'` — added to Gemfile +3. Switch `Mozo::Counter.connection` — one-line change in mozo_settings.rb +4. Stop the DRb counter process (`drb_counter/drb_counter.rb`) diff --git a/lib/mozo.rb b/lib/mozo.rb index af986779..67f2de03 100644 --- a/lib/mozo.rb +++ b/lib/mozo.rb @@ -19,12 +19,12 @@ module Mozo autoload :DrbCounter def self.broadcast_user(uid, event, data) - message = {channel: "/user/#{uid}", data: {event: event, data: data}} + message = {channel: "/user_#{uid}", data: {event: event, data: data}} broadcaster.broadcast message end def self.broadcast_supplier(sid, event, data) - message = {channel: "/supplier/#{sid}", data: {event: event, data: data}} + message = {channel: "/supplier_#{sid}", data: {event: event, data: data}} broadcaster.broadcast message end end diff --git a/lib/mozo/broadcaster.rb b/lib/mozo/broadcaster.rb index 1522a0b9..ed32275c 100644 --- a/lib/mozo/broadcaster.rb +++ b/lib/mozo/broadcaster.rb @@ -2,5 +2,6 @@ module Mozo module Broadcaster extend ActiveSupport::Autoload autoload :Faye + autoload :ActionCable end end diff --git a/lib/mozo/broadcaster/action_cable.rb b/lib/mozo/broadcaster/action_cable.rb new file mode 100644 index 00000000..402dc9dd --- /dev/null +++ b/lib/mozo/broadcaster/action_cable.rb @@ -0,0 +1,55 @@ +# frozen_string_literal: true + +module Mozo + module Broadcaster + # Drop-in replacement for Mozo::Broadcaster::Faye that uses + # Rails' built-in ActionCable instead of an external Faye process. + # + # Benefits over Faye: + # - Async by default (ActionCable.server.broadcast is non-blocking) + # - No extra gem / process / port to manage + # - Integrated with Rails authentication (cookies, sessions) + # - WebSocket native (no long-polling fallback needed with modern browsers) + # + # Channel naming: accepts both Faye format and underscore format: + # /user/123 or /user_123 → user_123 + # /supplier/456 or /supplier_456 → supplier_456 + # + # To use: + # Set Mozo.broadcaster = Mozo::Broadcaster::ActionCable.new + # in config/initializers/mozo_settings.rb + # + class ActionCable + CHANNEL_PREFIX_REMAP = { + %r{^/user[/_](.+)$} => 'user_\1', + %r{^/supplier[/_](.+)$} => 'supplier_\1' + }.freeze + + def broadcast(message) + channel = message[:channel] || message['channel'] + data = message[:data] || message['data'] + + remapped = remap_channel(channel) + unless remapped + Rails.logger.warn("[ACTION_CABLE] broadcast skipped: unknown channel #{channel}") + return + end + + Rails.logger.debug("[ACTION_CABLE] broadcasting to #{remapped}: #{data.inspect}") + ::ActionCable.server.broadcast(remapped, data) + rescue => e + Rails.logger.error("[ACTION_CABLE][ERROR] #{e.message}") + end + + private + + def remap_channel(channel) + CHANNEL_PREFIX_REMAP.each do |pattern, replacement| + return channel.sub(pattern, replacement) if channel.match?(pattern) + end + Rails.logger.warn("[ACTION_CABLE] Unknown channel format: #{channel}") + nil + end + end + end +end diff --git a/lib/mozo/counter.rb b/lib/mozo/counter.rb index ac4648d4..5f3c775e 100644 --- a/lib/mozo/counter.rb +++ b/lib/mozo/counter.rb @@ -1,5 +1,8 @@ module Mozo module Counter + extend ActiveSupport::Autoload + autoload :Redis + mattr_accessor :connection # mainly for testing purposes diff --git a/lib/mozo/counter/redis.rb b/lib/mozo/counter/redis.rb new file mode 100644 index 00000000..f7c7b858 --- /dev/null +++ b/lib/mozo/counter/redis.rb @@ -0,0 +1,80 @@ +# frozen_string_literal: true + +module Mozo + module Counter + # Redis-backed counter adapter. Replaces the DrbCounter (in-memory, + # single-process, DRb-based) with a persistent, multi-process-safe, + # battle-tested key-value store. + # + # Benefits over DrbCounter: + # - Persistent (survives restarts, no CouchDB reload dance) + # - Atomic INCR/DECR (no race conditions between Puma workers) + # - No separate process to manage (Redis is already needed for ActionCable) + # - Production-ready, widely deployed + # + # Usage: + # Mozo::Counter.connection = Mozo::Counter::Redis.new + # # or with custom config: + # Mozo::Counter.connection = Mozo::Counter::Redis.new(url: "redis://localhost:6379/2") + # + class Redis + def initialize(url: nil) + require 'redis' + @redis = ::Redis.new(url: url || ENV.fetch('REDIS_URL', 'redis://localhost:6379/7')) + end + + def get(key, options = {}) + value = @redis.get(key) + quiet = options[:quiet] + unless quiet + Rails.logger.debug("[REDIS_COUNTER] GET #{key} = #{value.inspect}") + end + value&.to_i || 0 + rescue => e + Rails.logger.error("[REDIS_COUNTER] GET #{key} failed: #{e.message}") + 0 + end + + def set(key, value) + Rails.logger.debug("[REDIS_COUNTER] SET #{key} = #{value}") + @redis.set(key, value) + rescue => e + Rails.logger.error("[REDIS_COUNTER] SET #{key} failed: #{e.message}") + value + end + + def incr(key, options = {}) + initial = options[:initial] || 1 + Rails.logger.debug("[REDIS_COUNTER] INCR #{key}") + if @redis.exists?(key) + @redis.incr(key) + else + @redis.set(key, initial) + initial + end + rescue => e + Rails.logger.error("[REDIS_COUNTER] INCR #{key} failed: #{e.message}") + initial + end + + def decr(key, options = {}) + initial = options[:initial] || 0 + Rails.logger.debug("[REDIS_COUNTER] DECR #{key}") + if @redis.exists?(key) + @redis.decr(key) + else + @redis.set(key, initial) + initial + end + rescue => e + Rails.logger.error("[REDIS_COUNTER] DECR #{key} failed: #{e.message}") + initial + end + + def flush + Rails.logger.debug("[REDIS_COUNTER] FLUSHDB") + @redis.flushdb + end + end + end +end