Decorators for Pipeline IntegrationΒΆ

If you have an existing data pipeline that uses pandas data structures, you can use the check_input() and check_output() decorators to easily check function arguments or returned variables from existing functions.

Check InputΒΆ

Validates input pandas DataFrame/Series before entering the wrapped function.

import pandas as pd
import pandera.pandas as pa


df = pd.DataFrame({
    "column1": [1, 4, 0, 10, 9],
    "column2": [-1.3, -1.4, -2.9, -10.1, -20.4],
})

in_schema = pa.DataFrameSchema({
    "column1": pa.Column(
        int, pa.Check(lambda x: 0 <= x <= 10, element_wise=True)
    ),
    "column2": pa.Column(float, pa.Check(lambda x: x < -1.2)),
})

# by default, check_input assumes that the first argument is
# dataframe/series.
@pa.check_input(in_schema)
def preprocessor(dataframe):
    dataframe["column3"] = dataframe["column1"] + dataframe["column2"]
    return dataframe

preprocessed_df = preprocessor(df)
print(preprocessed_df)
   column1  column2  column3
0        1     -1.3     -0.3
1        4     -1.4      2.6
2        0     -2.9     -2.9
3       10    -10.1     -0.1
4        9    -20.4    -11.4

You can also provide the argument name as a string

@pa.check_input(in_schema, "dataframe")
def preprocessor(dataframe):
    ...

Or an integer representing the index in the positional arguments.

@pa.check_input(in_schema, 1)
def preprocessor(foo, dataframe):
    ...

Check OutputΒΆ

The same as check_input, but this decorator checks the output DataFrame/Series of the decorated function.

import pandas as pd
import pandera.pandas as pa


preprocessed_df = pd.DataFrame({
    "column1": [1, 4, 0, 10, 9],
})

# assert that all elements in "column1" are zero
out_schema = pa.DataFrameSchema({
    "column1": pa.Column(int, pa.Check(lambda x: x == 0))
})


# by default assumes that the pandas DataFrame/Schema is the only output
@pa.check_output(out_schema)
def zero_column_1(df):
    df["column1"] = 0
    return df


# you can also specify in the index of the argument if the output is list-like
@pa.check_output(out_schema, 1)
def zero_column_1_arg(df):
    df["column1"] = 0
    return "foobar", df


# or the key containing the data structure to verify if the output is dict-like
@pa.check_output(out_schema, "out_df")
def zero_column_1_dict(df):
    df["column1"] = 0
    return {"out_df": df, "out_str": "foobar"}


# for more complex outputs, you can specify a function
@pa.check_output(out_schema, lambda x: x[1]["out_df"])
def zero_column_1_custom(df):
    df["column1"] = 0
    return ("foobar", {"out_df": df})


zero_column_1(preprocessed_df)
zero_column_1_arg(preprocessed_df)
zero_column_1_dict(preprocessed_df)
zero_column_1_custom(preprocessed_df)
('foobar',
 {'out_df':    column1
  0        0
  1        0
  2        0
  3        0
  4        0})

Check Inputs And OutputsΒΆ

The check_types() decorator validates both inputs and outputs using type annotations. This is especially useful when working with DataFrameModel schemas and the DataFrame generic type.

import pandas as pd
import pandera.pandas as pa
from pandera.typing import DataFrame, Series


class InputSchema(pa.DataFrameModel):
    column1: Series[int] = pa.Field(ge=0, le=10)
    column2: Series[float] = pa.Field(lt=-1.0)


class OutputSchema(InputSchema):
    column3: Series[float]


@pa.check_types
def preprocessor(df: DataFrame[InputSchema]) -> DataFrame[OutputSchema]:
    return df.assign(column3=df["column1"] + df["column2"])


df = pd.DataFrame({
    "column1": [1, 4, 0, 10, 9],
    "column2": [-1.3, -1.4, -2.9, -10.1, -20.4],
})

