Event Sourcing with AggregateRoot

Configuration

Choose your event store client by adding its configuration to your environment setup. Example using RailsEventStore:

# Sets the default event store. The two assignments below are alternatives:
# keep only one of them in a real configuration.
AggregateRoot.configure do |config|
  # Alternative 1: a freshly created RailsEventStore client.
  config.default_event_store = RailsEventStore::Client.new
  # or
  # Alternative 2: the event store exposed through the Rails configuration.
  config.default_event_store = Rails.configuration.event_store
end

Remember that this is only the default event store: the AggregateRoot module uses it to initialize AggregateRoot::Repository when no event store is passed as an argument to the repository's constructor.

Usage

To create a new aggregate domain object include AggregateRoot module.

# Minimal aggregate: including AggregateRoot is what turns a plain class
# into an aggregate domain object.
class Order
  include AggregateRoot

  # ... more later
end

Define domain events

# Domain events applied by the Order aggregate; each is a plain
# RailsEventStore::Event subclass with no extra behavior.
class OrderSubmitted < RailsEventStore::Event; end
class OrderExpired < RailsEventStore::Event; end

Define aggregate logic

# Order aggregate that relies on the default handler convention:
# applying an event invokes the private apply_<underscored_event_name> method.
class Order
  include AggregateRoot

  # Raised by #submit when the order is already in the :submitted state.
  class HasBeenAlreadySubmitted < StandardError
  end

  # Raised by #submit when the order is in the :expired state.
  class HasExpired < StandardError
  end

  def initialize
    @state = :new
    # any other code here
  end

  # Applies an OrderSubmitted event carrying a delivery date 24 hours from now.
  # Raises HasBeenAlreadySubmitted or HasExpired when the current state forbids it.
  def submit
    case state
    when :submitted then raise HasBeenAlreadySubmitted
    when :expired then raise HasExpired
    end

    delivery_date = Time.now + 24.hours
    apply OrderSubmitted.new(data: {delivery_date: delivery_date})
  end

  # Applies an OrderExpired event; no state check is performed here.
  def expire
    apply OrderExpired.new
  end

  private

  attr_reader :state

  # Handler for OrderSubmitted: only the state changes, the event payload is unused.
  def apply_order_submitted(_event)
    @state = :submitted
  end

  # Handler for OrderExpired: only the state changes, the event payload is unused.
  def apply_order_expired(_event)
    @state = :expired
  end
end

The convention is to use apply_ plus an underscored event class name for event handler methods. I.e. when you apply OrderExpired event, the apply_order_expired method is called.

Alternative syntax for event handler methods.

You can use class method on(event_klass, &method) for defining those methods alternatively. This is useful because you can more easily grep/find where events are used in your codebase.

# Order aggregate whose event handlers are declared with the `on` class
# method instead of apply_<event_name> instance methods.
class Order
  include AggregateRoot

  # Raised by #submit when the order is already in the :submitted state.
  class HasBeenAlreadySubmitted < StandardError
  end

  # Raised by #submit when the order is in the :expired state.
  class HasExpired < StandardError
  end

  def initialize
    @state = :new
    # any other code here
  end

  # Handler for OrderSubmitted: marks the order submitted, then records the
  # delivery date from the event data (fetch raises if the key is missing).
  on OrderSubmitted do |submitted|
    @state = :submitted
    @delivery_date = submitted.data.fetch(:delivery_date)
  end

  # Handler for OrderExpired: only the state changes.
  on(OrderExpired) { |_expired| @state = :expired }

  # Applies an OrderSubmitted event carrying a delivery date 24 hours from now.
  # Raises HasBeenAlreadySubmitted or HasExpired when the current state forbids it.
  def submit
    if state == :submitted
      raise HasBeenAlreadySubmitted
    elsif state == :expired
      raise HasExpired
    end

    apply OrderSubmitted.new(data: {delivery_date: Time.now + 24.hours})
  end

  # Applies an OrderExpired event; no state check is performed here.
  def expire
    apply OrderExpired.new
  end

  private

  attr_reader :state
end

Loading an aggregate root object from event store

# Event store stream holding this order's events.
stream_name = "Order$123"
# Rebuild the aggregate: the stream's stored events are applied to a fresh Order.
order = AggregateRoot::Repository.new.load(Order.new, stream_name)

To restore the state of your aggregate you need to use AggregateRoot::Repository. The repository's #load method reads all domain events stored for the aggregate in the event store stream Order$123 and applies them to the newly created order object in order to rebuild the aggregate's state.

Storing an aggregate root's changes in event store

# Event store stream holding this order's events.
stream_name = "Order$123"
repository = AggregateRoot::Repository.new
# Rebuild the current state from the stream before running domain logic.
order = repository.load(Order.new, stream_name)
# Domain logic: adds a new, not yet published event to the aggregate.
order.submit
# Publish the aggregate's unpublished events to the same stream.
repository.store(order, stream_name)

