In this series:
- Part 1: Practical Railway-Oriented Pipelines in Ruby
- Part 2: User input, errors and metadata
- Part 3: Extending pipelines
- Part 4: Middleware
- Part 5: Testing pipelines
In the previous article I described a bare-bones implementation of Railway-oriented pipelines in Ruby. I showed how to build pipelines of steps that can be executed sequentially, with each step receiving the result of the previous one.
# An illustrative data processing pipeline
DataImporter = Pipeline.new do |pl|
  pl.step ValidateUserInput
  pl.step ReadCSV
  pl.step ValidateData
  pl.step TransformData
end
Result metadata
To make these pipelines practical, though, we want to be able to pass extra metadata with the result as it moves through the pipeline, so that we can support a variety of use cases.
# Start a result with a dataset value and user parameters.
result = Result.new([1, 2, 3, 4], params: { limit: 5 })
result.value # => [1, 2, 3, 4]
result.params[:limit] # => 5
What those fields are may depend on the domain, but for my use cases I’ve settled on the following:
- The value is the main data being processed: a set of records, an API response, a CSV stream, etc.
- The params Hash is meant to pass external user or system input relevant for processing, or to control pipeline behaviour.
- The errors Hash is meant to accumulate errors during processing.
- The context Hash is meant to pass or accumulate arbitrary data between pipeline steps: counts, lookups, facets, etc.
result = Result.new([1, 2, 3, 4], params: { limit: 5 })
result.params # { limit: 5 }
result.errors # {}
result.context # {}
Then I add helper methods such as #with_context and #with_error, as well as #halt and #continue, to help manipulate result instances as they move through the pipeline.
Passing context between steps
result = result.with_context(:count, 4)
# result.context[:count] => 4
Accumulating errors while allowing the pipeline to continue
result = result.with_error(:limit, "Exceeded")
# result.continue? => true
# result.errors => { limit: ["Exceeded"] }
Halting with errors
result = result.halt.with_error(:limit, "Exceeded")
# result.continue? => false
# result.errors => { limit: ["Exceeded"] }
Combining helpers
result = result
  .halt([]) # <= halt with an empty value
  .with_error(:limit, "Exceeded") # <= add an error
  .with_context(:count, 4) # <= add context
Note that these helpers are not required for the pipeline to work. They’re just syntactic sugar to make working with Result instances more convenient.
All of these methods, including #continue and #halt, return new instances, leaving the original untouched. Immutable results mean there is no risk of a step accidentally modifying an object that might be used elsewhere in the code. Immutability also enables concurrent execution of steps, as we’ll see later.
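As a rough sketch of how such helpers could be written (the internals here, including the private copy method and the continue: keyword, are my assumptions for illustration, not necessarily the implementation used in this series), every helper builds a fresh frozen instance instead of mutating the receiver:

```ruby
# A minimal immutable Result sketch. Helper names follow the article;
# the internals (private #copy, the continue: keyword) are assumptions.
class Result
  attr_reader :value, :params, :errors, :context

  def initialize(value, params: {}, errors: {}, context: {}, continue: true)
    @value = value
    @params = params.dup.freeze
    @errors = errors.dup.freeze
    @context = context.dup.freeze
    @continue = continue
    freeze # no instance variable can be reassigned after this point
  end

  def continue?
    @continue
  end

  # New instance flagged to continue, optionally with a new value.
  def continue(new_value = value)
    copy(value: new_value, continue: true)
  end

  # New instance flagged as halted, optionally with a new value.
  def halt(new_value = value)
    copy(value: new_value, continue: false)
  end

  # New instance with key => val merged into the context.
  def with_context(key, val)
    copy(context: context.merge(key => val))
  end

  # New instance with message appended to the list of errors for key.
  def with_error(key, message)
    copy(errors: errors.merge(key => [*errors[key], message]))
  end

  private

  def copy(value: @value, params: @params, errors: @errors, context: @context, continue: @continue)
    self.class.new(value, params: params, errors: errors, context: context, continue: continue)
  end
end

original = Result.new([1, 2, 3, 4], params: { limit: 5 })
halted = original.halt.with_error(:limit, "Exceeded").with_context(:count, 4)

halted.continue?   # => false
halted.errors      # => { limit: ["Exceeded"] }
halted.context     # => { count: 4 }
original.continue? # => true (the original is untouched)
original.errors    # => {}
```

Freezing the instance and its hashes is one way to enforce the "new instance every time" rule: an accidental in-place write raises a FrozenError instead of silently leaking into other steps.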
Let’s add a step to limit the set to the first N elements based on user input. It will also validate that the limit is set.
LimitSet = proc do |result|
  if (limit = result.params[:limit])
    # Keep only the first N elements, reusing the limit read above.
    set = result.value.first(limit)
    result.continue(set)
  else # No limit! Halt with an error.
    result.halt.with_error(:limit, "Not set")
  end
end
NumberCruncher = Pipeline.new do |pl|
  # ... Previous steps here
  pl.step LimitSet # <= this step expects params[:limit]
end
initial_result = Result.new((1..100), params: { limit: 5 })
result = NumberCruncher.call(initial_result)
result.value # => [2, 4, 6, 8, 10]
1. Logging
2. ValidateSetSize.new(lte: 100)
3. MultiplyBy.(2)
4. LimitSet
Omitting the limit param will halt the pipeline with an error.
initial_result = Result.new((1..100), params: {})
result = NumberCruncher.call(initial_result)
result.continue? # => false
result.errors # => { limit: ["Not set"] }
1. Logging
2. ValidateSetSize.new(lte: 100)
3. MultiplyBy.(2)
4. LimitSet, errors: { limit: ["Not set"] }
The params Hash can be used for filtering lists, setting limits, defining transformations, etc. It’s a flexible way to pass user input to the pipeline.
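To see both outcomes end to end, here is a self-contained sketch. The Result and Pipeline classes are trimmed-down stand-ins written for this example (assumptions, not the versions from Part 1), MultiplyBy is a hypothetical implementation of the step named in the diagrams, and the Logging and ValidateSetSize steps are left out for brevity.

```ruby
# Trimmed-down stand-in for Result: only what this example needs.
class Result
  attr_reader :value, :params, :errors

  def initialize(value, params: {}, errors: {}, continue: true)
    @value = value
    @params = params
    @errors = errors
    @continue = continue
  end

  def continue?
    @continue
  end

  def continue(new_value = value)
    Result.new(new_value, params: params, errors: errors, continue: true)
  end

  def halt(new_value = value)
    Result.new(new_value, params: params, errors: errors, continue: false)
  end

  def with_error(key, message)
    merged = errors.merge(key => [*errors[key], message])
    Result.new(value, params: params, errors: merged, continue: continue?)
  end
end

# Stand-in Pipeline: runs steps in order and skips the rest once halted.
class Pipeline
  def initialize
    @steps = []
    yield self
  end

  def step(callable)
    @steps << callable
  end

  def call(result)
    @steps.reduce(result) do |res, step|
      res.continue? ? step.call(res) : res
    end
  end
end

# Hypothetical step factory: MultiplyBy.(2) returns a step that doubles each element.
MultiplyBy = ->(factor) { ->(result) { result.continue(result.value.map { |n| n * factor }) } }

LimitSet = proc do |result|
  if (limit = result.params[:limit])
    result.continue(result.value.first(limit))
  else
    result.halt.with_error(:limit, "Not set")
  end
end

NumberCruncher = Pipeline.new do |pl|
  pl.step MultiplyBy.(2)
  pl.step LimitSet
end

ok = NumberCruncher.call(Result.new((1..100), params: { limit: 5 }))
ok.value # => [2, 4, 6, 8, 10]

halted = NumberCruncher.call(Result.new((1..100), params: {}))
halted.continue? # => false
halted.errors    # => { limit: ["Not set"] }
```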
Parameter validation steps
It’s possible to implement steps specialised in validating params and populating errors. These steps can be put at the front of a pipeline, to ensure that no further steps run if the input is invalid.
class ValidateParamPresence
  def initialize(field)
    @field = field
  end
  def call(result)
    return result.halt.with_error(@field, "Not set") if result.params[@field].nil?
    result
  end
end
NumberCruncher = Pipeline.new do |pl|
  pl.step ValidateParamPresence.new(:limit)
  pl.step LimitSet # <= this step expects params[:limit]
end
There’s no constraint on the kinds of APIs or DSLs exposed by these steps. A more complex example could leverage Rails' attributes API.
How exactly that step implements validations is not important, as long as it responds to #call and returns a Result with the relevant errors.
pl.step(ValidateParams.new do
  attribute :limit, :integer
  attribute :order, :string, default: "asc"
  validates :limit, presence: true, numericality: { greater_than: 0 }
  validates :order, inclusion: { in: %w[asc desc] }
end)
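The same contract can be met without Rails. The sketch below is a plain-Ruby alternative, not the ValidateParams step above: the ValidateParamRules class and its rules format are hypothetical, the Result class is a trimmed-down stand-in, and defaults such as order: "asc" are not handled. It collects every failing rule before halting, so the caller sees all problems at once.

```ruby
# Trimmed-down stand-in for Result: only what this example needs.
class Result
  attr_reader :value, :params, :errors

  def initialize(value, params: {}, errors: {}, continue: true)
    @value = value
    @params = params
    @errors = errors
    @continue = continue
  end

  def continue?
    @continue
  end

  def halt
    Result.new(value, params: params, errors: errors, continue: false)
  end

  def with_error(key, message)
    merged = errors.merge(key => [*errors[key], message])
    Result.new(value, params: params, errors: merged, continue: continue?)
  end
end

# Hypothetical step. rules maps each field to a list of
# [message, predicate] pairs; a predicate returns true when the value is valid.
class ValidateParamRules
  def initialize(rules)
    @rules = rules
  end

  def call(result)
    checked = @rules.reduce(result) do |res, (field, checks)|
      checks.reduce(res) do |r, (message, predicate)|
        predicate.call(r.params[field]) ? r : r.with_error(field, message)
      end
    end
    # Halt only if this step added errors of its own.
    checked.errors == result.errors ? checked : checked.halt
  end
end

ValidateLimitAndOrder = ValidateParamRules.new(
  limit: [
    ["Not set", ->(v) { !v.nil? }],
    ["Must be greater than 0", ->(v) { v.nil? || (v.is_a?(Integer) && v > 0) }]
  ],
  order: [
    ["Must be asc or desc", ->(v) { v.nil? || %w[asc desc].include?(v) }]
  ]
)

result = ValidateLimitAndOrder.call(Result.new([1, 2, 3], params: { limit: 0, order: "up" }))
result.continue? # => false
result.errors    # => { limit: ["Must be greater than 0"], order: ["Must be asc or desc"] }
```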
In the next article I’ll show more examples, when I talk about extending pipelines with specialised steps and helper methods.
Context is king.
I’ve shown params and errors. The context Hash is intended for steps to pass data between each other, or to accumulate data as the pipeline progresses.
The following step computes a count of odd numbers in the set, and passes it to the next step.
CountOdds = proc do |result|
  count = result.value.count(&:odd?)
  result.with_context(:odd_count, count)
end
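To illustrate the receiving side, here is a sketch of a downstream step that reads the count left in the context. RequireOdds is a hypothetical step invented for this example, and the Result class is a trimmed-down stand-in that only carries a value and a context.

```ruby
# Trimmed-down stand-in for Result: value and context only.
class Result
  attr_reader :value, :context

  def initialize(value, context: {}, continue: true)
    @value = value
    @context = context
    @continue = continue
  end

  def continue?
    @continue
  end

  def halt
    Result.new(value, context: context, continue: false)
  end

  def with_context(key, val)
    Result.new(value, context: context.merge(key => val), continue: continue?)
  end
end

CountOdds = proc do |result|
  count = result.value.count(&:odd?)
  result.with_context(:odd_count, count)
end

# Hypothetical downstream step: halts when the previous step found no odd numbers.
RequireOdds = proc do |result|
  result.context.fetch(:odd_count, 0).zero? ? result.halt : result
end

result = RequireOdds.call(CountOdds.call(Result.new([1, 2, 3, 4])))
result.context[:odd_count] # => 2
result.continue?           # => true
```

Using fetch with a default keeps RequireOdds safe to run even if CountOdds was not placed before it in the pipeline.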
This other step processes a list of user records, and builds facet counts for each country.
# data looks like:
#
# [
#   { name: "Alice", country: "US" },
#   { name: "Bob", country: "UK" }
#   ...
# ]
FacetByCountry = proc do |result|
  # Hash.new(0) makes missing countries start at zero.
  counts = result.value.each_with_object(Hash.new(0)) do |user, h|
    h[user[:country]] += 1
  end
  result.with_context(:country_facets, counts)
end
# The pipeline
UserProcessor = Pipeline.new do |pl|
  pl.step FacetByCountry
  pl.step FilterByCountry # .. etc
end
# Example
initial_result = Result.new(users)
result = UserProcessor.call(initial_result)
result.context[:country_facets] # => { "US" => 10, "UK" => 5, ... }
I’ll rely on params, errors and context throughout the series to show how to build complex pipelines that can handle a variety of use cases.