In the previous article in this series I showed how to extend the basic pipeline with domain-specific steps and helpers.

Here I’ll show how to add middleware to the pipeline to handle tracing, logging, caching, and other cross-cutting concerns.


Middleware is a bit of code that wraps around each step in the pipeline, adding functionality to it. See Rack for a well-known use case.

As an example, I want a middleware that sets context[:halted_step] on the Result instance, so that we know exactly which step halted the pipeline.

As a starter implementation, I’ll tweak Pipeline#step to wrap all registered steps with a middleware that adds the halted_step to the result context if the step halts the pipeline.

class Pipeline
  # ... etc

  # callable defaults to nil so that a step can also be given as a block
  def step(callable = nil, &block)
    callable ||= block
    raise ArgumentError, "Step must respond to #call" unless callable.respond_to?(:call)

    # Wrap the step with a middleware before appending it to the list
    callable = StepTracker.new(callable)

    steps << callable
  end
end

A middleware step wraps around the execution of another step.

require "delegate"

# Delegate anything else to the underlying step
class StepTracker < SimpleDelegator
  # Capture the call to a step, and add something to the context if it halted.
  def call(result)
    step = __getobj__
    result = step.call(result)
    return result.with_context(:halted_step, step) unless result.continue?

    result
  end
end

Now, context[:halted_step] will be set to the step that halted the pipeline.

We also get context[:trace] to show the position of the halted step in the pipeline, as shown in the previous article.

result = pipeline.call(initial_result) # pipeline and initial_result are placeholder names
result.continue? # => false
result.context[:halted_step] # => FailedStep
result.context[:trace] # => [3, 2]
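
To see the tracker working end to end, here is a minimal, self-contained sketch. The Result and Pipeline classes below are simplified stand-ins written for this example (assumptions, not the real classes from this series), OkStep and FailedStep are hypothetical steps, and the trace feature is left out.

```ruby
require "delegate"

# Simplified stand-in for the series' Result class (an assumption).
class Result
  attr_reader :value, :context

  def initialize(value, context: {}, halted: false)
    @value = value
    @context = context
    @halted = halted
  end

  def continue?
    !@halted
  end

  def halt
    self.class.new(value, context: context, halted: true)
  end

  def with_context(key, val)
    self.class.new(value, context: context.merge(key => val), halted: @halted)
  end
end

class StepTracker < SimpleDelegator
  def call(result)
    step = __getobj__
    result = step.call(result)
    return result.with_context(:halted_step, step) unless result.continue?

    result
  end
end

# Simplified stand-in pipeline: wraps each step on registration
# and skips the remaining steps once a result is halted.
class Pipeline
  def initialize
    @steps = []
  end

  def step(callable = nil, &block)
    callable ||= block
    raise ArgumentError, "Step must respond to #call" unless callable.respond_to?(:call)

    @steps << StepTracker.new(callable)
    self
  end

  def call(result)
    @steps.reduce(result) { |res, step| res.continue? ? step.call(res) : res }
  end
end

OkStep = ->(result) { result }
FailedStep = ->(result) { result.halt }

pipeline = Pipeline.new.step(OkStep).step(FailedStep).step(OkStep)
result = pipeline.call(Result.new(1))
result.continue?             # => false
result.context[:halted_step] # the FailedStep lambda
```

Because the tracker stores the unwrapped step (via `__getobj__`), the context points at the original callable rather than at the middleware around it.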

Note that the same middleware approach can be used to add other tracing and introspection features to the pipeline. Some examples:

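# StepLogger and StepTimer are placeholder names for other middleware
callable = StepTracker.new(callable)
callable = StepLogger.new(callable, Rails.logger)
callable = StepTimer.new(callable)
steps << callable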
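
As an illustration of one such middleware, here is a rough sketch of a logging wrapper. StepLogger is a hypothetical name, the message format is made up for this example, and Ruby's standard Logger stands in for Rails.logger. The middleware only passes values through, so a plain integer is used as the input.

```ruby
require "delegate"
require "logger"
require "stringio"

# Hypothetical logging middleware: logs each step and how long it took.
class StepLogger < SimpleDelegator
  def initialize(step, logger)
    super(step)
    @logger = logger
  end

  def call(result)
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    result = __getobj__.call(result)
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
    @logger.info("step=#{__getobj__.inspect} elapsed=#{elapsed.round(4)}s")
    result
  end
end

io = StringIO.new                 # capture log output in memory
double = ->(n) { n * 2 }          # a plain callable stands in for a step
logged = StepLogger.new(double, Logger.new(io))
output = logged.call(21)          # => 42
log_lines = io.string             # contains one "step=... elapsed=..." line
```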

It’s also possible to add class-level configuration to register middleware for Pipeline subclasses.

class MyPipeline < Pipeline
  middleware ENV.fetch('API_KEY'))

A framework-agnostic implementation for that is included in the code gist.
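
One possible shape for such class-level registration is sketched below. This is not the gist's implementation: the `middlewares` and `own_middlewares` methods, the Tagger middleware, and the choice to wrap the first registered middleware innermost are all assumptions made for this example.

```ruby
require "delegate"

class Pipeline
  # Middleware registered directly on this class.
  def self.own_middlewares
    @own_middlewares ||= []
  end

  # Middleware inherited from parent classes, followed by this class's own.
  def self.middlewares
    inherited = superclass.respond_to?(:middlewares) ? superclass.middlewares : []
    inherited + own_middlewares
  end

  # Register a middleware class plus any extra constructor arguments.
  def self.middleware(klass, *args)
    own_middlewares << [klass, args]
  end

  def steps
    @steps ||= []
  end

  def step(callable = nil, &block)
    callable ||= block
    raise ArgumentError, "Step must respond to #call" unless callable.respond_to?(:call)

    # Wrap the step with every registered middleware, first registered innermost.
    wrapped = self.class.middlewares.reduce(callable) do |inner, (klass, args)|
      klass.new(inner, *args)
    end
    steps << wrapped
    self
  end
end

# Hypothetical middleware that makes the wrapping order visible.
class Tagger < SimpleDelegator
  def initialize(step, tag)
    super(step)
    @tag = tag
  end

  def call(value)
    "#{@tag}(#{__getobj__.call(value)})"
  end
end

class MyPipeline < Pipeline
  middleware Tagger, "a"
  middleware Tagger, "b"
end

pl = MyPipeline.new
pl.step { |v| v.upcase }
wrapped_output = pl.steps.first.call("x") # => "b(a(X))"
```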

Middleware steps might look similar to regular steps, but they are not. Each registered middleware step wraps around every regular step, including in nested pipelines.


A CLI-tailored pipeline class can leverage step tracing to print step positions and halt reasons to the terminal.

class StepPrinter < SimpleDelegator
  def call(result)
    step = __getobj__
    position = result.context[:trace].join(".")
    result = step.call(result)
    status = result.success? ? 'OK' : 'ERROR'
    errors = result.errors.any? ? "Errors: #{result.errors}" : ""
    puts "#{position}. [#{status}] #{step} #{errors}"
    # Return the result so the pipeline can carry on (puts returns nil)
    result
  end
end

1. [OK] InputStep
2. [OK] ParseCSV
3. [OK] ValidateCSV
3.1. [OK] ValidateHeaders
3.2. [ERROR] ValidateRows Errors: { 1: "Invalid format" }

Caching middleware

A piece of middleware can optimise expensive operations by caching their results.

class CachedStep < SimpleDelegator
  def initialize(step, cache)
    super(step)
    @cache = cache
  end

  def call(result)
    cache_key = result.value.cache_key # or something else
    # Only call expensive operation if not in cache
    # and store the result in the cache
    @cache.fetch(cache_key) do
      __getobj__.call(result)
    end
  end
end

  • 1. Expensive Operation 1: cached, skipped
  • 2. Expensive Operation 2: cached, skipped
  • 3. Expensive Operation 3: not cached, running
  • 4. Expensive Operation 4: pending
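
A self-contained way to try the idea out, under a few assumptions: MemoryCache is a hypothetical in-memory store with a `fetch(key) { ... }` method that stores the block's value on a miss, and the step's input is used directly as the cache key instead of `result.value.cache_key`.

```ruby
require "delegate"

# Hypothetical in-memory cache: fetch returns the stored value,
# or runs the block, stores its value, and returns it.
class MemoryCache
  def initialize
    @store = {}
  end

  def fetch(key)
    return @store[key] if @store.key?(key)

    @store[key] = yield
  end
end

class CachedStep < SimpleDelegator
  def initialize(step, cache)
    super(step)
    @cache = cache
  end

  # Assumption: the input itself is a suitable cache key.
  def call(input)
    @cache.fetch(input) { __getobj__.call(input) }
  end
end

calls = 0
expensive = ->(n) { calls += 1; n * n } # counts how often it really runs
cached = CachedStep.new(expensive, MemoryCache.new)

first = cached.call(4)  # runs the step
second = cached.call(4) # served from the cache
```

After both calls the step has run only once, which is what the "cached, skipped" states above describe.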

Caching could also be controlled selectively for one or more steps, via a custom sub-pipeline and a helper method. See Extending Pipelines for how to implement these helpers.

pl.step OkStep
pl.cached do |ch|
  ch.step ExpensiveStep
  ch.step AnotherExpensiveStep
end
pl.step OkStep
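
The `cached` helper could be sketched roughly as follows. Everything here is an assumption made for illustration: the simplified Pipeline, the private `wrap` hook, the CachedPipeline subclass, and the use of a plain Hash keyed by step identity and input.

```ruby
require "delegate"

class CachedStep < SimpleDelegator
  def initialize(step, cache)
    super(step)
    @cache = cache
  end

  # A plain Hash stands in for the cache; the key combines step and input
  # so that steps sharing a cache do not collide.
  def call(input)
    key = [__getobj__.object_id, input]
    @cache.fetch(key) { @cache[key] = __getobj__.call(input) }
  end
end

# Simplified stand-in pipeline that threads a plain value through its steps.
class Pipeline
  def initialize
    @steps = []
    yield self if block_given?
  end

  def step(callable = nil, &block)
    callable ||= block
    raise ArgumentError, "Step must respond to #call" unless callable.respond_to?(:call)

    @steps << wrap(callable)
    self
  end

  def call(input)
    @steps.reduce(input) { |acc, step| step.call(acc) }
  end

  # Hypothetical helper: steps registered inside the block are cached.
  def cached(cache = {}, &block)
    step CachedPipeline.new(cache, &block)
  end

  private

  # Hook for subclasses; the base pipeline registers steps unchanged.
  def wrap(callable)
    callable
  end
end

class CachedPipeline < Pipeline
  def initialize(cache, &block)
    @cache = cache
    super(&block)
  end

  private

  def wrap(callable)
    CachedStep.new(callable, @cache)
  end
end

calls = Hash.new(0)
ok = ->(n) { calls[:ok] += 1; n }
expensive = ->(n) { calls[:expensive] += 1; n * 10 }

pipeline = Pipeline.new do |pl|
  pl.step ok
  pl.cached do |ch|
    ch.step expensive
  end
  pl.step ok
end

first = pipeline.call(2)  # => 20
second = pipeline.call(2) # => 20, expensive step served from the cache
```

Since the sub-pipeline is itself registered as a step, the uncached steps around it run every time while the wrapped ones do not.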

Other use cases

I’ve found that these pipelines make it simple to assemble a wide range of processing workflows big and small. Most specialisation can be contained in the steps themselves, and the pipeline class can be kept simple and generic.

Query builders

You can use a pipeline to build complex queries for databases or APIs.

pl.step do |result|
  query = result.value # An ActiveRecord::Relation or a Sequel::Dataset
  account_id = result.params[:account_id]
  query = query.where(account_id:) if account_id
  # Pass the refined query on to the next step
  result.continue(query)
end

# Composable query components
pl.step FullTextSearch
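
To make the composition concrete without a database, the sketch below uses an Array of hashes in place of a relation or dataset. The Result struct, the ByAccount step, the `:q` param, and this version of FullTextSearch are all stand-ins invented for the example.

```ruby
# Stand-in Result (an assumption): carries a value plus request params.
Result = Struct.new(:value, :params) do
  def continue(new_value)
    self.class.new(new_value, params)
  end
end

# A reusable query component: narrows the rows only when params[:q] is present.
class FullTextSearch
  def self.call(result)
    term = result.params[:q]
    return result unless term

    matches = result.value.select { |row| row[:name].downcase.include?(term.downcase) }
    result.continue(matches)
  end
end

# Another component: filters by account when params[:account_id] is present.
ByAccount = lambda do |result|
  account_id = result.params[:account_id]
  next result unless account_id

  result.continue(result.value.select { |row| row[:account_id] == account_id })
end

rows = [
  { name: "Alice", account_id: 1 },
  { name: "Alina", account_id: 2 },
  { name: "Bob", account_id: 1 }
]

steps = [ByAccount, FullTextSearch]
initial = Result.new(rows, { account_id: 1, q: "ali" })
result = steps.reduce(initial) { |res, step| step.call(res) }
result.value # => [{ name: "Alice", account_id: 1 }]
```

Each component decides for itself whether it applies, so the same steps can be reused across pipelines with different params.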

Durable execution

You can use a pipeline to build durable execution workflows, where each step is a task that can be retried or rolled back. This makes for robust, fault-tolerant operations such as background jobs or long-running sagas.

class DurablePipeline < Pipeline
  # Custom middleware to store the result of last successful step
  # In case of failure, the pipeline can be resumed from the last successful step
end

HolidayBookingSaga = DurablePipeline.new do |pl|
  pl.step BookFlights
  pl.step BookHotel
  pl.step BookCarRental
  pl.step SendConfirmationEmail
end
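
A rough sketch of the checkpointing idea follows. The Checkpoint middleware, its key format, and the Hash used as a store are hypothetical, and failure is modelled with an exception rather than a halted Result to keep the example short.

```ruby
require "delegate"

# Hypothetical middleware: remembers each step's output in a store,
# so a re-run skips steps that already completed.
class Checkpoint < SimpleDelegator
  def initialize(step, store, key)
    super(step)
    @store = store
    @key = key
  end

  def call(input)
    return @store[@key] if @store.key?(@key) # completed in a previous run

    # Only stored if the step returns normally; an exception leaves no checkpoint.
    @store[@key] = __getobj__.call(input)
  end
end

store = {}             # stand-in for a database table or similar durable store
attempts = Hash.new(0)

book_flights = ->(trip) { attempts[:flights] += 1; trip.merge(flights: "booked") }
book_hotel = lambda do |trip|
  attempts[:hotel] += 1
  raise "hotel API down" if attempts[:hotel] == 1 # fails on the first attempt only

  trip.merge(hotel: "booked")
end

steps = [
  Checkpoint.new(book_flights, store, "saga-1/flights"),
  Checkpoint.new(book_hotel, store, "saga-1/hotel")
]
run = ->(input) { steps.reduce(input) { |acc, step| step.call(acc) } }

first_error =
  begin
    run.call({ id: 1 })
    nil
  rescue RuntimeError => e
    e.message
  end

final = run.call({ id: 1 }) # resumes: flights are not booked a second time
```

On the second run the flights step is served from the store, so only the failed hotel booking is retried.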

Concurrent execution

It’s reasonably straightforward to build a pipeline that runs steps concurrently, for example to optimise I/O-bound operations.

HolidayBookingSaga = Pipeline.new do |pl|
  # .. etc

  # Run these steps concurrently, then collect their results in order.
  # For example using Fibers or Threads.
  # This block can implement _all_ or _any_ semantics.
  pl.concurrent do |c|
    c.step BookFlights
    c.step BookHotel
    c.step BookCarRental
  end

  # Send email once all bookings are confirmed
  pl.step SendConfirmationEmail
end

This is a basic implementation of that.
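
For a feel of the mechanics, here is a small thread-based sketch with _all_ semantics. ConcurrentSteps is a hypothetical class written for this example, and plain strings stand in for Result objects.

```ruby
# Hypothetical step that runs several callables concurrently on the same input
# and returns their outputs in registration order ("all" semantics).
class ConcurrentSteps
  def initialize(steps)
    @steps = steps
  end

  def call(input)
    threads = @steps.map { |step| Thread.new { step.call(input) } }
    # Thread#value waits for the thread and returns its block's result.
    # It also re-raises any exception raised inside the thread.
    threads.map(&:value)
  end
end

# Builds a fake I/O-bound step that sleeps before answering.
slow_step = ->(label, delay) { ->(input) { sleep(delay); "#{label} for #{input}" } }

bookings = ConcurrentSteps.new([
  slow_step.call("flights", 0.05),
  slow_step.call("hotel", 0.03),
  slow_step.call("car", 0.01)
])

results = bookings.call("trip-1")
# => ["flights for trip-1", "hotel for trip-1", "car for trip-1"]
```

Results come back in the order the steps were registered, regardless of which thread finishes first, so a following step can rely on their positions.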

HTTP handlers

In Ruby we have plenty of incredible web frameworks to choose from, but a pipeline-oriented approach to web handling could be a good fit for some use cases. A bit like Elixir’s Plug.

module API
  # Assumes a Pipeline class that provides the params and respond_with helpers
  CreateUserHandler = Pipeline.new do |pl|
    pl.params do
      # This syntax belongs to Parametric, but you can use anything else
      # for input validation.
    end

    pl.step ValidateUserInput
    pl.step CreateUser
    pl.step SendWelcomeEmail
    pl.respond_with(201, :created)
    pl.respond_with(400, :bad_request)
  end
end

In future articles I might explore the potential of middleware in more depth.

In the next article, I’ll touch on testing pipelines and steps.