In this series:
- Part 1: Practical Railway-Oriented Pipelines in Ruby
- Part 2: User input, errors and metadata
- Part 3: Extending pipelines
- Part 4: Middleware
- Part 5: Testing pipelines
Some years ago I explored patterns for building composable processing pipelines in Ruby, using a Railway-oriented paradigm.
In this series, I’ll describe a simplified implementation for practical use.
# An illustrative data processing pipeline
DataImporter = Pipeline.new do |pl|
  pl.step ValidateUserInput
  pl.step ReadCSV
  pl.step ValidateData
  pl.step TransformData
  pl.step SaveData
end
Part 1: The core pattern
I’ve relied on versions of this approach in various projects for some time now, and I’ve found it to be a very effective way to build and maintain complex data processing workflows. The following are the core components of the pattern.
The result class
A generic Result wraps values passed through the pipeline, and can be in one of two states: Continue or Halt.
The values themselves can be anything relevant to the domain, but Result offers a consistent interface for handling them in the pipeline, as well as metadata such as user input, errors and arbitrary context (I’ll describe some of this in a separate article).
# Initial result
result = Result.new([1, 2, 3, 4])
result.value # => [1, 2, 3, 4]
result.continue? # => true
Result instances can be continued or halted. Both operations return new copies with the same or different data.
result = result.continue([5, 6, 7, 8])
result.value # => [5, 6, 7, 8]
result.continue? # => true
result = result.halt
result.continue? # => false
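For illustration, a minimal Result supporting the behaviour above might look like this sketch (the basic implementation linked at the end of the article is the complete version):
class Result
  attr_reader :value

  def initialize(value, continue: true)
    @value = value
    @continue = continue
  end

  def continue? = @continue

  # Return a new Continue result, optionally with a different value
  def continue(value = @value) = Result.new(value, continue: true)

  # Return a new Halt result, optionally with a different value
  def halt(value = @value) = Result.new(value, continue: false)
end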
The Step interface
#call(Result) => Result
A step is a simple object that responds to #call, takes a Result as input, and returns a new Result.
This is a step:
class MyCustomStep
  def call(result)
    # Do something with result.value
    result.continue(new_value)
  end
end
And so is this:
MyProcStep = proc do |result|
  # Do something with result.value
  result.continue(new_value)
end
Steps can be instances, classes, Procs, Lambdas or Method objects.
They can be stateless procedures, or complex objects that manage their own internal state.
It doesn’t matter how they were defined or initialised, as long as they respond to #call.
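As a sketch of the stateful case, here’s a hypothetical step (not part of the pattern itself) that keeps its own internal state between calls:
# A hypothetical stateful step: it counts how many results it has
# processed, keeping the tally in the step instance itself.
class CountingStep
  def initialize = @count = 0

  def call(result)
    @count += 1
    puts "Processed #{@count} results so far"
    result # pass the result through unchanged
  end
end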
The pipeline
A pipeline is a container for a sequence of steps that process a Result and return a new Result, in the order they were added to the pipeline.
MyPipeline = Pipeline.new do |pl|
  # Anything that responds to #call can be a step
  pl.step MyCustomStep.new

  # Or a simple proc. This one limits the set to the first 5 elements
  pl.step do |result|
    set = result.value.first(5)
    result.continue(set)
  end
end
# Usage
initial_result = Result.new((1..100))
result = MyPipeline.call(initial_result)
result.value # => [1, 2, 3, 4, 5]
There’s very little to the Pipeline class itself.
class Pipeline
  attr_reader :steps

  def initialize(&config)
    @steps = []
    config.call(self) and @steps.freeze if block_given?
  end

  # Add a step to the pipeline, either in block form or as a callable object.
  # @param callable [Object, nil] a step that responds to `#call(Result) => Result`
  # @yield [Result] a step as a block
  def step(callable = nil, &block)
    callable ||= block
    raise ArgumentError, "Step must respond to #call" unless callable.respond_to?(:call)
    steps << callable
    self
  end

  # Reduce over steps, calling each one in turn:
  # * [Continue] results are passed on to the next step
  # * [Halt] results are returned unchanged
  # @param result [Result]
  # @return [Result]
  def call(result)
    steps.reduce(result) do |r, step|
      r.continue? ? step.call(r) : r
    end
  end
end
Because it responds to #call(Result) => Result, a pipeline is itself a step. More on that later.
The Railway bit
Where this becomes useful is in the ability to “halt” processing at any point in the pipeline.
MyPipeline = Pipeline.new do |pl|
  # This step halts processing if the set size is greater than 100
  pl.step do |result|
    if result.value.size > 100 # value too big. Halt.
      result.halt
    else # nothing to do. Continue.
      result
    end
  end

  # Any further steps here will not be executed
  # if the pipeline is halted in the step above
end
The key to this is the Pipeline#call method, expanded here for clarity:
# @param result [Result]
# @return [Result]
def call(result)
  steps.reduce(result) do |r, step|
    if r.continue? # if the result is a Continue, invoke the next step
      step.call(r)
    else # if the result is a Halt, return it unchanged
      r
    end
  end
end
Now, any step that returns a halt will just skip over further steps downstream. In other words, a step can return a continue or a halt, but it can only ever receive a continue as argument.
#call(Result[Continue]) => Result[Continue] | Result[Halt]
Other implementations of this pattern rely on Sum types or monads to represent the continue and halt states. See Dry::Monads for a more functional approach. I also expand on a typed implementation in this article.
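For a rough flavour of the monadic version, here’s a sketch using the dry-monads gem (my own illustration, not from that article; validate and double are made-up steps):
require "dry/monads"
include Dry::Monads[:result]

# Hypothetical steps: each takes a raw value and returns Success or Failure
validate = ->(set) { set.size > 100 ? Failure(:set_too_big) : Success(set) }
double   = ->(set) { Success(set.map { |n| n * 2 }) }

Success((1..99).to_a).bind(&validate).bind(&double)  # => Success([2, 4, 6, ...])
Success((1..101).to_a).bind(&validate).bind(&double) # => Failure(:set_too_big); double never runs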
Let’s do some number crunching:
# A portable step to validate set size
class ValidateSetSize
  # @param lte [Integer] the maximum size allowed (Less Than or Equal)
  def initialize(lte:) = @lte = lte

  def call(result)
    return result.halt if result.value.size > @lte
    result
  end
end

# A step to multiply each number in the set by a factor
# This one is a Proc that returns a Proc.
MultiplyBy = proc do |factor|
  proc do |result|
    result.continue(result.value.map { |n| n * factor })
  end
end

# Limit set to first N elements
LimitSet = proc do |limit|
  proc do |result|
    result.continue(result.value.first(limit))
  end
end

# Compose the pipeline
NumberCruncher = Pipeline.new do |pl|
  pl.step { |r| puts 'Logging'; r }
  pl.step ValidateSetSize.new(lte: 100)
  pl.step MultiplyBy.(2)
  pl.step LimitSet.(5)
end
In this example, the second step, ValidateSetSize, will halt the pipeline if the set size is greater than 100, preventing MultiplyBy from running.
initial_result = Result.new((1..101))
result = NumberCruncher.call(initial_result)
result.continue? # => false
1. Logging
2. ValidateSetSize.new(lte: 100) (halts here)
3. MultiplyBy.(2) (skipped)
4. LimitSet.(5) (skipped)
However, if all steps return a continue result, the pipeline processes all steps and returns the final result.
initial_result = Result.new((1..99))
result = NumberCruncher.call(initial_result)
result.continue? # => true
# Each number in set was multiplied by 2, then limited to the first 5
result.value # => [2, 4, 6, 8, 10]
1. Logging
2. ValidateSetSize.new(lte: 100)
3. MultiplyBy.(2)
4. LimitSet.(5)
Composing pipelines
Since Pipeline itself implements the #call(Result) => Result interface, it can be used as a step in another pipeline.
BigPipeline = Pipeline.new do |pl|
  pl.step Step1 # a regular step
  pl.step NumberCruncher # a nested pipeline
  pl.step Step3 # another regular step
end
1. Step1
2. NumberCruncher
  2.1. Logging
  2.2. ValidateSetSize.new(lte: 100)
  2.3. MultiplyBy.(2)
  2.4. LimitSet.(5)
3. Step3
This allows “packaging up” complex processing workflows into reusable components, where each component can be composed of multiple steps if need be. It’s also possible to have factory methods that parameterise the creation of pipelines.
# A component to validate and coerce a set of numbers
# It returns a 2-step pipeline that can be composed into a larger pipeline
module NumberValidation
  def self.new(lte:)
    Pipeline.new do |pl|
      pl.step ValidateSetSize.new(lte: lte)
      pl.step CoerceToIntegers
    end
  end

  CoerceToIntegers = proc do |result|
    result.continue(result.value.map(&:to_i))
  end
end
# Compose a larger pipeline
BigPipeline = Pipeline.new do |pl|
  pl.step Step1
  pl.step NumberValidation.new(lte: 100) # a nested pipeline
  pl.step Step3
end
Pipelines can also be used internally by custom classes.
class NumberValidation
  def initialize(lte:)
    @pipeline = Pipeline.new do |pl|
      pl.step ValidateSetSize.new(lte: lte)
      # Use a Method object as step
      # https://ruby-doc.org/3.3.0/Method.html
      pl.step method(:coerce_to_integers)
    end
  end

  # Expose the Step interface
  # to make instances of this class behave like a step
  def call(result) = @pipeline.call(result)

  private def coerce_to_integers(result)
    result.continue(result.value.map(&:to_i))
  end
end
Which approach to use will depend on each step’s internals. Other than the simple #call interface, steps are effectively black boxes, and refactoring them is straightforward.
Conclusion
Like anything, this approach has its trade-offs. If the problem can be better thought of as an object graph rather than a sequence, or the processing required can’t be easily broken down into steps, then this approach might not be the best fit.
In general, I’ve found it provides a simple mental model to reason about problems (in the Rich Hickey sense).
Any operation that can be coerced into complying with the Monoid Laws can be a good candidate.
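To make that slightly more concrete: pipelines themselves behave monoidally over steps. An empty pipeline is an identity element, and because nested pipelines pass results straight through, grouping doesn’t affect the outcome. A sketch, where A, B and C stand for any hypothetical steps:
# The empty pipeline is an identity: with no steps, #call
# reduces over nothing and returns its input Result unchanged.
identity = Pipeline.new

# Grouping doesn't matter (associativity). A, B and C are hypothetical steps.
left = Pipeline.new do |pl|
  pl.step Pipeline.new { |p| p.step(A).step(B) } # (A then B), then C
  pl.step C
end

right = Pipeline.new do |pl|
  pl.step A # A, then (B then C)
  pl.step Pipeline.new { |p| p.step(B).step(C) }
end

# For any result r:
#   identity.call(r) == r
#   left.call(r) and right.call(r) produce the same final Result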
In the next article in this series, I describe how to make pipelines more useful by handling user input, errors and general metadata.
The basic implementation used in this article is here.