Correlation and Causation
Debugging can be one of the challenges when building asynchronous, evented systems. Why did this happen, what caused all of that?. But there are patterns which might make your life easier. We just need to keep track of what is happening as a result of what.
For that, you can use 2 metadata attributes associated with events you are going to publish.
Let's hear what Greg Young says about correlation_id
and causation_id
:
Let's say every message has 3 ids. 1 is its id. Another is correlation, the last is causation. If you are responding to a message, you copy its correlation id as your correlation id, its message id is your causation id. This allows you to see an entire conversation (correlation id) or to see what causes what (causation id).
Now, the message that you are responding to can be either a command or an event which triggered some event handlers and probably caused even more events.
Correlating one event with another
class MyEventHandler
def call(previous_event)
new_event = MyEvent.new(data: { foo: "bar" })
new_event.correlate_with(previous_event)
event_store.publish(new_event)
end
private
def event_store
Rails.configuration.event_store
end
end
After using correlate_with
you can access UUIDs of related events via two getters:
new_event.correlation_id
and
new_event.causation_id
They are also available as:
new_event.metadata[:correlation_id]
new_event.metadata[:causation_id]
This is however not necessary for sync handlers. Events published from sync handlers are by default correlated with events that caused them.
Correlating events published from async handlers
Events published from async handlers are not correlated with events that caused them by default. To enable that functionality you need to prepend RailsEventStore::CorrelatedHandler
class SendOrderEmail < ActiveJob::Base
prepend RailsEventStore::CorrelatedHandler
prepend RailsEventStore::AsyncHandler
def perform(event)
event_store.publish(HappenedLater.new(data: { user_id: event.data.fetch(:user_id) }))
end
private
def event_store
Rails.configuration.event_store
end
end
Correlating an event with a command
If your command responds to correlation_id
(can even always be nil
) and message_id
you can correlate your events also with commands.
class ApproveOrder < Struct.new(:order_id, :message_id, :correlation_id)
end
command = ApproveOrder.new("KTXBN123", SecureRandom.uuid, nil)
event = OrderApproved.new(data: { foo: "bar" })
event.correlate_with(command)
Correlating multiple events
class MyEventHandler
def call(previous_event)
event_store.with_metadata(
correlation_id: previous_event.correlation_id || previous_event.event_id,
causation_id: previous_event.event_id,
) { event_store.publish([MyEvent.new(data: { foo: "bar" }), AnotherEvent.new(data: { baz: "bax" })]) }
end
end
Correlating together events with commands, and commands with events from sync handlers
If you use event store and command bus you can correlate together both kinds of messages: events & commands.
config.to_prepare do
Rails.configuration.event_store = event_store = RailsEventStore::Client.new
# register handlers
command_bus = Arkency::CommandBus.new
# register commands...
# wire event_store and command_bus together
Rails.configuration.command_bus = RubyEventStore::CorrelatedCommands.new(event_store, command_bus)
end
Using CorrelatedCommands
makes your events automatically correlated to the commands which triggered them (commands must respond to message_id
method).
If your commands respond to correlate_with
method they will be correlated to events which triggered them inside sync handlers.
Example:
module CorrelableCommand
attr_accessor :correlation_id, :causation_id
def correlate_with(other_message)
self.correlation_id = other_message.correlation_id || other_message.message_id
self.causation_id = other_message.message_id
end
end
class AddProductCommand < Struct.new(:message_id, :product_id)
include CorrelableCommand
def initialize(product_id:, message_id: SecureRandom.uuid)
super(message_id, product_id)
end
end
Building streams based on correlation id and causation id
You can use RailsEventStore::LinkByCorrelationId
(RubyEventStore::LinkByCorrelationId
) and RailsEventStore::LinkByCausationId
(RubyEventStore::LinkByCausationId
) to build streams of all events with certain correlation or causation id. This makes debugging and making sense of a large process easier to see.
Rails.application.configure do
config.to_prepare do
Rails.configuration.event_store = event_store = RailsEventStore::Client.new
event_store.subscribe_to_all_events(RailsEventStore::LinkByCorrelationId.new)
event_store.subscribe_to_all_events(RailsEventStore::LinkByCausationId.new)
end
end
After publishing an event:
event = OrderPlaced.new
event_store.publish(event)
you can read events caused by it:
event_store.read.stream("$by_causation_id_#{event.event_id}")
and events correlated with it:
event_store.read.stream("$by_correlation_id_#{event.correlation_id || event.event_id}")
Thanks
Image thanks to Arkency blog