In this series:
- Part 1: Practical Railway-Oriented Pipelines in Ruby
- Part 2: User input, errors and metadata
- Part 3: Extending pipelines
- Part 4: Middleware
- Part 5: Testing pipelines
In the previous article in this series I showed how to pass extra metadata from one step to the next, including user input, errors and context data.
This article expands on the previous ones by showing how to extend the pipeline with domain-specific steps and helpers.
Extending the pipeline
The Pipeline class itself can be subclassed or extended to add domain-specific functionality. One extension I've found helpful is a terse DSL for validating input parameters.
NumberCruncher = ValidatingPipeline.new do |pl|
  # the #params helper adds a step to validate input parameters
  pl.params do
    field(:limit).type(:integer).required.default(5)
    field(:lte).type(:integer).required
  end

  # ... other steps here
end
All #params does is register a step using a specialised class that knows how to validate result parameters. That class exposes the #call(Result) => Result interface, and halts the pipeline if any parameter is invalid.
class ValidatingPipeline < Pipeline
  # ... etc

  # A helper method to register a custom step
  def params(&block)
    step ParamsValidator.new(&block)
  end
end
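To make the idea concrete, here is a hand-rolled sketch of what a ParamsValidator step could look like. The article uses the Parametric gem for this; the Field DSL below is a hypothetical stand-in supporting only #type(:integer), #required and #default, and the Result class is a simplified stand-in for the one built in Part 2 (its #with_params and #halt_with_errors methods are assumptions, not the real API).

```ruby
# Simplified Result stand-in (the real one is built in Part 2).
# #with_params and #halt_with_errors are assumed method names.
class Result
  attr_reader :params, :errors

  def initialize(params: {}, errors: {}, continue: true)
    @params = params
    @errors = errors
    @continue = continue
  end

  def continue? = @continue

  def with_params(params) = Result.new(params: params)

  def halt_with_errors(errors) = Result.new(params: params, errors: errors, continue: false)
end

# Hypothetical stand-in for the Parametric field DSL.
class Field
  attr_reader :key

  def initialize(key)
    @key = key
    @type = nil
    @required = false
    @default = nil
  end

  def type(t) = tap { @type = t }
  def required = tap { @required = true }
  def default(value) = tap { @default = value }

  # Returns a [value, error] pair.
  def resolve(params)
    value = params.fetch(key, @default)
    return [nil, "is required"] if value.nil? && @required
    return [nil, "must be an integer"] if @type == :integer && !value.nil? && !value.is_a?(Integer)

    [value, nil]
  end
end

class ParamsValidator
  def initialize(&block)
    @fields = []
    instance_eval(&block)
  end

  def field(key)
    Field.new(key).tap { |f| @fields << f }
  end

  # The #call(Result) => Result interface: halt with errors if any
  # field is invalid, otherwise continue with the resolved params.
  def call(result)
    resolved = {}
    errors = {}
    @fields.each do |f|
      value, error = f.resolve(result.params)
      if error
        errors[f.key] = error
      else
        resolved[f.key] = value
      end
    end
    errors.any? ? result.halt_with_errors(errors) : result.with_params(resolved)
  end
end

validator = ParamsValidator.new do
  field(:limit).type(:integer).required.default(5)
  field(:lte).type(:integer).required
end

ok = validator.call(Result.new(params: { lte: 100 }))
ok.params # => { limit: 5, lte: 100 }

bad = validator.call(Result.new(params: {}))
bad.continue? # => false
```

Note that the validator is just another step: because it responds to #call(Result), the pipeline doesn't need to treat it specially.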
I use my Parametric gem for this, but anything that makes sense for your domain will do. Dry::Types is another good option. Or Rails' ActiveModel::Validations (as shown in the previous article) if you’re in a Rails app.
This means that complex operations can now be packaged up and validate their own inputs.
# A portable step to multiply each number in the set by a factor.
Multiply = ValidatingPipeline.new do |pl|
  pl.params do
    field(:factor).type(:integer).required.default(1)
  end

  pl.step do |result|
    factor = result.params[:factor]
    result.continue(result.value.map { |n| n * factor })
  end
end

# A portable step to limit the set to the first N elements.
# It defines its own required parameters.
LimitSet = ValidatingPipeline.new do |pl|
  pl.params do
    field(:limit).type(:integer).required.default(5)
  end

  pl.step do |result|
    set = result.value.first(result.params[:limit])
    result.continue(set)
  end
end

NumberCruncher = Pipeline.new do |pl|
  pl.step ValidateNumbers.new(lte: 100)
  pl.step Multiply
  pl.step LimitSet
end
I use helper methods to simplify domain-specific pipelines. Some other examples include:
MyPipeline = DatasetPipeline.new do |pl|
  # A helper to filter elements in a set.
  # Returns a new [Result] with the filtered set.
  pl.filter do |element|
    element > 10
  end

  # A helper to sort elements in a set
  pl.sort do |a, b|
    a <=> b
  end

  # A development helper to invoke a Byebug or Pry session at this point
  pl.debug
end
For most of these helpers, the implementation is trivial.
class DatasetPipeline < Pipeline
  def filter(&block)
    step do |result|
      set = result.value.filter(&block)
      result.continue(set)
    end
  end

  def sort(&block)
    step do |result|
      set = result.value.sort(&block)
      result.continue(set)
    end
  end

  def debug
    step do |result|
      binding.pry
      result
    end
  end
end
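To see the helpers in action, here is a minimal, self-contained sketch. The Result and Pipeline classes below are simplified stand-ins for the ones built in earlier parts of the series (only a #value and the continue/halt flag), not the real implementations.

```ruby
# Simplified Result stand-in: a value plus a continue/halt flag.
class Result
  attr_reader :value

  def initialize(value, continue: true)
    @value = value
    @continue = continue
  end

  def continue? = @continue

  def continue(value = @value) = Result.new(value)

  def self.continue(value) = new(value)
end

# Simplified Pipeline stand-in: steps run in order until one halts.
class Pipeline
  attr_reader :steps

  def initialize(&config)
    @steps = []
    config&.call(self)
  end

  def step(callable = nil, &block)
    @steps << (callable || block)
    self
  end

  def call(result)
    steps.reduce(result) { |res, step| res.continue? ? step.call(res) : res }
  end
end

# The subclass adds set-oriented helpers on top of the generic #step.
class DatasetPipeline < Pipeline
  def filter(&block)
    step { |result| result.continue(result.value.filter(&block)) }
  end

  def sort(&block)
    step { |result| result.continue(result.value.sort(&block)) }
  end
end

TopNumbers = DatasetPipeline.new do |pl|
  pl.filter { |n| n > 10 }
  pl.sort { |a, b| b <=> a } # descending
end

result = TopNumbers.call(Result.continue([5, 30, 12, 8, 21]))
result.value # => [30, 21, 12]
```

Because each helper ultimately calls #step, subclasses only ever add vocabulary; the execution model stays unchanged.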
Tracing step positions
As workflows become more complex, it's helpful to have ways to trace and introspect execution. For example, when a step halts the pipeline, I'd like to know exactly which step it was, and at what depth in the pipeline it sits.
We'll deal with the latter first. The following tweaks Pipeline#call to keep track of each step's position relative to its parent pipeline.
class Pipeline
  # ... etc

  # For each step, keep track of its position in the pipeline
  # in the result context.
  def call(result)
    trace = result.context[:trace] || []
    steps.each.with_index(1).reduce(result) do |res, (step, position)|
      if res.continue?
        step.call(res.with_context(:trace, trace + [position]))
      else
        res
      end
    end
  end
end
With this, the Result instance passed to each step will have a :trace key in its context: an array of integers representing the step's position at each level of nesting.
For example:
OkStep = ->(result) { result.continue }
FailedStep = ->(result) { result.halt }

ChildPipeline = Pipeline.new do |pl|
  pl.step OkStep
  pl.step FailedStep # <- this one halts the pipeline
  pl.step OkStep
end

BigPipeline = Pipeline.new do |pl|
  pl.step OkStep
  pl.step OkStep
  pl.step ChildPipeline # <- 2nd step in this pipeline halts
  pl.step OkStep
end
FailedStep inside the child pipeline will be the last step in the trace, and #context[:trace] will be [3, 2], because the child pipeline is the third step in the parent pipeline, and FailedStep is the second step in the child pipeline.
result = BigPipeline.call(Result.continue)
result.continue? # => false
result.context[:trace] # => [3, 2]
In other words:

- [1] OkStep
- [2] OkStep
- [3] ChildPipeline
  - [3, 1] OkStep
  - [3, 2] FailedStep (halts here)
  - [3, 3] OkStep (never run)
- [4] OkStep (never run)
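For completeness, here is a runnable, self-contained version of the trace example. Result and Pipeline are again simplified stand-ins for the classes built earlier in the series, with the :trace tweak from above applied to Pipeline#call.

```ruby
# Simplified Result stand-in carrying a value, a continue/halt flag
# and a context hash (the real one is built in Part 2).
class Result
  attr_reader :value, :context

  def initialize(value = nil, continue: true, context: {})
    @value = value
    @continue = continue
    @context = context
  end

  def continue? = @continue

  def continue(value = @value) = Result.new(value, context: context)

  def halt(value = @value) = Result.new(value, continue: false, context: context)

  def with_context(key, val) = Result.new(value, continue: @continue, context: context.merge(key => val))

  def self.continue(value = nil) = new(value)
end

class Pipeline
  attr_reader :steps

  def initialize(&config)
    @steps = []
    config&.call(self)
  end

  def step(callable = nil, &block)
    @steps << (callable || block)
    self
  end

  # Each step receives its position relative to the parent pipeline
  # via the :trace context key.
  def call(result)
    trace = result.context[:trace] || []
    steps.each.with_index(1).reduce(result) do |res, (step, position)|
      res.continue? ? step.call(res.with_context(:trace, trace + [position])) : res
    end
  end
end

OkStep = ->(result) { result.continue }
FailedStep = ->(result) { result.halt }

ChildPipeline = Pipeline.new do |pl|
  pl.step OkStep
  pl.step FailedStep
  pl.step OkStep
end

BigPipeline = Pipeline.new do |pl|
  pl.step OkStep
  pl.step OkStep
  pl.step ChildPipeline
  pl.step OkStep
end

result = BigPipeline.call(Result.continue)
result.continue?       # => false
result.context[:trace] # => [3, 2]
```

Nesting works with no extra code because a child pipeline reads its parent's :trace from the incoming context and appends its own step positions to it.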
In the following article I’ll show how to leverage this metadata when adding middleware to pipelines.