Using Rails Event Store as a pub-sub message bus

Defining an event

Firstly, you have to define an event class extending RailsEventStore::Event class.

class OrderCancelled < RailsEventStore::Event
end

Configuring RES client

# config/application.rb
module YourAppName
  class Application < Rails::Application
    config.to_prepare do
      Rails.configuration.event_store = RailsEventStore::Client.new
    end
  end
end

Publishing

class CancelOrdersService
  def call(order_id, user_id)
    order = Order.find_by!(
      customer_id: user_id,
      order_id: order_id,
    )
    order.cancel!
    event_store.publish_event(
      OrderCancelled.new(data: {
        order_id: order.id,
        customer_id: order.customer_id,
      }),
      stream_name: "Order-#{order.id}"
    )
  end

  private

  def event_store
    Rails.configuration.event_store
  end
end

Any class with access to Rails.configuration.event_store can publish events to subscribed listeners (aka subscribers or handlers). That can be a Rails model or a service object. Whatever you like.

Listeners subscribe, at runtime (or during configuration phase) to the publisher.

Subscribing

Objects

Any object responding to call can be subscribed as an event handler.

cancel_order = CancelOrder.new
event_store  = Rails.configuration.event_store
listener     = OrderNotifier.new

event_store.subscribe(listener, [OrderCancelled]) do
  cancel_order.call(order_id, user_id)
end

The listener would need to implement the call method. If it needs to handle more than one event, it can distinguish them based on their class.

class OrderNotifier
  def call(event)
    order_id = order.data.fetch(:order_id)
    case event
    when OrderCancelled
      # notify someone...
    else
      raise "not supported event #{event.inspect}"
    end
  end
end

Blocks

cancel_order = CancelOrder.new
event_store  = Rails.configuration.event_store

event_store.subscribe(
    -> (event) {
      Rails.logger.warn(event.inspect)
    },
    [OrderCancelled]
  ) do
  cancel_order.call(order_id, user_id)
end

Nicer API was proposed. Check it out.

Global event subscribers (a.k.a. handlers/listeners)

# config/application.rb
module YourAppName
  class Application < Rails::Application
    config.to_prepare do
      Rails.configuration.event_store = event_store = RailsEventStore::Client.new
      event_store.subscribe(
        OrderNotifier.new,
        [OrderCancelled]
      )
    end
  end
end

Make sure to read about fresh handler state to avoid potential issues from using the same listener for many published events.

Handling Events Asynchronously

Asynchronous handlers are described in Async handlers section