A Practical Example of the Pipeline Pattern in Python

8 November 2024

What is this pattern about?


The Pipeline design pattern (closely related to the Chain of Command, or Chain of Responsibility, pattern) is a flexible way to handle a sequence of processing steps: each handler in the chain processes the input data and passes its result on to the next handler. This pattern is commonly used in data processing, web scraping, and middleware systems.

In this blog post, I’ll walk you through a concrete example that leverages Python’s functools.reduce and functools.partial, along with the BeautifulSoup library for parsing HTML content. The code showcases the Pipeline pattern applied to HTML table extraction and processing.

What Does the Code Do?

The code defines a pipeline of data-parsing steps for extracting and cleaning tables from an HTML file. It follows a functional programming approach, composing several processing functions into a single unified flow.

Key Concepts

  1. Functional Composition: Combining multiple functions into one that executes in a specific order.
  2. Data Parsing Pipeline: Sequential processing of HTML content into structured data (a DataFrame).
  3. Error Handling: Ensuring the pipeline gracefully handles missing or malformed data.

Let’s break down the code step by step:

1. Function Composition with compose

from functools import reduce, partial
from typing import Callable

The pipeline is created by composing multiple parsing functions into a single unified function. The compose function uses reduce to chain these functions together:

def compose(*functions: ParsingPipeline) -> ParsingPipeline:
    """Composes functions into a single function"""
    return reduce(lambda f, g: lambda x: g(f(x)), functions, lambda x: x)

This lets you define an ordered flow of operations: the functions are applied left to right, so the first function you pass to compose runs first, and each function’s output becomes the next function’s input.
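
To make the composition order concrete, here is a minimal, self-contained sketch with plain integer functions (not part of the original code):

from functools import reduce

def compose(*functions):
    """Chain functions into a single callable, applied left to right."""
    return reduce(lambda f, g: lambda x: g(f(x)), functions, lambda x: x)

def add_one(x):
    return x + 1

def double(x):
    return x * 2

pipeline = compose(add_one, double)
print(pipeline(3))  # (3 + 1) * 2 == 8 -- the first function passed to compose runs first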

2. Reading HTML Content

The first step in the pipeline is to read the contents of an HTML file. This is done by read_htm_from:

def read_htm_from(filename: T, mode: T = "r", encoding: T = "utf-8") -> T:
    with open(filename, mode, encoding=encoding) as file:
        html_content = file.read()
    return html_content

This function opens an HTML file and returns its content as a string. It accepts different file modes and encodings, so it can handle files saved with different settings.

Note that T is defined here as TypeVar("T"); see the typing docs.
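
For context, the supporting definitions look roughly like this. The TypeVar comes straight from the note above; the ParsingPipeline alias is my assumption of how the repo defines it:

from typing import Callable, TypeVar

T = TypeVar("T")

# Assumption: a pipeline step is any callable that takes and returns pipeline data
ParsingPipeline = Callable[..., T]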

3. Parsing the HTML Table

Next, read_table_from uses BeautifulSoup to find the HTML table within the file:

from bs4 import BeautifulSoup

def read_table_from(htm_file: T, parser: str = "html.parser") -> T:
    soup = BeautifulSoup(htm_file, parser)
    table = soup.find("table")
    return table

This function converts the HTML content into a BeautifulSoup object and extracts the first table it finds (or None if the document contains no table). The parsed table is then passed down the pipeline for further processing.
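
Chaining the first two steps by hand looks like this (the filename is just a placeholder):

html_content = read_htm_from("backtest_report.htm")  # placeholder filename
table = read_table_from(html_content)
print(type(table))  # bs4.element.Tag, or NoneType if the file contains no <table>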

4. Extracting Rows and Data

Once the table is identified, the pipeline extracts its rows (via a read_rows_from step, sketched below) and applies filtering logic based on custom markers:

def extract_row_data_from(
    table_rows: T, start_markers: T, continue_markers: T, end_markers: T
) -> T:
    row_data: T = []
    start_processing = False
    for row in table_rows:
        if any(marker in row.text for marker in start_markers) and not start_processing:
            start_processing = True
            continue
        if start_processing:
            if any(marker in row.text for marker in continue_markers):
                continue
            if any(marker in row.text for marker in end_markers):
                break
            row_data.append(row)
    return row_data[:-1]

This function inspects each row of the table, checking whether the row text matches the specified start, continue, or end markers. Data collection begins after the start marker is encountered, rows matching a continue marker are skipped, and collection stops when the end marker is found. The last collected row is also dropped (row_data[:-1]) before the list is returned.
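
The table_rows argument comes from read_rows_from, an intermediate step that appears in the final composition but is not shown in the post. Presumably it simply collects the <tr> elements; a minimal sketch, assuming that behaviour:

def read_rows_from(table: T) -> T:
    """Collect all <tr> elements from the parsed table."""
    return table.find_all("tr")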

5. Converting Rows to DataFrame

The next steps involve transforming the extracted row data into a structured pandas DataFrame. First, the rows are separated into individual columns using separate_columns_in:

def separate_columns_in(rows: T) -> T:
    data_rows: T = []
    try:
        for row in rows:
            columns = row.find_all(["td", "th"])
            data = [col.text for col in columns]
            data_rows.append(data)
        return data_rows
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        return []
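
As a quick illustration with a throwaway snippet of HTML (not taken from the real report), this step turns BeautifulSoup rows into plain lists of cell text:

from bs4 import BeautifulSoup

sample = "<table><tr><td>2024.11.08</td><td>EURUSD</td></tr></table>"  # illustrative only
rows = BeautifulSoup(sample, "html.parser").find_all("tr")
print(separate_columns_in(rows))  # [['2024.11.08', 'EURUSD']]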

Then, convert_to_dataframe reshapes this data into a pandas DataFrame:

def convert_to_dataframe(data_rows: T) -> T:
    df = pd.DataFrame(data_rows)
    df = df.rename(columns=df.iloc[0]).drop(df.index[0])
    df.columns = COLUMN_NAMES
    df.drop(columns=COLUMNS_TO_REMOVE, axis=1, inplace=True)
    df.set_index(df.columns[0], inplace=True, drop=True)
    return df

The DataFrame is cleaned up by promoting the first scraped row to the header, renaming the columns, dropping the unnecessary ones, and setting the first remaining column as the index. The COLUMN_NAMES and COLUMNS_TO_REMOVE constants used here are defined in the repo.
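
The header-promotion line is the least obvious part; in isolation, with throwaway data unrelated to the repo’s constants, it behaves like this:

import pandas as pd

data_rows = [["Name", "Value"], ["a", "1"], ["b", "2"]]  # first row holds the scraped header text
df = pd.DataFrame(data_rows)
df = df.rename(columns=df.iloc[0]).drop(df.index[0])
print(list(df.columns))  # ['Name', 'Value']

In the actual function the promoted header is immediately overwritten by COLUMN_NAMES, so the promotion mainly serves to remove the header row from the data.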

6. Assigning Correct Data Types

Finally, assign_correct_data_type_to ensures that the DataFrame columns have the appropriate data types:

def assign_correct_data_type_to(
    df: T,
    dict_types: dict[str, str] = COLUMN_TYPES,
    datetime_columns: list[str] = DATETIME_COLUMN_NAMES,
) -> T:
    if not isinstance(df, pd.DataFrame):
        raise ValueError("Input `df` must be a pandas DataFrame.")
    df = df.copy()
    for column in datetime_columns:
        if column in df.columns:
            df[column] = pd.to_datetime(df[column])
    for column, col_type in dict_types.items():
        if column in df.columns:
            try:
                if col_type == "numeric":
                    df[column] = pd.to_numeric(df[column], errors="coerce")
                else:
                    df[column] = df[column].astype(col_type)
            except Exception as e:
                print(f"Error converting column {column} to {col_type}: {e}")
    return df

This function converts columns into numeric or datetime formats as needed, ensuring that the data is properly structured for analysis.
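
A quick self-contained check of the conversion logic, using made-up column names and types rather than the repo’s COLUMN_TYPES and DATETIME_COLUMN_NAMES constants:

import pandas as pd

demo = pd.DataFrame({
    "profit": ["1.5", "-3.2", "n/a"],  # "n/a" becomes NaN thanks to errors="coerce"
    "open_time": ["2024-11-08 10:00", "2024-11-08 11:00", "2024-11-08 12:00"],
})
demo = assign_correct_data_type_to(demo, dict_types={"profit": "numeric"}, datetime_columns=["open_time"])
print(demo.dtypes)  # profit -> float64, open_time -> datetime64[ns]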

7. Putting It All Together

At the end of the code, the pipeline is composed by chaining all of the above functions together:

parse_gbx_bt: ParsingPipeline = compose(
    partial(read_htm_from, mode="r", encoding="utf-8"),
    read_table_from,
    read_rows_from,
    partial(
        extract_row_data_from,
        start_markers=["Closed Transactions:"],
        continue_markers=["Genbox", "balance", "Deposit"],
        end_markers=["Closed P/L:"],
    ),
    separate_columns_in,
    convert_to_dataframe,
    assign_correct_data_type_to,
)

This creates a fully automated pipeline that:

  1. Reads an HTML file.
  2. Extracts table data.
  3. Cleans and converts the data into a pandas DataFrame.
  4. Assigns the correct data types.
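
Using it then comes down to a single call (the filename below is just a placeholder):

df = parse_gbx_bt("backtest_report.htm")  # placeholder path to the exported HTML report
print(df.head())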

Conclusion

This implementation of the Pipeline (Chain of Command) pattern in Python demonstrates how to apply functional programming principles to data parsing tasks. The combination of functools.reduce, functools.partial, and BeautifulSoup provides a flexible, reusable way to process HTML content and turn it into structured, usable data.

If you’re looking to create complex data processing pipelines that need to handle dynamic data from HTML or other sources, this approach is a clean and maintainable solution.

You can find the code in the repo: https://github.com/jjeg1979/pyBacktestAnalyzer.

And if you want to watch the code clinic where I presented the tool, feel free to check it out at https://pybites.circle.so/c/circle-coaching-calls/python-for-the-trader-code-clinic.

If you cannot access it… well, what are you waiting for to become a PDM member?
