Skip to content

Commit

Permalink
Projections redesigned
Browse files Browse the repository at this point in the history
Projection class is redesigned here. It now only suports read & reduce
based on existing events, without possibility to subscribe for events.

New & redesigned:
* replace `from_all_streams` & `from_stream` with passing read scopes
  to `call` method, it allows to fully use all read specification
  features to define what events should be handled
* instead of providing several streams (and starting points) to be
  processed `call` expects several read scopes
* `when` method replaced with `on` method, with usage consistent with
  `on` handlers (as in `AggregateRoot`), the `on` methods require block
  to process state & event and it must return new projection state
* allows to use simple values as initial state, no need to use hash to
  pass values, `nil` is the initial default state now
* initial state is passed to projection using costructor

Typical usage:

```ruby
account_balance =
  RailsEventStore::Projection
    .new(0.0)
    .on(MoneyDeposited) { |state, event| state += event.data[:amount] }
    .on(MoneyWithdrawn) { |state, event| state -= event.data[:amount] }
    .call(client.read)
```
  • Loading branch information
mpraglowski authored and mostlyobvious committed Jan 30, 2023
1 parent 5af6725 commit f76c43d
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 207 deletions.
89 changes: 20 additions & 69 deletions ruby_event_store/lib/ruby_event_store/projection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,94 +2,45 @@

module RubyEventStore
class Projection
private_class_method :new
ANONYMOUS_CLASS = "#<Class:".freeze

def self.from_stream(stream_or_streams)
streams = Array(stream_or_streams)
raise(ArgumentError, "At least one stream must be given") if streams.empty?
new(streams: streams)
end

def self.from_all_streams
new
end

def initialize(streams: [])
@streams = streams
def initialize(initial_state = nil)
@handlers = {}
@init = -> { {} }
@init = -> { initial_state }
end

attr_reader :streams, :handlers

def init(handler)
@init = handler
self
end
def on(*event_klasses, &block)
raise(ArgumentError, 'No handler block given') unless block_given?

def when(events, handler)
Array(events).each { |event| handlers[event.to_s] = handler }
event_klasses.each do |event_klass|
name = event_klass.to_s
raise(ArgumentError, 'Anonymous class is missing name') if name.start_with? ANONYMOUS_CLASS

@handlers[name] = ->(state, event) { block.call(state, event) }
end
self
end

def initial_state
@init.call
end

def current_state
@current_state ||= initial_state
end

def call(event)
handlers.fetch(event.event_type).(current_state, event)
end

def handled_events
handlers.keys
end

def run(event_store, start: nil, count: PAGE_SIZE)
def call(*scopes)
return initial_state if handled_events.empty?
streams.any? ? reduce_from_streams(event_store, start, count) : reduce_from_all_streams(event_store, start, count)
end

private

def valid_starting_point?(start)
return true unless start
streams.any? ? (start.instance_of?(Array) && start.size === streams.size) : start.instance_of?(String)
scopes.reduce(initial_state) do |state, scope|
scope.of_types(handled_events).reduce(state, &method(:transition))
end
end

def reduce_from_streams(event_store, start, count)
raise ArgumentError.new("Start must be an array with event ids") unless valid_starting_point?(start)
streams
.zip(start_events(start))
.reduce(initial_state) do |state, (stream_name, start_event_id)|
read_scope(event_store, stream_name, count, start_event_id).reduce(state, &method(:transition))
end
end

def reduce_from_all_streams(event_store, start, count)
raise ArgumentError.new("Start must be valid event id") unless valid_starting_point?(start)
read_scope(event_store, nil, count, start).reduce(initial_state, &method(:transition))
end
private

def read_scope(event_store, stream, count, start)
scope = event_store.read.in_batches(count)
scope = scope.of_type(handled_events)
scope = scope.stream(stream) if stream
scope = scope.from(start) if start
scope
def initial_state
@init.call
end

def start_events(start)
start ? start : Array.new
def handled_events
@handlers.keys
end

def transition(state, event)
handlers.fetch(event.event_type).call(state, event)
state
@handlers.fetch(event.event_type).call(state, event)
end
end
end
Loading

0 comments on commit f76c43d

Please sign in to comment.