In this series:
- Part 1: Practical Railway-Oriented Pipelines in Ruby
- Part 2: User input, errors and metadata
- Part 3: Extending pipelines
- Part 4: Middleware
- Part 5: Testing pipelines
In the previous article in this series I showed how to extend the basic pipeline with domain-specific steps and helpers.
Here I’ll show how to add middleware to the pipeline, to add tracing, logging, caching, and other cross-cutting concerns.
Middleware
Middleware is a bit of code that wraps around each step in the pipeline, adding functionality to it. See Rack for a well-known use case.
As an example, I want to add middleware that adds context[:halted_step] to the Result instance, so that we know exactly what step halted the pipeline.
As a starter implementation, I’ll tweak Pipeline#step to wrap all registered steps with a middleware that adds the halted_step to the result context if the step halts the pipeline.
class Pipeline
  # ... etc
  def step(callable, &block)
    callable ||= block
    raise ArgumentError, "Step must respond to #call" unless callable.respond_to?(:call)
    # Wrap the step with a middleware before appending it to the list
    callable = StepTracker.new(callable)
    steps << callable
    self
  end
end
A middleware step wraps around the execution of another step.
# Delegate anything else to the underlying step
# https://ruby-doc.org/3.3.0/stdlibs/delegate/SimpleDelegator.html
class StepTracker < SimpleDelegator
  # Capture the call to a step, and add something to the context if it halted.
  def call(result)
    step = __getobj__
    result = step.call(result)
    return result.with_context(:halted_step, step) unless result.continue?
    result
  end
end
Now, context[:halted_step] will be set to the step that halted the pipeline.
We also get context[:trace] to show the position of the halted step in the pipeline, as shown in the previous article.
result = BigPipeline.call(Result.continue)
result.continue? # => false
result.context[:halted_step] # => FailedStep
result.context[:trace] # => [3, 2]
Note that the same middleware approach can be used to add other tracing and introspection features to the pipeline. Some examples:
callable = Instrumentation.new(callable)
callable = Logging.new(callable, Rails.logger)
callable = StepTracker.new(callable)
steps << callable
It’s also possible to add class-level configuration to register middleware for Pipeline subclasses.
class MyPipeline < Pipeline
  middleware Instrumentation.new(api_key: ENV.fetch('API_KEY'))
  middleware Logging.new(Rails.logger)
end
A framework-agnostic implementation for that is included in the code gist
Middleware steps might look similar to regular steps, but they are not. Each registered middleware step wraps around every regular step, including in nested pipelines.
CLIs
A CLI-tailored pipeline class can leverage step tracing to print step positions and halt reasons to the terminal.
class StepPrinter < SimpleDelegator
  def call(result)
    step = __getobj__
    position = result.context[:trace].join(".")
    result = step.call(result)
    status = result.success? ? 'OK' : 'ERROR'
    errors = result.errors.any? ? "Errors: #{result.errors}" : ""
    puts "#{position}. [#{status}] #{step} #{errors}"
    result
  end
end
1. [OK] InputStep
2. [OK] ParseCSV
3. [OK] ValidateCSV
3.1. [OK] ValidateHeaders
3.2. [ERROR] ValidateRows Errors: { 1: "Invalid format" }
Caching middleware
A piece of middleware can optimise expensive operations by caching their results.
class CachedStep < SimpleDelegator
  def initialize(step, cache)
    @cache = cache
    super(step)
  end
  def call(result)
    cache_key = result.value.cache_key # or something else
    # Only call expensive operation if not in cache
    # and store the result in the cache
    @cache.fetch(cache_key) do
      __getobj__.call(result)
    end
  end
end
- 1. Expensive Operation 1cached, skipped
- 2. Expensive Operation 2cached, skipped
- 3. Expensive Operation 3not cached, running
- 4. ExpensiveOperation4pending
Caching could also be controlled selectively for one or more steps, via a custom sub-pipeline and a helper method. See Extending Pipelines for how to implement these helpers.
pl.step OkStep
pl.cached do |ch|
  ch.step ExpensiveStep
  ch.step AnotherExpensiveStep
end
pl.step OkStep
Other use cases
I’ve found that these pipelines make it simple to assemble a wide range of processing workflows big and small. Most specialisation can be contained in the steps themselves, and the pipeline class can be kept simple and generic.
Query builders
You can use it to build complex queries for databases or APIs.
pl.step do |result|
  query = result.value # An ActiveRecord::Relation or a Sequel::Dataset
  account_id = result.params[:account_id]
  query = query.where(account_id:) if account_id
  result.continue(query)
end
# Composable query components
pl.step FullTextSearch
Durable execution
You can use it to build durable execution workflows, where each step is a task that can be retried or rolled back. This can be used to build robust and fault-tolerant operations. For example background jobs, or long-running sagas.
class DurablePipeline < Pipeline
  # Custom middleware to store the result of last successful step
  # In case of failure, the pipeline can be resumed from the last successful step
  middleware DurableExecution.new(store: Redis.new)
end
HolidayBookingSaga = DurablePipeline.new do |pl|
  pl.step BookFlights
  pl.step BookHotel
  pl.step BookCarRental
  pl.step SendConfirmationEmail
end
Concurrent execution
It’s reasonably straightforward to build a pipeline that runs steps concurrently, for example to optimise I/O-bound operations.
HolidayBookingSaga = Pipeline.new do |pl|
  # .. etc
  # Run these steps concurrently, then collect their results in order.
  # For example using Fibers or Threads.
  # This block can implement _all_ or _any_ semantics.
  pl.concurrent do |c|
    c.step BookFlights
    c.step BookHotel
    c.step BookCarRental
  end
  # Send email once all bookings are confirmed
  pl.step SendConfirmationEmail
end
This is a basic implementation of that.
HTTP handlers
In Ruby we have plenty of incredible web frameworks to choose from, but a pipeline-oriented approach to web handling could be a good fit for some use cases. A bit like Elixir’s Plug.
module API
  CreateUserHandler = HTTPPipeline.new do |pl|
    pl.params do
      # This syntax belongs to Parametric, but you can use anything else
      # for input validation.
      field(:name).type(:string).required
      field(:email).type(:string).required
    end
    pl.step ValidateUserInput
    pl.step CreateUser
    pl.step SendWelcomeEmail
    pl.respond_with(201, :created)
    pl.respond_with(400, :bad_request)
  end
end
In future articles I might explore the potential of middleware in more depth.
In the next article, I’ll touch on testing pipelines and steps.