Storing (publishing) aggregate changes is also performed by the AggregateRoot::Repository object. The repository's #store method takes all of the aggregate's unpublished domain events (added by executing a domain logic method like submit) from unpublished_events and publishes them to the event store in order of creation.

Simplify loading/storing aggregates

AggregateRoot::Repository provides a convenient method to handle the typical flow of work with aggregates. The with_aggregate method loads an aggregate from the given stream, yields to the block so that you can perform actions on the aggregate object (the aggregate object is yielded as the block argument), and then publishes all changes in the aggregate to the event store provided to the repository.

# Event store stream holding this order's events.
stream_name = "Order$123"
repository = AggregateRoot::Repository.new
# Load the order from the stream, run the block on it, then publish its changes.
repository.with_aggregate(Order.new, stream_name) do |order|
  order.submit
end

You could also provide a specific repository for the Order aggregate to make this code even better:

# Order-specific wrapper around AggregateRoot::Repository that hides the
# stream naming scheme from callers.
class OrderRepository
  # event_store - event store handed to the wrapped repository; defaults to
  #               the one exposed through the Rails configuration.
  def initialize(event_store = Rails.configuration.event_store)
    @repository = AggregateRoot::Repository.new(event_store)
  end

  # Runs the given block against a fresh Order on the stream derived from
  # order_id, delegating to the wrapped repository's with_aggregate.
  def with_order(order_id, &block)
    order_stream = stream_name_for(order_id)
    repository.with_aggregate(Order.new, order_stream, &block)
  end

  private

  attr_reader :repository

  # Builds the event store stream name for the given order id.
  def stream_name_for(order_id)
    "Order$#{order_id}"
  end
end

And then your code to submit an order might look like:

# Uses the default event store from OrderRepository's constructor.
repository = OrderRepository.new
# 123 is the order id; OrderRepository turns it into the stream name.
repository.with_order(123) do |order|
  order.submit
end

Overwriting default apply_strategy

You can change how aggregate methods are called in response to applied events. Let's say we want to call order_has_expired when an OrderExpired event is applied. To achieve this we'll provide our own implementation of the apply_strategy method:

# Order aggregate with a custom apply_strategy: applied events are routed to
# order_has_* methods instead of the default apply_<event_name> handlers.
class Order
  include AggregateRoot
  class HasBeenAlreadySubmitted < StandardError; end
  class HasExpired < StandardError; end

  def initialize
    @state = :new
  end

  # Applies an OrderSubmitted event carrying a delivery date 24 hours from now.
  # Raises HasBeenAlreadySubmitted or HasExpired when the current state forbids it.
  def submit
    raise HasBeenAlreadySubmitted if state == :submitted
    raise HasExpired if state == :expired
    apply OrderSubmitted.new(data: {delivery_date: Time.now + 24.hours})
  end

  # Applies an OrderExpired event; no state check is performed here.
  def expire
    apply OrderExpired.new
  end

  private

  attr_reader :state

  # Returns the callable used to dispatch applied events. It takes two
  # arguments: the aggregate (unused, handlers are called on self) and the
  # event being applied.
  def apply_strategy
    ->(_aggregate, event) do
      case event
      when OrderExpired
        # Handlers take the event as their only argument, so it must be passed on.
        order_has_expired(event)
      when OrderSubmitted
        order_has_been_submitted(event)
      else
        # An explicit message avoids a bare `raise` re-raising an unrelated
        # exception that may already be in flight; still a RuntimeError.
        raise "Unhandled event: #{event.class}"
      end
    end
  end

  # Handler for OrderSubmitted: only the state changes, the payload is unused.
  def order_has_been_submitted(_event)
    @state = :submitted
  end

  # Handler for OrderExpired: only the state changes, the payload is unused.
  def order_has_expired(_event)
    @state = :expired
  end
end

The apply_strategy method must return a callable, i.e. an object that responds to #call. We've used a lambda in the example above. This lambda takes two arguments -- the aggregate, which in this case is self, and the event being applied.

The case statement is one way to implement such dispatch. The following example shows an equivalent implemented with Hash:

# Hash-based dispatch: looks up the handler by the event's class and calls it
# with the event. The returned lambda takes the aggregate (unused, handlers
# are resolved on self) and the event being applied.
def apply_strategy
  ->(_aggregate, event) do
    {
      # Each event class maps to the handler named after it.
      OrderExpired => method(:order_has_expired),
      OrderSubmitted => method(:order_has_been_submitted),
    }.fetch(event.class, ->(unhandled) { raise "Unhandled event: #{unhandled.class}" }).call(event)
  end
end

# Handler for OrderSubmitted: only the state changes, the payload is unused.
def order_has_been_submitted(_event)
  @state = :submitted
end

# Handler for OrderExpired: only the state changes, the payload is unused.
def order_has_expired(_event)
  @state = :expired
end

Resources

There are already a few blog posts about building event-sourced applications with the rails_event_store and aggregate_root gems:

Also this example app might be useful.