Version: 3.0.0

Event Sourcing with AggregateRoot

Configuration

AggregateRoot::Repository needs an event store client to load and store aggregates. Pass it explicitly when you build the repository:

event_store = Rails.configuration.event_store
repository = AggregateRoot::Repository.new(event_store)

The event store is a required constructor argument — there is no global default.

Usage

To create a new aggregate domain object, include the AggregateRoot module.

class Order
  include AggregateRoot

  # ... more later
end

Define domain events

class OrderSubmitted < RubyEventStore::Event; end
class OrderExpired < RubyEventStore::Event; end

Define aggregate logic

class Order
  include AggregateRoot
  class HasBeenAlreadySubmitted < StandardError; end
  class HasExpired < StandardError; end

  def initialize
    @state = :new
    # any other code here
  end

  def submit
    raise HasBeenAlreadySubmitted if state == :submitted
    raise HasExpired if state == :expired
    apply OrderSubmitted.new(data: {delivery_date: Time.now + 24.hours})
  end

  def expire
    apply OrderExpired.new
  end

  on OrderSubmitted do |event|
    @state = :submitted
    @delivery_date = event.data.fetch(:delivery_date)
  end

  on OrderExpired do |event|
    @state = :expired
  end

  private

  attr_reader :state
end

Loading an aggregate root object from an event store

stream_name = "Order$123"
order = AggregateRoot::Repository.new(Rails.configuration.event_store).load(Order.new, stream_name)

To restore the state of your aggregate you need to use AggregateRoot::Repository. Repository's #load gets all domain events stored for the aggregate in the event store stream Order$123 and applies them to the newly created order object in order to rebuild the aggregate's state.
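To make "rebuilding state from a stream" concrete, here is a minimal, dependency-free sketch. It deliberately does not use the aggregate_root gem: ToyEvent, ToyOrder, and replay are hypothetical names invented for this illustration, and the events are plain structs rather than RubyEventStore::Event instances.

```ruby
# Toy illustration only: this is not the aggregate_root implementation.
ToyEvent = Struct.new(:type, :data)

class ToyOrder
  attr_reader :state, :delivery_date

  def initialize
    @state = :new
  end

  # Mutate state in response to one already-recorded event.
  def apply(event)
    case event.type
    when :order_submitted
      @state = :submitted
      @delivery_date = event.data.fetch(:delivery_date)
    when :order_expired
      @state = :expired
    else
      raise ArgumentError, "unknown event type: #{event.type}"
    end
    self
  end
end

# Apply the stream's events, oldest first, to a fresh aggregate.
def replay(aggregate, events)
  events.each { |event| aggregate.apply(event) }
  aggregate
end

stream = [
  ToyEvent.new(:order_submitted, { delivery_date: "2024-01-02" }),
  ToyEvent.new(:order_expired, {}),
]
order = replay(ToyOrder.new, stream)
order.state # => :expired
```

The aggregate never stores its state directly; the state is whatever results from applying the recorded events in order.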

Storing an aggregate root's changes in an event store

stream_name = "Order$123"
repository = AggregateRoot::Repository.new(Rails.configuration.event_store)
order = repository.load(Order.new, stream_name)
order.submit
repository.store(order, stream_name)

Storing (publishing) aggregate changes is also performed by the AggregateRoot::Repository object. Repository's #store takes all of the aggregate's unpublished domain events (added by executing a domain logic method like submit) from unpublished_events and publishes them to the event store in order of creation.
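The idea of collecting unpublished events and then appending them in order can be sketched without the gem. Everything below is an assumption made for illustration: ToyStore, DraftOrder, mark_published, and the top-level store method are hypothetical stand-ins, not the library's classes or internals.

```ruby
# Toy illustration only: this is not the aggregate_root implementation.
class ToyStore
  def initialize
    @streams = Hash.new { |hash, name| hash[name] = [] }
  end

  # Append events to the end of the named stream, preserving their order.
  def append(events, stream_name)
    @streams[stream_name].concat(events)
  end

  def read(stream_name)
    @streams[stream_name].dup
  end
end

class DraftOrder
  attr_reader :unpublished_events

  def initialize
    @unpublished_events = []
  end

  # Domain methods only record what happened; nothing is persisted yet.
  def submit
    @unpublished_events << :order_submitted
  end

  def expire
    @unpublished_events << :order_expired
  end

  def mark_published
    @unpublished_events = []
  end
end

# Publish pending events in creation order, then reset the pending list.
def store(aggregate, stream_name, event_store)
  event_store.append(aggregate.unpublished_events, stream_name)
  aggregate.mark_published
end

event_store = ToyStore.new
order = DraftOrder.new
order.submit
order.expire
store(order, "Order$123", event_store)
event_store.read("Order$123") # => [:order_submitted, :order_expired]
```

Resetting the pending list after a successful append keeps a second call to store from publishing the same events twice.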

Simplify loading/storing aggregates

AggregateRoot::Repository provides a convenience method for the typical aggregate workflow. The with_aggregate method loads an aggregate from the given stream, yields it to a block so you can perform an action on it, and then publishes all of the aggregate's changes to the event store that was passed to the repository.

stream_name = "Order$123"
repository = AggregateRoot::Repository.new(Rails.configuration.event_store)
repository.with_aggregate(Order.new, stream_name) do |order|
  order.submit
end
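The load, yield, store sequence described above can be sketched in plain Ruby. This is only a model of the workflow under stated assumptions: ToyRepository, ToyCounter, events_in, and replay are hypothetical names, events are bare symbols, and none of this is the gem's actual code.

```ruby
# Toy illustration only: this is not the aggregate_root implementation.
class ToyRepository
  def initialize(streams = {})
    @streams = streams # stream name => array of events (plain symbols here)
  end

  def load(aggregate, stream_name)
    @streams.fetch(stream_name, []).each { |event| aggregate.replay(event) }
    aggregate
  end

  def store(aggregate, stream_name)
    (@streams[stream_name] ||= []).concat(aggregate.unpublished_events)
    aggregate.unpublished_events.clear
  end

  # Load, hand the aggregate to the caller's block, then persist new events.
  def with_aggregate(aggregate, stream_name)
    load(aggregate, stream_name)
    yield aggregate
    store(aggregate, stream_name)
  end

  def events_in(stream_name)
    @streams.fetch(stream_name, []).dup
  end
end

class ToyCounter
  attr_reader :value, :unpublished_events

  def initialize
    @value = 0
    @unpublished_events = []
  end

  # Record a new event and apply it to the current state.
  def increment
    @unpublished_events << :incremented
    replay(:incremented)
  end

  def replay(event)
    @value += 1 if event == :incremented
  end
end

repository = ToyRepository.new
2.times do
  repository.with_aggregate(ToyCounter.new, "Counter$1") { |counter| counter.increment }
end
repository.load(ToyCounter.new, "Counter$1").value # => 2
```

In this sketch, an exception raised inside the block means store is never reached, so a failed action leaves the stream unchanged.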

You can also wrap this in a repository dedicated to the Order aggregate, so that callers do not need to know how stream names are built:

class OrderRepository
  def initialize(event_store = Rails.configuration.event_store)
    @repository = AggregateRoot::Repository.new(event_store)
  end

  def with_order(order_id, &block)
    stream_name = "Order$#{order_id}"
    repository.with_aggregate(Order.new, stream_name, &block)
  end

  private
  attr_reader :repository
end

And then your code to submit an order might look like:

repository = OrderRepository.new
repository.with_order(123) do |order|
  order.submit
end

Overwriting default apply_strategy

You can change how aggregate methods are called in response to applied events. Let's say we want to call order_has_expired when the OrderExpired event is applied. To achieve this, we provide our own implementation of the apply_strategy method:

class Order
  include AggregateRoot
  class HasBeenAlreadySubmitted < StandardError; end
  class HasExpired < StandardError; end

  def initialize
    @state = :new
  end

  def submit
    raise HasBeenAlreadySubmitted if state == :submitted
    raise HasExpired if state == :expired
    apply OrderSubmitted.new(data: {delivery_date: Time.now + 24.hours})
  end

  def expire
    apply OrderExpired.new
  end

  private
  attr_reader :state

  def apply_strategy
    ->(aggregate, event) do
      case event
      when OrderExpired
        # Handlers take the event, so it must be passed along.
        order_has_expired(event)
      when OrderSubmitted
        order_has_been_submitted(event)
      else
        raise "unsupported event: #{event.class}"
      end
    end
  end

  def order_has_been_submitted(event)
    @state = :submitted
  end

  def order_has_expired(event)
    @state = :expired
  end
end

The apply_strategy method must return a callable, that is, an object that responds to #call. We've used a lambda in the example above. This lambda takes two arguments: the aggregate, which in this case is self, and the event being applied.
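As a plain-Ruby reminder of what "responds to #call" allows, the sketch below shows three kinds of callable with the same two-argument shape. The names lambda_strategy, StrategyObject, and method_strategy are hypothetical, and the symbols stand in for a real aggregate and event.

```ruby
# Three plain-Ruby callables that accept (aggregate, event).
lambda_strategy = ->(aggregate, event) { "lambda handled #{event}" }

# Any object with a public #call method works too.
class StrategyObject
  def call(aggregate, event)
    "object handled #{event}"
  end
end

# A regular method can be turned into a callable with Object#method.
def method_strategy(aggregate, event)
  "method handled #{event}"
end

strategies = [lambda_strategy, StrategyObject.new, method(:method_strategy)]
results = strategies.map { |strategy| strategy.call(:some_aggregate, :some_event) }
# results => ["lambda handled some_event", "object handled some_event", "method handled some_event"]
```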

The case statement is one way to implement such a dispatch. The following example shows an equivalent implementation with a Hash keyed by event type:

def apply_strategy
  ->(aggregate, event) do
    {
      # Each event type maps to its matching handler.
      'OrderExpired' => method(:order_has_expired),
      'OrderSubmitted' => method(:order_has_been_submitted),
    }.fetch(event.event_type, ->(event) { raise }).call(event)
  end
end

def order_has_been_submitted(event)
  @state = :submitted
end

def order_has_expired(event)
  @state = :expired
end
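The Hash-with-fallback pattern can be tried in isolation. The sketch below is a standalone illustration, not code from the gem: HANDLERS and next_state are hypothetical names, and it uses the block form of Hash#fetch so that an unknown type fails with a descriptive error.

```ruby
# Hypothetical handlers keyed by event type name.
HANDLERS = {
  "OrderSubmitted" => ->(state) { :submitted },
  "OrderExpired"   => ->(state) { :expired },
}.freeze

# Look up the handler for the event type; fail loudly when none is registered.
def next_state(state, event_type)
  handler = HANDLERS.fetch(event_type) do |missing|
    raise ArgumentError, "no handler for #{missing}"
  end
  handler.call(state)
end

next_state(:new, "OrderExpired") # => :expired
```

Compared with a bare raise in the fallback, naming the missing type in the error makes an unregistered event easier to diagnose.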

Resources

There are already a few blog posts about building event-sourced applications with the rails_event_store and aggregate_root gems.

Also this example app might be useful.