Skip to main content
Version: 1.3.1

Correlation and Causation

Debugging can be one of the challenges when building asynchronous, evented systems. Why did this happen, what caused all of that?. But there are patterns which might make your life easier. We just need to keep track of what is happening as a result of what.

For that, you can use 2 metadata attributes associated with events you are going to publish.

Let's hear what Greg Young says about correlation_id and causation_id:

Let's say every message has 3 ids. 1 is its id. Another is correlation, the last is causation. If you are responding to a message, you copy its correlation id as your correlation id, its message id is your causation id. This allows you to see an entire conversation (correlation id) or to see what causes what (causation id).

Now, the message that you are responding to can be either a command or an event which triggered some event handlers and probably caused even more events.

CorrelationAndCausationEventsCommands

Correlating one event with another

class MyEventHandler
def call(previous_event)
new_event = MyEvent.new(data: { foo: "bar" })
new_event.correlate_with(previous_event)

event_store.publish(new_event)
end

private

def event_store
Rails.configuration.event_store
end
end

After using correlate_with you can access UUIDs of related events via two getters:

new_event.correlation_id

and

new_event.causation_id

They are also available as:

new_event.metadata[:correlation_id]
new_event.metadata[:causation_id]

This is however not necessary for sync handlers. Events published from sync handlers are by default correlated with events that caused them.

Correlating events published from async handlers

Events published from async handlers are not correlated with events that caused them by default. To enable that functionality you need to prepend RailsEventStore::CorrelatedHandler

class SendOrderEmail < ActiveJob::Base
prepend RailsEventStore::CorrelatedHandler
prepend RailsEventStore::AsyncHandler

def perform(event)
event_store.publish(HappenedLater.new(data: { user_id: event.data.fetch(:user_id) }))
end

private

def event_store
Rails.configuration.event_store
end
end

Correlating an event with a command

If your command responds to correlation_id (can even always be nil) and message_id you can correlate your events also with commands.

class ApproveOrder < Struct.new(:order_id, :message_id, :correlation_id)
end

command = ApproveOrder.new("KTXBN123", SecureRandom.uuid, nil)
event = OrderApproved.new(data: { foo: "bar" })
event.correlate_with(command)

Correlating multiple events

class MyEventHandler
def call(previous_event)
event_store.with_metadata(
correlation_id: previous_event.correlation_id || previous_event.event_id,
causation_id: previous_event.event_id,
) { event_store.publish([MyEvent.new(data: { foo: "bar" }), AnotherEvent.new(data: { baz: "bax" })]) }
end
end

Correlating together events with commands, and commands with events from sync handlers

If you use event store and command bus you can correlate together both kinds of messages: events & commands.

config.to_prepare do
Rails.configuration.event_store = event_store = RailsEventStore::Client.new

# register handlers

command_bus = Arkency::CommandBus.new

# register commands...

# wire event_store and command_bus together
Rails.configuration.command_bus = RubyEventStore::CorrelatedCommands.new(event_store, command_bus)
end

Using CorrelatedCommands makes your events automatically correlated to the commands which triggered them (commands must respond to message_id method).

If your commands respond to correlate_with method they will be correlated to events which triggered them inside sync handlers.

Example:

module CorrelableCommand
attr_accessor :correlation_id, :causation_id

def correlate_with(other_message)
self.correlation_id = other_message.correlation_id || other_message.message_id
self.causation_id = other_message.message_id
end
end

class AddProductCommand < Struct.new(:message_id, :product_id)
include CorrelableCommand

def initialize(product_id:, message_id: SecureRandom.uuid)
super(message_id, product_id)
end
end

Building streams based on correlation id and causation id

You can use RailsEventStore::LinkByCorrelationId (RubyEventStore::LinkByCorrelationId) and RailsEventStore::LinkByCausationId (RubyEventStore::LinkByCausationId) to build streams of all events with certain correlation or causation id. This makes debugging and making sense of a large process easier to see.

Rails.application.configure do
config.to_prepare do
Rails.configuration.event_store = event_store = RailsEventStore::Client.new
event_store.subscribe_to_all_events(RailsEventStore::LinkByCorrelationId.new)
event_store.subscribe_to_all_events(RailsEventStore::LinkByCausationId.new)
end
end

After publishing an event:

event = OrderPlaced.new
event_store.publish(event)

you can read events caused by it:

event_store.read.stream("$by_causation_id_#{event.event_id}")

and events correlated with it:

event_store.read.stream("$by_correlation_id_#{event.correlation_id || event.event_id}")

Thanks

Image thanks to Arkency blog