In this series:
- Part 1: Practical Railway-Oriented Pipelines in Ruby
- Part 2: User input, errors and metadata
- Part 3: Extending pipelines
- Part 4: Middleware
- Part 5: Testing pipelines
In the previous article in this series I showed how to extend the basic pipeline with domain-specific steps and helpers.
Here I’ll show how to add middleware to the pipeline to support tracing, logging, caching, and other cross-cutting concerns.
Middleware
Middleware is a bit of code that wraps around each step in the pipeline, adding functionality to it. See Rack for a well-known use case.
As an example, I want to add middleware that adds context[:halted_step] to the Result instance, so that we know exactly what step halted the pipeline.
As a starter implementation, I’ll tweak Pipeline#step to wrap all registered steps with a middleware that adds the halted_step to the result context if the step halts the pipeline.
class Pipeline
# ... etc
def step(callable = nil, &block)
callable ||= block
raise ArgumentError, "Step must respond to #call" unless callable.respond_to?(:call)
# Wrap the step with a middleware before appending it to the list
callable = StepTracker.new(callable)
steps << callable
self
end
end
A middleware step wraps around the execution of another step.
require "delegate"
# Delegate anything else to the underlying step
# https://ruby-doc.org/3.3.0/stdlibs/delegate/SimpleDelegator.html
class StepTracker < SimpleDelegator
# Capture the call to a step, and add something to the context if it halted.
def call(result)
step = __getobj__
result = step.call(result)
return result.with_context(:halted_step, step) unless result.continue?
result
end
end
Now, context[:halted_step] will be set to the step that halted the pipeline.
We also get context[:trace] to show the position of the halted step in the pipeline, as shown in the previous article.
result = BigPipeline.call(Result.continue)
result.continue? # => false
result.context[:halted_step] # => FailedStep
result.context[:trace] # => [3, 2]
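To try the idea end to end, here is a minimal runnable sketch. The Result and Pipeline classes below are simplified stand-ins written for this example (they are assumptions, not the full classes from the earlier articles), and they leave out context[:trace] and nested pipelines.

```ruby
require "delegate"

# Simplified stand-in for the series' Result class (an assumption).
class Result
  attr_reader :value, :context

  def self.continue(value = nil)
    new(value, true, {})
  end

  def initialize(value, continue, context)
    @value = value
    @continue = continue
    @context = context
  end

  def continue?
    @continue
  end

  def halt
    self.class.new(value, false, context)
  end

  def with_context(key, val)
    self.class.new(value, @continue, context.merge(key => val))
  end
end

class StepTracker < SimpleDelegator
  def call(result)
    step = __getobj__
    result = step.call(result)
    return result.with_context(:halted_step, step) unless result.continue?

    result
  end
end

# Simplified stand-in for the series' Pipeline class (an assumption).
class Pipeline
  def initialize
    @steps = []
    yield self if block_given?
  end

  def step(callable = nil, &block)
    callable ||= block
    raise ArgumentError, "Step must respond to #call" unless callable.respond_to?(:call)

    @steps << StepTracker.new(callable)
    self
  end

  # Run steps in order, skipping the rest once a result is halted.
  def call(result)
    @steps.reduce(result) { |res, step| res.continue? ? step.call(res) : res }
  end
end

OkStep = ->(result) { result }
FailedStep = ->(result) { result.halt }

pipeline = Pipeline.new do |pl|
  pl.step OkStep
  pl.step FailedStep
  pl.step OkStep
end

result = pipeline.call(Result.continue(1))
result.continue?             # => false
result.context[:halted_step] # the FailedStep lambda
```

Because the tracker stores the unwrapped step (via __getobj__), the context points at the step you registered rather than at the middleware wrapper.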
Note that the same middleware approach can be used to add other tracing and introspection features to the pipeline. Some examples:
callable = Instrumentation.new(callable)
callable = Logging.new(callable, Rails.logger)
callable = StepTracker.new(callable)
steps << callable
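As an illustration, a Logging middleware along those lines could look like the sketch below. The log format, the timing code and the Struct-based Result are assumptions made for this example; the only thing taken from the snippet above is the constructor shape, Logging.new(callable, logger).

```ruby
require "delegate"
require "logger"
require "stringio"

# Stand-in for the series' Result class (an assumption for this sketch).
Result = Struct.new(:value, :ok) do
  def continue?
    ok
  end
end

# Hypothetical logging middleware: records each step's outcome and duration.
class Logging < SimpleDelegator
  def initialize(step, logger)
    @logger = logger
    super(step)
  end

  def call(result)
    step = __getobj__
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    result = step.call(result)
    elapsed_ms = ((Process.clock_gettime(Process::CLOCK_MONOTONIC) - started) * 1000).round(2)
    status = result.continue? ? "continue" : "halt"
    @logger.info("step=#{step} status=#{status} duration_ms=#{elapsed_ms}")
    result
  end
end

class ParseCSV
  def self.call(result)
    Result.new(result.value.split(","), true)
  end
end

log = StringIO.new
step = Logging.new(ParseCSV, Logger.new(log))
result = step.call(Result.new("a,b,c", true))
result.value # => ["a", "b", "c"]
log.string   # includes "step=ParseCSV status=continue"
```

Any object that responds to info, such as Rails.logger, can be passed in place of the standard library Logger used here.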
It’s also possible to add class-level configuration to register middleware for Pipeline subclasses.
class MyPipeline < Pipeline
middleware Instrumentation.new(api_key: ENV.fetch('API_KEY'))
middleware Logging.new(Rails.logger)
end
A framework-agnostic implementation for that is included in the code gist.
Middleware might look similar to regular steps, but it plays a different role: each registered middleware wraps around every regular step, including steps in nested pipelines.
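One possible shape for that class-level registration is sketched below. It is not the implementation from the gist: the call(step, result) middleware interface, the middleware_stack accessor and the Recorder class are all assumptions made for this example, and the sketch only wraps a pipeline's own steps rather than steps in nested pipelines.

```ruby
# Stand-in for the series' Result class (an assumption for this sketch).
Result = Struct.new(:value, :ok) do
  def continue?
    ok
  end
end

class Pipeline
  class << self
    # Middleware registered on this class, in registration order.
    def middleware_stack
      @middleware_stack ||= []
    end

    def middleware(mid)
      middleware_stack << mid
      self
    end

    # Subclasses start with a copy of the parent's middleware.
    def inherited(subclass)
      super
      subclass.middleware_stack.concat(middleware_stack)
    end
  end

  def initialize
    @steps = []
    yield self if block_given?
  end

  def step(callable = nil, &block)
    callable ||= block
    raise ArgumentError, "Step must respond to #call" unless callable.respond_to?(:call)

    # Wrap so that the first registered middleware ends up outermost.
    wrapped = self.class.middleware_stack.reverse.reduce(callable) do |inner, mid|
      ->(result) { mid.call(inner, result) }
    end
    @steps << wrapped
    self
  end

  def call(result)
    @steps.reduce(result) { |res, s| res.continue? ? s.call(res) : res }
  end
end

# Hypothetical middleware: records when it runs relative to the step.
class Recorder
  def initialize(label, log)
    @label = label
    @log = log
  end

  def call(step, result)
    @log << "#{@label}:before"
    result = step.call(result)
    @log << "#{@label}:after"
    result
  end
end

LOG = []

class MyPipeline < Pipeline
  middleware Recorder.new("outer", LOG)
  middleware Recorder.new("inner", LOG)
end

pipeline = MyPipeline.new do |pl|
  pl.step { |result| Result.new(result.value + 1, true) }
end

pipeline.call(Result.new(1, true)).value # => 2
LOG # => ["outer:before", "inner:before", "inner:after", "outer:after"]
```

Copying the stack in inherited means middleware added to a parent class after a subclass has been defined will not reach that subclass, which may or may not be the behaviour you want.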
CLIs
A CLI-tailored pipeline class can leverage step tracing to print step positions and halt reasons to the terminal.
class StepPrinter < SimpleDelegator
def call(result)
step = __getobj__
position = result.context[:trace].join(".")
result = step.call(result)
status = result.continue? ? 'OK' : 'ERROR'
errors = result.errors.any? ? "Errors: #{result.errors}" : ""
puts "#{position}. [#{status}] #{step} #{errors}"
result
end
end
Running a pipeline with this middleware prints something like:
1. [OK] InputStep
2. [OK] ParseCSV
3. [OK] ValidateCSV
3.1. [OK] ValidateHeaders
3.2. [ERROR] ValidateRows Errors: { 1: "Invalid format" }
Caching middleware
A piece of middleware can optimise expensive operations by caching their results.
class CachedStep < SimpleDelegator
def initialize(step, cache)
@cache = cache
super(step)
end
def call(result)
cache_key = result.value.cache_key # or something else
# Only call expensive operation if not in cache
# and store the result in the cache
@cache.fetch(cache_key) do
__getobj__.call(result)
end
end
end
- 1. Expensive Operation 1: cached, skipped
- 2. Expensive Operation 2: cached, skipped
- 3. Expensive Operation 3: not cached, running
- 4. Expensive Operation 4: pending
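The CachedStep above expects a cache whose fetch stores the block's return value on a miss (similar in spirit to Rails cache stores). Note that a plain Hash does not behave that way, since Hash#fetch with a block returns the block's value without storing it. A minimal sketch with a hypothetical MemoryCache, using result.value itself as the cache key and a Struct-based Result as a stand-in:

```ruby
require "delegate"

# Stand-in for the series' Result class (an assumption for this sketch).
Result = Struct.new(:value, :ok) do
  def continue?
    ok
  end
end

# Hypothetical in-memory cache. Unlike Hash#fetch, this fetch stores
# the block's return value on a miss.
class MemoryCache
  def initialize
    @data = {}
  end

  def fetch(key)
    return @data[key] if @data.key?(key)

    @data[key] = yield
  end
end

class CachedStep < SimpleDelegator
  def initialize(step, cache)
    @cache = cache
    super(step)
  end

  def call(result)
    # The input value doubles as the cache key in this sketch.
    @cache.fetch(result.value) { __getobj__.call(result) }
  end
end

calls = 0
expensive = lambda do |result|
  calls += 1
  Result.new(result.value * 2, true)
end

step = CachedStep.new(expensive, MemoryCache.new)
first = step.call(Result.new(21, true))
second = step.call(Result.new(21, true))

first.value  # => 42
second.value # => 42, served from the cache
calls        # => 1
```

This version caches halted results as well as successful ones. Depending on the use case, it may make more sense to store a result only when it responds true to continue?.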
Caching could also be controlled selectively for one or more steps, via a custom sub-pipeline and a helper method. See Extending Pipelines for how to implement these helpers.
pl.step OkStep
pl.cached do |ch|
ch.step ExpensiveStep
ch.step AnotherExpensiveStep
end
pl.step OkStep
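A cached helper like that could be sketched as follows. Everything here is an assumption made for illustration: the simplified Pipeline and Result stand-ins, the MemoryCache class, and the choice to key the cache on the value entering the sub-pipeline.

```ruby
require "delegate"

# Stand-ins for the series' classes (assumptions for this sketch).
Result = Struct.new(:value, :ok) do
  def continue?
    ok
  end
end

class MemoryCache
  def initialize
    @data = {}
  end

  def fetch(key)
    @data.fetch(key) { @data[key] = yield }
  end
end

class CachedStep < SimpleDelegator
  def initialize(step, cache)
    @cache = cache
    super(step)
  end

  def call(result)
    @cache.fetch(result.value) { __getobj__.call(result) }
  end
end

class Pipeline
  def initialize
    @steps = []
    yield self if block_given?
  end

  def step(callable = nil, &block)
    @steps << (callable || block)
    self
  end

  # Hypothetical helper: group steps in a sub-pipeline and cache its outcome.
  def cached(cache = MemoryCache.new, &block)
    step CachedStep.new(Pipeline.new(&block), cache)
  end

  def call(result)
    @steps.reduce(result) { |res, s| res.continue? ? s.call(res) : res }
  end
end

RUNS = Hash.new(0)
OkStep = ->(result) { result }
ExpensiveStep = lambda do |result|
  RUNS[:expensive] += 1
  Result.new(result.value + 1, true)
end
AnotherExpensiveStep = lambda do |result|
  RUNS[:another] += 1
  Result.new(result.value * 10, true)
end

pipeline = Pipeline.new do |pl|
  pl.step OkStep
  pl.cached do |ch|
    ch.step ExpensiveStep
    ch.step AnotherExpensiveStep
  end
  pl.step OkStep
end

pipeline.call(Result.new(1, true)).value # => 20
pipeline.call(Result.new(1, true)).value # => 20, sub-pipeline skipped
RUNS # => { expensive: 1, another: 1 }
```

Since the sub-pipeline is itself a callable, the whole group is cached as a single unit and the surrounding steps are unaffected.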
Other use cases
I’ve found that these pipelines make it simple to assemble a wide range of processing workflows big and small. Most specialisation can be contained in the steps themselves, and the pipeline class can be kept simple and generic.
Query builders
You can use a pipeline to build complex queries for databases or APIs, with each step refining the query it receives.
pl.step do |result|
query = result.value # An ActiveRecord::Relation or a Sequel::Dataset
account_id = result.params[:account_id]
query = query.where(account_id:) if account_id
result.continue(query)
end
# Composable query components
pl.step FullTextSearch
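To make the composition concrete without a database, here is a small sketch where the "query" is just an Array of hashes. The FullTextSearch body, the :q parameter and the Struct-based Result are assumptions for this example; with ActiveRecord or Sequel each step would chain where clauses instead of calling select.

```ruby
# Stand-in Result carrying a query (value) and user params (an assumption).
Result = Struct.new(:value, :params) do
  def continue(new_value = value)
    self.class.new(new_value, params)
  end
end

# Hypothetical step: narrows the dataset when params[:q] is present.
class FullTextSearch
  def self.call(result)
    term = result.params[:q].to_s.strip.downcase
    return result.continue if term.empty?

    result.continue(result.value.select { |row| row[:title].downcase.include?(term) })
  end
end

# Same shape as the inline step above: filter only if the param is given.
ByAccount = lambda do |result|
  account_id = result.params[:account_id]
  rows = result.value
  rows = rows.select { |row| row[:account_id] == account_id } if account_id
  result.continue(rows)
end

rows = [
  { account_id: 1, title: "Ruby pipelines" },
  { account_id: 1, title: "Elixir plugs" },
  { account_id: 2, title: "Ruby middleware" }
]

initial = Result.new(rows, { account_id: 1, q: "ruby" })
final = [ByAccount, FullTextSearch].reduce(initial) { |res, step| step.call(res) }
final.value # => [{ account_id: 1, title: "Ruby pipelines" }]
```

Each component only looks at the params it cares about, so steps can be added, removed or reordered without touching the others.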
Durable execution
You can use a pipeline to build durable execution workflows, where each step is a task that can be retried or rolled back. This makes for robust, fault-tolerant operations such as background jobs or long-running sagas.
class DurablePipeline < Pipeline
# Custom middleware to store the result of last successful step
# In case of failure, the pipeline can be resumed from the last successful step
middleware DurableExecution.new(store: Redis.new)
end
HolidayBookingSaga = DurablePipeline.new do |pl|
pl.step BookFlights
pl.step BookHotel
pl.step BookCarRental
pl.step SendConfirmationEmail
end
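A rough sketch of what such a DurableExecution middleware might do is shown below. It is heavily simplified and built on assumptions: middleware objects respond to call(step, result), each run carries a :run_id in its context, steps are named modules, and a plain Hash stands in for Redis (a real store would also need serialization and expiry).

```ruby
# Stand-in for the series' Result class (an assumption for this sketch).
Result = Struct.new(:value, :ok, :context) do
  def continue?
    ok
  end
end

# Hypothetical durable-execution middleware backed by a Hash-like store.
class DurableExecution
  def initialize(store:)
    @store = store
  end

  # Assumed middleware interface: call(step, result).
  def call(step, result)
    key = "#{result.context.fetch(:run_id)}:#{step.name}"
    return @store[key] if @store.key?(key) # completed in an earlier attempt

    outcome = step.call(result)
    @store[key] = outcome if outcome.continue? # only persist successes
    outcome
  end
end

ATTEMPTS = Hash.new(0)

module BookFlights
  def self.call(result)
    ATTEMPTS[:flights] += 1
    Result.new(result.value + [:flights], true, result.context)
  end
end

module BookHotel
  def self.call(result)
    ATTEMPTS[:hotel] += 1
    ok = ATTEMPTS[:hotel] > 1 # simulate a failure on the first attempt
    Result.new(ok ? result.value + [:hotel] : result.value, ok, result.context)
  end
end

# Tiny runner standing in for a pipeline with the middleware applied.
def run(steps, middleware, result)
  steps.reduce(result) do |res, step|
    res.continue? ? middleware.call(step, res) : res
  end
end

durable = DurableExecution.new(store: {})
steps = [BookFlights, BookHotel]
start = Result.new([], true, { run_id: "booking-1" })

first = run(steps, durable, start)  # halts at BookHotel
second = run(steps, durable, start) # resumes: BookFlights is not called again

first.continue?  # => false
second.value     # => [:flights, :hotel]
ATTEMPTS         # => { flights: 1, hotel: 2 }
```

On the second run the stored result of BookFlights is replayed instead of booking the flights twice, and execution picks up at the step that previously halted.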
Concurrent execution
It’s reasonably straightforward to build a pipeline that runs steps concurrently, for example to optimise I/O-bound operations.
HolidayBookingSaga = Pipeline.new do |pl|
# .. etc
# Run these steps concurrently, then collect their results in order.
# For example using Fibers or Threads.
# This block can implement _all_ or _any_ semantics.
pl.concurrent do |c|
c.step BookFlights
c.step BookHotel
c.step BookCarRental
end
# Send email once all bookings are confirmed
pl.step SendConfirmationEmail
end
This is a basic implementation of that.
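For a sense of the mechanics, here is a minimal thread-based sketch with "all" semantics: every child step receives the same input, and the first halted result (in registration order) wins. The ConcurrentSteps class and the Struct-based Result are assumptions made for this example, not the implementation referenced above.

```ruby
# Stand-in for the series' Result class (an assumption for this sketch).
Result = Struct.new(:value, :ok) do
  def continue?
    ok
  end

  def continue(new_value = value)
    self.class.new(new_value, true)
  end

  def halt
    self.class.new(value, false)
  end
end

# Hypothetical step that runs its child steps in threads.
class ConcurrentSteps
  def initialize
    @steps = []
    yield self if block_given?
  end

  def step(callable = nil, &block)
    @steps << (callable || block)
    self
  end

  def call(result)
    # Start one thread per step, then wait for all of them.
    # Thread#value joins the thread and returns the block's result.
    outcomes = @steps.map { |s| Thread.new { s.call(result) } }.map(&:value)

    halted = outcomes.find { |r| !r.continue? }
    return halted if halted

    # Collect the values in registration order.
    result.continue(outcomes.map(&:value))
  end
end

BookFlights = ->(result) { sleep 0.05; result.continue(:flights) }
BookHotel = ->(result) { sleep 0.05; result.continue(:hotel) }
BookCarRental = ->(result) { sleep 0.05; result.continue(:car) }

bookings = ConcurrentSteps.new do |c|
  c.step BookFlights
  c.step BookHotel
  c.step BookCarRental
end

result = bookings.call(Result.new(nil, true))
result.continue? # => true
result.value     # => [:flights, :hotel, :car]
```

A pl.concurrent helper could then be a thin wrapper that registers ConcurrentSteps.new(&block) as a regular step. Note that an exception raised inside a thread is re-raised by Thread#value, so steps that can raise need their own error handling.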
HTTP handlers
In Ruby we have plenty of incredible web frameworks to choose from, but a pipeline-oriented approach to web handling could be a good fit for some use cases. A bit like Elixir’s Plug.
module API
CreateUserHandler = HTTPPipeline.new do |pl|
pl.params do
# This syntax belongs to Parametric, but you can use anything else
# for input validation.
field(:name).type(:string).required
field(:email).type(:string).required
end
pl.step ValidateUserInput
pl.step CreateUser
pl.step SendWelcomeEmail
pl.respond_with(201, :created)
pl.respond_with(400, :bad_request)
end
end
In future articles I might explore the potential of middleware in more depth.
In the next article, I’ll touch on testing pipelines and steps.