preprocessed_df = preprocessor(df)
print(preprocessed_df)
   column1  column2  column3
0        1     -1.3     -0.3
1        4     -1.4      2.6
2        0     -2.9     -2.9
3       10    -10.1     -0.1
4        9    -20.4    -11.4

Collection TypesΒΆ

The check_input(), check_output(), and check_types() decorators all support validating DataFrames inside both Union and collection types (ex. tuple, list, dict).

Union TypesΒΆ

import typing


class OnlyZeroesSchema(pa.DataFrameModel):
    a: Series[int] = pa.Field(eq=0)


class OnlyOnesSchema(pa.DataFrameModel):
    a: Series[int] = pa.Field(eq=1)


@pa.check_types
def process_either(
    df: typing.Union[DataFrame[OnlyZeroesSchema], DataFrame[OnlyOnesSchema]],
) -> typing.Union[DataFrame[OnlyZeroesSchema], DataFrame[OnlyOnesSchema]]:
    return df


# Both of these are valid:
process_either(pd.DataFrame({"a": [0, 0, 0]}))
process_either(pd.DataFrame({"a": [1, 1, 1]}))
a
0 1
1 1
2 1

Collection TypesΒΆ

@pa.check_types
def process_tuple_and_return_dict(
    dfs: tuple[DataFrame[OnlyZeroesSchema], DataFrame[OnlyOnesSchema]],
) -> dict[str, DataFrame[OnlyZeroesSchema]]:
    return {
        "foo": dfs[0],
        "bar": dfs[0]
    }


result = process_tuple_and_return_dict((
    pd.DataFrame({"a": [0, 0]}),
    pd.DataFrame({"a": [1, 1]}),
))
print(result)
{'foo':    a
0  0
1  0, 'bar':    a
0  0
1  0}

Check IOΒΆ

For convenience, you can also use the check_io() decorator where you can specify input and output schemas more concisely:

import pandas as pd
import pandera.pandas as pa


df = pd.DataFrame({
    "column1": [1, 4, 0, 10, 9],
    "column2": [-1.3, -1.4, -2.9, -10.1, -20.4],
})

in_schema = pa.DataFrameSchema({
    "column1": pa.Column(int),
    "column2": pa.Column(float),
})

out_schema = in_schema.add_columns({"column3": pa.Column(float)})

@pa.check_io(df1=in_schema, df2=in_schema, out=out_schema)
def preprocessor(df1, df2):
    return (df1 + df2).assign(column3=lambda x: x.column1 + x.column2)

preprocessed_df = preprocessor(df, df)
print(preprocessed_df)
   column1  column2  column3
0        2     -2.6     -0.6
1        8     -2.8      5.2
2        0     -5.8     -5.8
3       20    -20.2     -0.2
4       18    -40.8    -22.8

Decorate Functions and CoroutinesΒΆ

All pandera decorators work on synchronous as well as asynchronous code, on both bound and unbound functions/coroutines. For example, one can use the same decorators on:

  • sync/async functions

  • sync/async methods

  • sync/async class methods

  • sync/async static methods

All decorators work on sync/async regular/class/static methods of metaclasses as well.

import pandera.pandas as pa
from pandera.typing import DataFrame, Series

class Schema(pa.DataFrameModel):
    col1: Series[int]

    class Config:
        strict = True

@pa.check_types
async def coroutine(df: DataFrame[Schema]) -> DataFrame[Schema]:
    return df

@pa.check_types
async def function(df: DataFrame[Schema]) -> DataFrame[Schema]:
    return df

class SomeClass:
    @pa.check_output(Schema.to_schema())
    async def regular_coroutine(self, df) -> DataFrame[Schema]:
        return df

    @classmethod
    @pa.check_input(Schema.to_schema(), "df")
    async def class_coroutine(cls, df):
        return Schema.validate(df)

    @staticmethod
    @pa.check_io(df=Schema.to_schema(), out=Schema.to_schema())
    def static_method(df):
        return df