An exploration of patterns for building composable data pipelines in Ruby, from the basics to the possibly YAGNI.
Function composition
Ruby’s function composition allows you to neatly chain Procs together using the #>> operator.
DISCOUNT = 200
substract_discount = ->(amount) { amount - DISCOUNT }
TAX_RATE = 0.19
add_tax = ->(amount) { amount * (1 + TAX_RATE) }
calculate_total = substract_discount >> add_tax
calculate_total.call(1000) # 952.0
#>> (and its counterpart #<<, which composes in the opposite order) is implemented by both Proc and Method objects, so it’s possible to write class-based steps.
class Discount
  def initialize(discount)
    @discount = discount
  end
  def call(amount)
    amount - @discount
  end
  def >>(other)
    method(:call) >> other
  end
end
calculate_total = Discount.new(200) >> add_tax
calculate_total.call(1000) # 952.0
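Proc#>> and Proc#<< (Ruby 2.6+) accept any object that responds to #call, so the custom #>> on Discount is only needed when a Discount instance sits on the left-hand side. A small sketch of both directions, reusing the values from above; the variable names are illustrative:

```ruby
TAX_RATE = 0.19
add_tax = ->(amount) { amount * (1 + TAX_RATE) }

# Class-based step: a plain object that responds to #call.
class Discount
  def initialize(discount)
    @discount = discount
  end

  def call(amount)
    amount - @discount
  end
end

discount = Discount.new(200)

# #>> reads left to right: discount first, then tax.
discount_then_tax = discount.method(:call) >> add_tax
# #<< reads right to left, so this is the same pipeline.
same_pipeline = add_tax << discount
# Order matters: taxing first gives a different total.
tax_then_discount = add_tax >> discount

puts discount_then_tax.call(1000) # 952.0
puts same_pipeline.call(1000)     # 952.0
puts tax_then_discount.call(1000) # 990.0
```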
A problem: error handling
Let’s say we want to validate that discounts aren’t greater than amounts, and treat that case as an error scenario.
DISCOUNT = 200
substract_discount = ->(amount) {
  if DISCOUNT > amount
    # What now?
  else
    amount - DISCOUNT
  end
}
What now indeed. We could raise an exception.
if DISCOUNT > amount
  raise DiscountGreaterThanAmountError,
        "discount of #{DISCOUNT} is greater than amount #{amount}"
But that means that client code needs to be aware of all the possible error cases.
begin
  result = calculate_total.call(100)
rescue DiscountGreaterThanAmountError => ex
  # handle error here
end
Add specialised exceptions for different steps, and things get unwieldy.
rescue DiscountGreaterThanAmountError
rescue AmountTooSmallError
rescue AmountIsNotNumberError
rescue Etc
You get the idea. Pipelines should let you treat each step as an interchangeable little black box. Handling errors this way leaks individual step details into the client code.
We could instead return a special error object.
if DISCOUNT > amount
  Error.new('discount is greater than amount')
else
  amount - DISCOUNT
end
The problem here is that this forces all steps downstream to handle errors.
add_tax = ->(amount_or_error) {
  if amount_or_error.is_a?(Error)
    # pass the error as-is?
    amount_or_error
  else
    amount_or_error * (1 + TAX_RATE)
  end
}
There’s a better way.
Railway oriented pipelines
Let’s take a step back (pun intended). The happy-path examples above work because all steps in the pipeline expect the same type (numeric, in this case). But we now want to incorporate errors into our possible results. We need to wrap our values in a uniform interface that supports expressing errors. That’s a result object.
class Result
  attr_reader :value
  def initialize(value)
    @value = value
  end
  class Success < self
  end
  class Failure < self
    attr_reader :error
    def initialize(value = nil, error = nil)
      super(value)
      @error = error
    end
  end
end
A result wraps a value and exposes it as #value. The subclass Result::Success represents a successful result. Result::Failure is the failed case and also carries an optional #error.
Let’s now refactor our pipeline steps to take a Result::Success as argument, and return a Result::Success or a Result::Failure.
# new interface: #call(Result::Success) → Result::Success | Result::Failure
substract_discount = ->(result) {
  if DISCOUNT > result.value
    Result::Failure.new(result.value, 'discount is greater than amount')
  else
    Result::Success.new(result.value - DISCOUNT)
  end
}
And the same for add_tax.
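For completeness, here is one way add_tax might look under the new interface. It is a sketch assuming the Result classes above and the same TAX_RATE; since applying tax cannot fail, it always returns a Success.

```ruby
# Minimal Result types, as defined above.
class Result
  attr_reader :value

  def initialize(value)
    @value = value
  end

  class Success < self; end

  class Failure < self
    attr_reader :error

    def initialize(value = nil, error = nil)
      super(value)
      @error = error
    end
  end
end

TAX_RATE = 0.19

# new interface: #call(Result::Success) → Result::Success | Result::Failure
# Applying tax cannot fail, so this step always returns a Success.
add_tax = ->(result) {
  Result::Success.new(result.value * (1 + TAX_RATE))
}

taxed = add_tax.call(Result::Success.new(800))
puts taxed.class # Result::Success
puts taxed.value # 952.0
```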
Now the key: each Result subclass implements a #map(callable) → Result interface.
class Success < self
  def map(callable)
    callable.call(self)
  end
end
class Failure < self
  def map(callable)
    self
  end
end
What’s all this about? It means that we can now chain pipeline steps, using the result subtypes as glue.
# Happy path
Result::Success.new(1000)
  .map(substract_discount)
  .map(add_tax)
# Returns Success(952.0)
# Error path
Result::Success.new(100)
  .map(substract_discount)
  .map(add_tax)
# Returns Failure(100, 'discount is greater than amount')
Note that, in the error case, add_tax was never applied. This is because substract_discount returns a Result::Failure, whose #map ignores add_tax and returns the failure itself:
def map(add_tax)
  self
end
In other words, a failure result returns itself without ever calling the next step. This means that the first failure encountered short-circuits the pipeline, forwarding the failure object all the way to its other end. This is sometimes called Railway oriented programming, because it conceptually separates data flow into distinct success and failure “tracks”.
In the (professionally drawn) diagram above, once R2 returns a failure, it is propagated to the end of the pipeline, skipping R3, R4 and R5.
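The short-circuit is easy to observe. This self-contained sketch repeats the Result classes and steps from above, and adds a hypothetical tax_calls counter purely to show that add_tax never runs on the error path.

```ruby
class Result
  attr_reader :value

  def initialize(value)
    @value = value
  end

  class Success < self
    # A success hands itself to the next step.
    def map(callable)
      callable.call(self)
    end
  end

  class Failure < self
    attr_reader :error

    def initialize(value = nil, error = nil)
      super(value)
      @error = error
    end

    # A failure ignores the step and forwards itself unchanged.
    def map(_callable)
      self
    end
  end
end

DISCOUNT = 200
TAX_RATE = 0.19
tax_calls = 0 # hypothetical counter, for illustration only

substract_discount = ->(result) {
  if DISCOUNT > result.value
    Result::Failure.new(result.value, 'discount is greater than amount')
  else
    Result::Success.new(result.value - DISCOUNT)
  end
}

add_tax = ->(result) {
  tax_calls += 1
  Result::Success.new(result.value * (1 + TAX_RATE))
}

happy = Result::Success.new(1000).map(substract_discount).map(add_tax)
sad = Result::Success.new(100).map(substract_discount).map(add_tax)

puts happy.value # 952.0
puts sad.error   # discount is greater than amount
puts tax_calls   # 1 (add_tax only ran on the happy path)
```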
The FP-curious reader might notice that this Result implementation is a type of monad; strictly speaking, since each step itself returns a Result, our #map behaves like monadic bind (often called flat_map or and_then). Depending on your domain you might want to use an Option type instead, for example when processing lists where the output can be either a new list (“Some”) or an empty list (“None”). See this nice blog post for more in-depth Ruby examples. You might also be reminded of Promises in various languages, which share a lot with this approach.
Back to declarative
Great, result objects give us a generic way to handle errors at any point in a pipeline, but we’ve lost the ability to compose steps declaratively (for reuse or configuration) using the #>> operator: plain Proc#>> knows nothing about results, so it would pass a Failure straight into the next step. Let’s add that back in.
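To see what goes wrong, compose the Result-aware steps with plain Proc#>>. A minimal sketch, repeating just enough of Result for the demo: the Failure from the first step is handed straight to add_tax, which turns it back into a Success and silently loses the error.

```ruby
class Result
  attr_reader :value

  def initialize(value)
    @value = value
  end

  class Success < self; end

  class Failure < self
    attr_reader :error

    def initialize(value = nil, error = nil)
      super(value)
      @error = error
    end
  end
end

DISCOUNT = 200
TAX_RATE = 0.19

substract_discount = ->(result) {
  if DISCOUNT > result.value
    Result::Failure.new(result.value, 'discount is greater than amount')
  else
    Result::Success.new(result.value - DISCOUNT)
  end
}

add_tax = ->(result) { Result::Success.new(result.value * (1 + TAX_RATE)) }

# Plain Proc composition: add_tax always runs, even after a Failure.
naive_total = substract_discount >> add_tax

outcome = naive_total.call(Result::Success.new(100))
puts outcome.class # Result::Success, the failure has been lost
```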
But before that, let’s add a helper to wrap regular values into Result::Success instances.
class Result
  def self.wrap(value)
    value.is_a?(Result) ? value : Success.new(value)
  end
end
# Usage:
# result = Result.wrap(1000) # returns a Result::Success
# result.value # 1000
We’ll have a Chain class to map two callables via Result#map.
class Chain
  def initialize(left_callable, right_callable)
    @left_callable = left_callable
    @right_callable = right_callable
  end
  def call(result)
    Result.wrap(result).map(@left_callable).map(@right_callable)
  end
end
It glues two steps together into a single callable exposing the same #call(Success) → Success | Failure interface.
A successful result from the first step is piped as input to the second one.
Failures are returned as-is.
calculate_total = Chain.new(substract_discount, add_tax)
calculate_total.call(1000) # Success(952.0)
calculate_total.call(100) # Failure(100, 'discount is greater than amount')
Now we’ll create a Chainable mixin to implement #>>.
module Chainable
  def >>(other)
    Chain.new(self, other)
  end
end
# Chain is chainable
class Chain
  include Chainable
  # ..etc
end
Chains can produce new chains:
calculate_total = Chain.new(substract_discount, add_tax)
total_with_offer = calculate_total >> add_special_offer
Finally, we’ll create a Step class to wrap our custom steps and make them chainable.
class Step
  include Chainable
  # Accept a block, or anything that responds to #call
  def initialize(callable = nil, &block)
    @callable = callable || block
  end
  def call(result)
    @callable.call(Result.wrap(result))
  end
end
This gives us:
substract_discount = Step.new do |result|
  if DISCOUNT > result.value
    Result::Failure.new(result.value, 'discount is greater than amount')
  else
    Result::Success.new(result.value - DISCOUNT)
  end
end
add_tax = Step.new { |result| Result::Success.new(result.value * (1 + TAX_RATE)) }
calculate_total = substract_discount >> add_tax
# produces Chain(substract_discount, add_tax)
calculate_total.call(1000) # Success(952.0)
Note that any callable can be made pipeline-compatible.
custom_step = Step.new(MyCustomCallable.new)
pipeline = some_step >> custom_step >> some_other_step
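Putting the pieces of this section together, here is a self-contained sketch of Result, Chainable, Chain and Step that can be pasted into irb. The add_tax body is filled in as an assumption, following the same pattern as substract_discount.

```ruby
class Result
  attr_reader :value

  def initialize(value)
    @value = value
  end

  # Wrap plain values in a Success; leave results untouched.
  def self.wrap(value)
    value.is_a?(Result) ? value : Success.new(value)
  end

  class Success < self
    # A success feeds itself to the next step.
    def map(callable)
      callable.call(self)
    end
  end

  class Failure < self
    attr_reader :error

    def initialize(value = nil, error = nil)
      super(value)
      @error = error
    end

    # A failure skips the step and forwards itself.
    def map(_callable)
      self
    end
  end
end

module Chainable
  def >>(other)
    Chain.new(self, other)
  end
end

# Glues two callables with Result#map, so failures short-circuit.
class Chain
  include Chainable

  def initialize(left_callable, right_callable)
    @left_callable = left_callable
    @right_callable = right_callable
  end

  def call(result)
    Result.wrap(result).map(@left_callable).map(@right_callable)
  end
end

# Makes any callable (or block) chainable.
class Step
  include Chainable

  def initialize(callable = nil, &block)
    @callable = callable || block
  end

  def call(result)
    @callable.call(Result.wrap(result))
  end
end

DISCOUNT = 200
TAX_RATE = 0.19

substract_discount = Step.new do |result|
  if DISCOUNT > result.value
    Result::Failure.new(result.value, 'discount is greater than amount')
  else
    Result::Success.new(result.value - DISCOUNT)
  end
end

add_tax = Step.new { |result| Result::Success.new(result.value * (1 + TAX_RATE)) }

calculate_total = substract_discount >> add_tax

ok = calculate_total.call(1000)
ko = calculate_total.call(100)
puts ok.value # 952.0
puts ko.error # discount is greater than amount
```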
Some syntax sugar
Since instantiating Result::Success or Result::Failure will be commonplace within step implementations, let’s add some conveniences to Result::Success.
class Success < self
  # ...etc
  def success(value)
    Success.new(value)
  end
  def failure(val = value, error)
    Failure.new(val, error)
  end
end
This is just so that we have the shorter result.success(new_value) and result.failure('something bad happened') available in our steps.
Note that these additions are only required in Result::Success, as that’s the only type ever passed to step callables.
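With the helpers in place, substract_discount no longer needs to name the result classes. A minimal, self-contained sketch (the Result classes are repeated, trimmed to what this example needs); note that Ruby allows the optional val parameter to precede the required error, so result.failure('msg') keeps the current value.

```ruby
class Result
  attr_reader :value

  def initialize(value)
    @value = value
  end

  class Success < self
    def success(value)
      Success.new(value)
    end

    # val defaults to the current value, so result.failure('msg') keeps it.
    def failure(val = value, error)
      Failure.new(val, error)
    end
  end

  class Failure < self
    attr_reader :error

    def initialize(value = nil, error = nil)
      super(value)
      @error = error
    end
  end
end

DISCOUNT = 200

substract_discount = ->(result) {
  if DISCOUNT > result.value
    result.failure('discount is greater than amount')
  else
    result.success(result.value - DISCOUNT)
  end
}

failed = substract_discount.call(Result::Success.new(100))
puts failed.value # 100
puts failed.error # discount is greater than amount
```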
Possibility 1: declarative Pipeline class
We can add some extra infrastructure to have portable pipeline definitions.
require 'logger'
logger = Logger.new($stdout)
pipeline1 = Pipeline.new.tap do |pl|
  # register steps as callables
  # (Discount and Tax are assumed to be Result-aware steps here)
  pl.step Discount.new(200)
  pl.step Tax.new(0.19)
  # ... or blocks
  pl.step do |result|
    logger.info "Got #{result.inspect}"
    result
  end
end
result = pipeline1.call(1000) # Result::Success(952.0)
The implementation goes something like this.
class Pipeline
  def initialize
    # Start with a no-op step
    @chain = Step.new { |result| result }
  end
  def step(callable = nil, &block)
    @chain = @chain >> Step.new(callable || block)
  end
  def call(value)
    @chain.call(value)
  end
end
Pipeline is itself composable, since it implements the same #call(Success) → Success | Failure interface.
pipeline2 = Pipeline.new.tap do |pl|
  pl.step pipeline1 # treat a Pipeline like a regular Step
  pl.step FinalStep
end
We can add some nice little helpers on top of #step.
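require 'logger'
class Pipeline
  # Pause in the debugger (requires the byebug gem),
  # then pass the result through unchanged
  def debug!
    step do |result|
      byebug
      result
    end
  end
  def log(label, logger: Logger.new($stdout))
    step do |result|
      logger.info "[#{label}] #{result.inspect}"
      result
    end
  end
end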
Use case:
pipeline = Pipeline.new.tap do |pl|
  pl.log 'before discount'
  pl.step Discount.new(200)
  pl.log 'after discount'
  pl.step Tax.new(0.19)
  pl.debug!
end
pipeline.call(1000)
This could be a nice little abstraction for middleware-style pipelines.
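A runnable sketch of the Pipeline idea, bundling the infrastructure from earlier sections so it stands alone. The discount and tax steps are written as Result-aware blocks (an assumption: the Discount and Tax classes in the snippets above are not defined for this interface), and the checkout pipeline shows one Pipeline registered as a step of another.

```ruby
class Result
  attr_reader :value

  def initialize(value)
    @value = value
  end

  def self.wrap(value)
    value.is_a?(Result) ? value : Success.new(value)
  end

  class Success < self
    def map(callable)
      callable.call(self)
    end
  end

  class Failure < self
    attr_reader :error

    def initialize(value = nil, error = nil)
      super(value)
      @error = error
    end

    def map(_callable)
      self
    end
  end
end

module Chainable
  def >>(other)
    Chain.new(self, other)
  end
end

class Chain
  include Chainable

  def initialize(left_callable, right_callable)
    @left_callable = left_callable
    @right_callable = right_callable
  end

  def call(result)
    Result.wrap(result).map(@left_callable).map(@right_callable)
  end
end

class Step
  include Chainable

  def initialize(callable = nil, &block)
    @callable = callable || block
  end

  def call(result)
    @callable.call(Result.wrap(result))
  end
end

# Accumulates steps into a single chain, starting from a no-op step.
class Pipeline
  def initialize
    @chain = Step.new { |result| result }
  end

  def step(callable = nil, &block)
    @chain = @chain >> Step.new(callable || block)
  end

  def call(value)
    @chain.call(value)
  end
end

pricing = Pipeline.new.tap do |pl|
  pl.step do |result|
    next Result::Failure.new(result.value, 'discount is greater than amount') if 200 > result.value
    Result::Success.new(result.value - 200)
  end
  pl.step { |result| Result::Success.new(result.value * 1.19) }
end

# A Pipeline responds to #call, so it can be registered as a step.
checkout = Pipeline.new.tap do |pl|
  pl.step pricing
  pl.step { |result| Result::Success.new(format('%.2f', result.value)) }
end

puts checkout.call(1000).value # 952.00
puts checkout.call(100).error  # discount is greater than amount
```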
Possibility 2: pseudo (runtime) type system
What follows builds on the infrastructure described above, and is inspired by the Dry-* set of Ruby gems. For real-world use of these ideas you can refer to those libraries, which are mature and heavily optimised for performance.
Let’s start with a no-op step as a base to compose more specialised behaviour. It returns its input unchanged, playing the role of the identity function in the functional world.
Noop = Step.new { |result| result }
Let’s now add some extra conveniences to our Chainable mixin (the one that affords #>> to Step and Chain).
#transform
module Chainable
  # .. etc
  def transform(callable = nil, &block)
    # Use the callable if given, otherwise the block
    fn = callable || block
    transformation = ->(result) {
      new_value = fn.call(result.value)
      result.success(new_value)
    }
    # Pipe self to transformation step,
    # returning a new Chain
    self >> transformation
  end
end
This is just a shortcut for value-transformation pipelines where the operations are assumed to be successful. An example for coercible values:
to_int = Noop.transform { |value| value.to_i }
# This works, too:
to_int = Noop.transform(&:to_i)
# This returns Chain(Noop, transform)
# Now use it in other pipelines
calculate_total = to_int >> substract_discount >> add_tax
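A runnable sketch of #transform, with the minimal infrastructure repeated so it stands alone. It takes the function from either the callable argument or the block, so both the block form and the callable form work; the double and shout names are purely illustrative.

```ruby
class Result
  attr_reader :value

  def initialize(value)
    @value = value
  end

  def self.wrap(value)
    value.is_a?(Result) ? value : Success.new(value)
  end

  # Only Success is needed here: these transforms never fail.
  class Success < self
    def map(callable)
      callable.call(self)
    end

    def success(new_value)
      Success.new(new_value)
    end
  end
end

module Chainable
  def >>(other)
    Chain.new(self, other)
  end

  # Map the wrapped value; use the callable if given, else the block.
  def transform(callable = nil, &block)
    fn = callable || block
    self >> ->(result) { result.success(fn.call(result.value)) }
  end
end

class Chain
  include Chainable

  def initialize(left_callable, right_callable)
    @left_callable = left_callable
    @right_callable = right_callable
  end

  def call(result)
    Result.wrap(result).map(@left_callable).map(@right_callable)
  end
end

class Step
  include Chainable

  def initialize(callable = nil, &block)
    @callable = callable || block
  end

  def call(result)
    @callable.call(Result.wrap(result))
  end
end

Noop = Step.new { |result| result }

to_int = Noop.transform(&:to_i)            # block form
double = to_int.transform { |n| n * 2 }    # chaining transforms
shout = Noop.transform(->(s) { s.upcase }) # callable form

puts to_int.call('42').value # 42
puts double.call('21').value # 42
puts shout.call('hi').value  # HI
```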
#check
Quick boolean check on a result’s value.
module Chainable
  def check(err = 'did not pass the check', &block)
    a_check = ->(result) {
      block.call(result.value) ? result : result.failure(err)
    }
    self >> a_check
  end
end
Usage:
is_a_string = Noop.check('not a string') { |value| value.is_a?(String) }
is_a_string.call('yup') # Success('yup')
is_a_string.call(10) # Failure('not a string')
#is_a
A simple type check built on top of #check.
module Chainable
  def is_a(klass)
    check("is not a #{klass}") { |value| value.is_a?(klass) }
  end
end
This one lets us type-check input values, returning a Failure early on in a pipeline.
must_be_numeric = Noop.is_a(::Numeric)
calculate_total = must_be_numeric >> substract_discount >> add_tax
calculate_total.call('nope!') # Result::Failure('is not a Numeric')
#|
Here we implement the logical or operator by returning a custom callable.
module Chainable
  # Disjunction operator (or):
  # returns an Or instance
  def |(other)
    Or.new(self, other)
  end
end
class Or
  include Chainable
  def initialize(left_callable, right_callable)
    @left_callable = left_callable
    @right_callable = right_callable
  end
  # If the left callable returns a Success, return it.
  # Otherwise try the right callable with the same input.
  # (A Failure has no #success helper, so we reuse the input.)
  def call(result)
    input = Result.wrap(result)
    left_result = @left_callable.call(input)
    left_result.is_a?(Result::Success) ? left_result : @right_callable.call(input)
  end
end
Use case:
int_or_string = Noop.is_a(Integer) | Noop.is_a(String)
int_or_string.call(10) # Success(10)
int_or_string.call('yup!') # Success('yup!')
int_or_string.call({}) # Failure({}, 'is not a String')
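A self-contained sketch of #check, #is_a and #| working together. One design choice to flag as an assumption: this Or hands the right-hand branch the original input (wrapped as a Success) rather than anything derived from the left branch’s Failure, so both branches test the same value and the right branch’s error is the one reported.

```ruby
class Result
  attr_reader :value
  def initialize(value)
    @value = value
  end

  def self.wrap(value)
    value.is_a?(Result) ? value : Success.new(value)
  end

  class Success < self
    def map(callable)
      callable.call(self)
    end

    def failure(val = value, error)
      Failure.new(val, error)
    end
  end

  class Failure < self
    attr_reader :error
    def initialize(value = nil, error = nil)
      super(value)
      @error = error
    end

    def map(_callable)
      self
    end
  end
end

module Chainable
  def >>(other)
    Chain.new(self, other)
  end

  def |(other)
    Or.new(self, other)
  end

  def check(err = 'did not pass the check', &block)
    self >> ->(result) { block.call(result.value) ? result : result.failure(err) }
  end

  def is_a(klass)
    check("is not a #{klass}") { |value| value.is_a?(klass) }
  end
end

class Chain
  include Chainable
  def initialize(left_callable, right_callable)
    @left_callable = left_callable
    @right_callable = right_callable
  end
  def call(result)
    Result.wrap(result).map(@left_callable).map(@right_callable)
  end
end

class Step
  include Chainable
  def initialize(callable = nil, &block)
    @callable = callable || block
  end
  def call(result)
    @callable.call(Result.wrap(result))
  end
end

class Or
  include Chainable
  def initialize(left_callable, right_callable)
    @left_callable = left_callable
    @right_callable = right_callable
  end

  # Return the left result if it succeeded, else try the right branch.
  def call(result)
    input = Result.wrap(result)
    left_result = @left_callable.call(input)
    left_result.is_a?(Result::Success) ? left_result : @right_callable.call(input)
  end
end

Noop = Step.new { |result| result }
int_or_string = Noop.is_a(Integer) | Noop.is_a(String)

puts int_or_string.call('yup!').value # yup!
puts int_or_string.call({}).error     # is not a String
```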
With these helpers in hand, we could, if pressed, devise a kind of runtime type system.
module Types
  String = Noop.is_a(::String)
  Integer = Noop.is_a(::Integer)
  Numeric = Noop.is_a(::Numeric)
  Nil = Noop.is_a(::NilClass)
  True = Noop.is_a(::TrueClass)
  False = Noop.is_a(::FalseClass)
  # etc
end
We can now combine these base “types” into other types. Here’s a “Boolean”:
module Types
  Boolean = True | False
end
Types::Boolean.call(true) # Success(true)
Types::Boolean.call(false) # Success(false)
Types::Boolean.call('nope') # Failure
And here’s a Maybe type:
maybe_string = Types::Nil | Types::String
maybe_string.call('yes!') # Success('yes!')
maybe_string.call(nil) # Success(nil)
As you can see, we’re using disjunctions (the | operator) as a kind of runtime union type.
We can now combine basic types, custom types and operators into more complex logic:
require 'money'
module Types
  Money = Noop.is_a(::Money)
end
# Coerce an integer into a Money instance
int_to_gbp = Types::Integer.transform { |int| Money.new(int, 'GBP') }
# Is it already USD? (Currency#iso_code is the three-letter code)
is_usd = Types::Money.check { |amount| amount.currency.iso_code == 'USD' }
# Exchange to USD (requires GBP→USD rates on the money gem's bank)
to_usd = Types::Money.transform { |amount| amount.exchange_to('USD') }
# Check minimum money amount
gte = ->(cents) {
  Types::Money.check("must be >= #{cents} cents") { |amount| amount.cents >= cents }
}
# * If it's an Integer, convert into GBP Money instance
# * else if it's already a Money
#   * if already USD, stop
#   * else convert to USD
# * finally validate that it's greater than or equal to $1000.00
money = (int_to_gbp | Types::Money) >> (is_usd | to_usd) >> gte.(1000_00)
money.call(1000_10) # Success(Money(1_368_58, 'USD')), depending on the exchange rate
money.call(Money.new(1000_00, 'USD')) # Success(Money(1000_00, 'USD'))
money.call(Money.new(999_99, 'USD')) # Failure("must be >= 100000 cents")
money.call('nope') # Failure('is not a Money')
Advanced “types”: Arrays, hashes
The basic Step class can also be subclassed to handle composite type definitions.
array_of_strings_or_numbers = Types::Array.of(Types::String | Types::Numeric)
Birthday = Types::String.check('not a date') { |str| str.match?(/\A\d{4}-\d{2}-\d{2}\z/) }
user_hash = Types::Hash.schema(
  name: Types::String,
  birthday: Birthday
)
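Types::Array.of and Types::Hash.schema are not implemented in this article. As one hedged illustration of subclassing Step, here is a hypothetical ArrayOf step that validates every element against a member type and fails on the first element that doesn’t pass. The ArrayOf name and its error format are assumptions, and Chainable is left out of Step to keep the sketch short.

```ruby
class Result
  attr_reader :value

  def initialize(value)
    @value = value
  end

  def self.wrap(value)
    value.is_a?(Result) ? value : Success.new(value)
  end

  class Success < self
    def success(new_value)
      Success.new(new_value)
    end

    def failure(val = value, error)
      Failure.new(val, error)
    end
  end

  class Failure < self
    attr_reader :error

    def initialize(value = nil, error = nil)
      super(value)
      @error = error
    end
  end
end

# Step as in the article, minus Chainable to keep this short.
class Step
  def initialize(callable = nil, &block)
    @callable = callable || block
  end

  def call(result)
    @callable.call(Result.wrap(result))
  end
end

# Hypothetical composite type: checks every element with member_type
# and fails on the first element that doesn't pass.
class ArrayOf < Step
  def initialize(member_type)
    super()
    @member_type = member_type
  end

  def call(result)
    result = Result.wrap(result)
    return result.failure('is not an Array') unless result.value.is_a?(::Array)

    values = result.value.each_with_index.map do |item, index|
      item_result = @member_type.call(item)
      return result.failure("[#{index}] #{item_result.error}") unless item_result.is_a?(Result::Success)

      item_result.value
    end
    result.success(values)
  end
end

string_or_numeric = Step.new do |result|
  if result.value.is_a?(String) || result.value.is_a?(Numeric)
    result
  else
    result.failure('is not a String or Numeric')
  end
end

array_of_strings_or_numbers = ArrayOf.new(string_or_numeric)

puts array_of_strings_or_numbers.call(['a', 1, 2.5]).value.inspect # ["a", 1, 2.5]
puts array_of_strings_or_numbers.call(['a', nil]).error            # [1] is not a String or Numeric
```

Collecting each member’s returned value (rather than the raw element) means member types that transform values, such as coercions, would also apply inside the array.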
And so on. Other examples include checking specific values, checking interfaces, and processing arrays concurrently. The Dry-Types project and related gems are a good showcase of where we can take these patterns in Ruby.