Rails Event Store

Event Sourcing with AggregateRoot

Configuration

Choose your event store client. To do so add configuration in your environment setup. Example using RailsEventStore:

AggregateRoot.configure do |config|
  config.default_event_store = RailsEventStore::Client.new
  # or
  config.default_event_store = Rails.configuration.event_store
end

Remember that this is only a default event store used by the AggregateRoot module to initialize AggregateRoot::Repository when no event store is given in the repository's constructor as an argument.

Usage

To create a new aggregate domain object, include the AggregateRoot module.

class Order
  include AggregateRoot

  # ... more later
end

Define domain events

class OrderSubmitted < RailsEventStore::Event; end
class OrderExpired < RailsEventStore::Event; end

Define aggregate logic

class Order
  include AggregateRoot
  class HasBeenAlreadySubmitted < StandardError; end
  class HasExpired < StandardError; end

  def initialize
    @state = :new
    # any other code here
  end

  def submit
    raise HasBeenAlreadySubmitted if state == :submitted
    raise HasExpired if state == :expired
    apply OrderSubmitted.new(data: {delivery_date: Time.now + 24.hours})
  end

  def expire
    apply OrderExpired.new
  end

  private
  attr_reader :state

  def apply_order_submitted(event)
    @state = :submitted
  end

  def apply_order_expired(event)
    @state = :expired
  end
end

The convention is to use apply_ plus an underscored event type (event.event_type what with RubyEventStore::Event is equal to event's class name)for event handler methods. I.e. when you apply the OrderExpired event, the apply_order_expired method is called.

Alternative syntax for event handler methods.

Alternatively, you can use the class method on(event_klass, &method) for defining those methods. This is useful because you can more easily grep/find where events are used in your codebase.

class Order
  include AggregateRoot
  class HasBeenAlreadySubmitted < StandardError; end
  class HasExpired < StandardError; end

  def initialize
    @state = :new
    # any other code here
  end

  def submit
    raise HasBeenAlreadySubmitted if state == :submitted
    raise HasExpired if state == :expired
    apply OrderSubmitted.new(data: {delivery_date: Time.now + 24.hours})
  end

  def expire
    apply OrderExpired.new
  end

  on OrderSubmitted do |event|
    @state = :submitted
    @delivery_date = event.data.fetch(:delivery_date)
  end

  on OrderExpired do |event|
    @state = :expired
  end

  private

  attr_reader :state
end

Loading an aggregate root object from an event store

stream_name = "Order$123"
order = AggregateRoot::Repository.new.load(Order.new, stream_name)

To restore the state of your aggregate you need to use AggregateRoot::Repository. Repository's #load gets all domain events stored for the aggregate in the event store stream Order$123 and applies them to the newly created order object in order to rebuild the aggregate's state.

Storing an aggregate root's changes in an event store

stream_name = "Order$123"
repository = AggregateRoot::Repository.new
order = repository.load(Order.new, stream_name)
order.submit
repository.store(order, stream_name)

Storing (publishing) aggregate changes is also performed by the AggregateRoot::Repository object. Repository's #store gets all the unpublished aggregate's domain events (added by executing a domain logic method like submit) from unpublished_events and publishes them in order of creation to the event store.

Simplify loading/storing aggregates

AggregateRoot::Repository delivers a convenient method to handle a typical workflow with aggregates. The with_aggregate method will load an aggregate from a given stream, yield a block to allow performing an action on the aggregate object (the aggregate object will be yielded as a block argument), and then publish all changes in aggregate to the event store provided to the repository.

stream_name = "Order$123"
repository = AggregateRoot::Repository.new
repository.with_aggregate(Order.new, stream_name) do |order|
  order.submit
end

You could also provide a specific repository for Order aggregate to make this code event better:

class OrderRepository
  def initialize(event_store = Rails.configuration.event_store)
    @repository = AggregateRoot::Repository.new(event_store)
  end

  def with_order(order_id, &block)
    stream_name = "Order$#{order_id}"
    repository.with_aggregate(Order.new, stream_name, &block)
  end

  private
  attr_reader :repository
end

And then your code to submit an order might look like:

repository = OrderRepository.new
repository.with_order(123) do |order|
  order.submit
end

Overwriting default apply_strategy

You can change the way how aggregate methods are called in response to applied events. Let's say we want to call order_has_expired when the OrderExpired event is applied. To achieve this, we'll provide our implementation for the apply_strategy method:

class Order
  include AggregateRoot
  class HasBeenAlreadySubmitted < StandardError; end
  class HasExpired < StandardError; end

  def initialize
    @state = :new
  end

  def submit
    raise HasBeenAlreadySubmitted if state == :submitted
    raise HasExpired if state == :expired
    apply OrderSubmitted.new(data: {delivery_date: Time.now + 24.hours})
  end

  def expire
    apply OrderExpired.new
  end

  private
  attr_reader :state

  def apply_strategy
    ->(aggregate, event) do
      case event
      when OrderExpired
        order_has_expired
      when OrderSubmitted
        order_has_been_submitted
      else
        raise
      end
    end
  end

  def order_has_been_submitted(event)
    @state = :submitted
  end

  def order_has_expired(event)
    @state = :expired
  end
end

The apply_strategy method must return a callable that responds to #call. We've used lambda in the example above. This lambda takes two arguments -- aggreate which in this case is self and an event being applied.

The case statement is one way to implement such a dispatch. The following example shows an equivalent implementation with Hash:

def apply_strategy
    ->(aggregate, event) do
      {
        'OrderExpired' => method(:order_has_been_submitted),
        'OrderSubmitted' => method(:order_has_expired),
      }.fetch(event.event_type , ->(event) { raise }).call(event)
    end
  end

  def order_has_been_submitted(event)
    @state = :submitted
  end

  def order_has_expired(event)
    @state = :expired
  end

Resources

There're already a few blog posts about building event sourced applications with rails_event_store and aggregate_root gems:

Also this example app might be useful.