Data Validation with Fugue#

Validation on big data comes in two forms. The first is performing one set of validations on data that doesn’t fit in memory. The second happens when a large dataset is comprised of multiple groups that require different validations. In pandas semantics, this would be the equivalent of a groupby-validate operation. This section will cover using pandera for both of these scenarios.

Pandera has support for Spark and Dask DataFrames through Modin and PySpark Pandas. Another option for running pandera on top of native Spark or Dask engines is Fugue . Fugue is an open source abstraction layer that ports Python, pandas, and SQL code to Spark and Dask. Operations will be applied on DataFrames natively, minimizing overhead.

What is Fugue?#

Fugue serves as an interface to distributed computing. Because of its non-invasive design, existing Python code can be scaled to a distributed setting without significant changes.

To run the example, Fugue needs to installed separately. Using pip:

pip install fugue[spark]

This will also install PySpark because of the spark extra. Dask is available with the dask extra.

Example#

In this example, a pandas DataFrame is created with state, city and price columns. Pandera will be used to validate that the price column values are within a certain range.

import pandas as pd

data = pd.DataFrame(
    {
        'state': ['FL','FL','FL','CA','CA','CA'],
        'city': [
            'Orlando', 'Miami', 'Tampa', 'San Francisco', 'Los Angeles', 'San Diego'
        ],
        'price': [8, 12, 10, 16, 20, 18],
    }
)
print(data)
  state           city  price
0    FL        Orlando      8
1    FL          Miami     12
2    FL          Tampa     10
3    CA  San Francisco     16
4    CA    Los Angeles     20
5    CA      San Diego     18

Validation is then applied using pandera. A price_validation function is created that runs the validation. None of this will be new.

from pandera import Column, DataFrameSchema, Check

price_check = DataFrameSchema(
    {"price": Column(int, Check.in_range(min_value=5,max_value=20))}
)

def price_validation(data:pd.DataFrame) -> pd.DataFrame:
    return price_check.validate(data)

The transform function in Fugue is the easiest way to use Fugue with existing Python functions as seen in the following code snippet. The first two arguments are the DataFrame and function to apply. The keyword argument schema is required because schema is strictly enforced in distributed settings. Here, the schema is simply * because no new columns are added.

The last part of the transform function is the engine. Here, a SparkSession object is used to run the code on top of Spark. For Dask, users can pass a string "dask" or can pass a Dask Client. Passing nothing uses the default pandas-based engine. Because we passed a SparkSession in this example, the output is a Spark DataFrame.

from fugue import transform
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark_df = transform(data, price_validation, schema="*", engine=spark)
spark_df.show()
+-----+-------------+-----+
|state|         city|price|
+-----+-------------+-----+
|   FL|      Orlando|    8|
|   FL|        Miami|   12|
|   FL|        Tampa|   10|
|   CA|San Francisco|   16|
|   CA|  Los Angeles|   20|
|   CA|    San Diego|   18|
+-----+-------------+-----+

Validation by Partition#

There is an interesting use case that arises with bigger datasets. Frequently, there are logical groupings of data that require different validations. In the earlier sample data, the price range for the records with state FL is lower than the range for the state CA. Two DataFrameSchema will be created to reflect this. Notice their ranges for the Check differ.

price_check_FL = DataFrameSchema({
    "price": Column(int, Check.in_range(min_value=7,max_value=13)),
})

price_check_CA = DataFrameSchema({
    "price": Column(int, Check.in_range(min_value=15,max_value=21)),
})

price_checks = {'CA': price_check_CA, 'FL': price_check_FL}

A slight modification is needed to our price_validation function. Fugue will partition the whole dataset into multiple pandas DataFrames. Think of this as a groupby. By the time price_validation is used, it only contains the data for one state. The appropriate DataFrameSchema is pulled and then applied.

To partition our data by state, all we need to do is pass it into the transform function through the partition argument. This splits up the data across different workers before they each run the price_validation function. Again, this is like a groupby-validation.

def price_validation(df:pd.DataFrame) -> pd.DataFrame:
    location = df['state'].iloc[0]
    check = price_checks[location]
    check.validate(df)
    return df

spark_df = transform(data,
          price_validation,
          schema="*",
          partition=dict(by="state"),
          engine=spark)

spark_df.show()
SparkDataFrame
state:str|city:str                                                 |price:long
---------+---------------------------------------------------------+----------
CA       |San Francisco                                            |16
CA       |Los Angeles                                              |20
CA       |San Diego                                                |18
FL       |Orlando                                                  |8
FL       |Miami                                                    |12
FL       |Tampa                                                    |10
Total count: 6

Note

Because operations in a distributed setting are applied per partition, statistical validators will be applied on each partition rather than the global dataset. If no partitioning scheme is specified, Spark and Dask use default partitions. Be careful about using operations like mean, min, and max without partitioning beforehand.

All row-wise validations scale well with this set-up.

Returning Errors#

Pandera will raise a SchemaError by default that gets buried by the Spark error messages. To return the errors as a DataFrame, we use can use the following approach. If there are no errors in the data, it will just return an empty DataFrame.

To keep the errors for each partition, you can attach the partition key as a column in the returned DataFrame.

from pandera.errors import SchemaErrors

out_schema = "schema_context:str, column:str, check:str, \
check_number:int, failure_case:str, index:int"

out_columns = ["schema_context", "column", "check",
"check_number", "failure_case", "index"]

price_check = DataFrameSchema(
    {"price": Column(int, Check.in_range(min_value=12,max_value=20))}
)

def price_validation(data:pd.DataFrame) -> pd.DataFrame:
    try:
        price_check.validate(data, lazy=True)
        return pd.DataFrame(columns=out_columns)
    except SchemaErrors as err:
        return err.failure_cases

transform(data, price_validation, schema=out_schema, engine=spark).show()
+--------------+------+----------------+------------+------------+-----+
|schema_context|column|           check|check_number|failure_case|index|
+--------------+------+----------------+------------+------------+-----+
|        Column| price|in_range(12, 20)|           0|           8|    0|
|        Column| price|in_range(12, 20)|           0|          10|    0|
+--------------+------+----------------+------------+------------+-----+