Event Sourcing with AggregateRoot

Configuration

Choose your event store client. To do so add configuration in environment setup. Example using RailsEventStore:

AggregateRoot.configure do |config|
  config.default_event_store = RailsEventStore::Client.new
  # or
  config.default_event_store = Rails.configuration.event_store
end

Remember that this is only a default event store used by AggregateRoot module when no event store is given in load / store methods parameters.

Usage

To create a new aggregate domain object include AggregateRoot module.

class Order
  include AggregateRoot

  # ... more later
end

Define domain events

class OrderSubmitted < RailsEventStore::Event; end
class OrderExpired < RailsEventStore::Event; end

Define aggregate logic

class Order
  include AggregateRoot
  class HasBeenAlreadySubmitted < StandardError; end
  class HasExpired < StandardError; end

  def initialize
    @state = :new
    # any other code here
  end

  def submit
    raise HasBeenAlreadySubmitted if state == :submitted
    raise HasExpired if state == :expired
    apply OrderSubmitted.new(data: {delivery_date: Time.now + 24.hours})
  end

  def expire
    apply OrderExpired.new
  end

  private
  attr_reader :state

  def apply_order_submitted(event)
    @state = :submitted
  end

  def apply_order_expired(event)
    @state = :expired
  end
end

The convention is to use apply_ plus an underscored event class name for event handler methods. I.e. when you apply OrderExpired event, the apply_order_expired method is called.

Alternative syntax for event handler methods.

You can use class method on(event_klass, &method) for defining those methods alternatively. This is useful because you can more easily grep/find where events are used in your codebase.

class Order
  include AggregateRoot
  class HasBeenAlreadySubmitted < StandardError; end
  class HasExpired < StandardError; end

  def initialize
    @state = :new
    # any other code here
  end

  def submit
    raise HasBeenAlreadySubmitted if state == :submitted
    raise HasExpired if state == :expired
    apply OrderSubmitted.new(data: {delivery_date: Time.now + 24.hours})
  end

  def expire
    apply OrderExpired.new
  end

  on OrderSubmitted do |event|
    @state = :submitted
    @delivery_date = event.data.fetch(:delivery_date)
  end

  on OrderExpired do |event|
    @state = :expired
  end

  private

  attr_reader :state
end

Loading an aggregate root object from event store

stream_name = "Order$123"
order = Order.new.load(stream_name)

#load gets all domain events stored for the aggregate in event store and applies them in order to rebuild aggregate's state.

Storing an aggregate root's changes in event store

stream_name = "Order$123"
order = Order.new.load(stream_name)
order.submit
order.store

#store gets all unpublished aggregate's domain events (added by executing a domain logic method like submit) from unpublished_events and publishes them in order of creation to event store. If stream_name is not specified events will be stored in the same stream from which the aggregate has been loaded.

Overwriting default apply_strategy

You can change the way how aggregate methods are called in response to applied events. Let's say we want to call order_has_expired when OrderExpired event is applied. To achieve this we'll provide our implementation for the apply_strategy method:

class Order
  include AggregateRoot
  class HasBeenAlreadySubmitted < StandardError; end
  class HasExpired < StandardError; end

  def initialize
    @state = :new
  end

  def submit
    raise HasBeenAlreadySubmitted if state == :submitted
    raise HasExpired if state == :expired
    apply OrderSubmitted.new(data: {delivery_date: Time.now + 24.hours})
  end

  def expire
    apply OrderExpired.new
  end

  private
  attr_reader :state

  def apply_strategy
    ->(aggregate, event) do
      case event
      when OrderExpired
        order_has_expired
      when OrderSubmitted
        order_has_been_submitted
      else
        raise
      end
    end
  end

  def order_has_been_submitted(event)
    @state = :submitted
  end

  def order_has_expired(event)
    @state = :expired
  end
end

The apply_strategy method must return a callable, that responds to #call. We've used lambda in the example above. This lambda takes two arguments -- aggreate which in this case is self and a an event being applied.

The case statement is one way to implement such dispatch. The following example shows an equivalent implemented with Hash:

def apply_strategy
    ->(aggregate, event) do
      {
        OrderExpired => method(:order_has_been_submitted),
        OrderSubmitted => method(:order_has_expired),
      }.fetch(event.class, ->(event) { raise }).call(event)
    end
  end

  def order_has_been_submitted(event)
    @state = :submitted
  end

  def order_has_expired(event)
    @state = :expired
  end

API

Instance methods

Public

apply(*events)
load(stream_name, event_store: AggregateRoot.configuration.default_event_store)
store(stream_name = @loaded_from_stream_name, event_store: AggregateRoot.configuration.default_event_store)
unpublished_events()

Private

apply_strategy()
default_event_store()

Class methods

on(event_class, &method)

Resources

There're already few blog posts about building an event sourced applications with rails_event_store and aggregate_root gems:

Also this example app might be useful.