Data Validation with Pyspark SQL¶
new in 0.16.0
Apache Spark is an open-source unified analytics engine for large-scale data processing. Spark provides an interface for programming clusters with implicit data parallelism and fault tolerance.
Pyspark is the Python API for Apache Spark, an open source, distributed computing framework and set of libraries for real-time, large-scale data processing.
You can use pandera to validate pyspark.sql.DataFrame
objects directly. First,
install pandera
with the pyspark
extra:
pip install 'pandera[pyspark]'
What’s different?¶
Compared to the way pandera
deals with pandas dataframes, there are some
small changes to support the nuances of pyspark SQL and the expectations that
users have when working with pyspark SQL dataframes:
- The output of schema.validate will produce a pyspark SQL dataframe even if errors occur during validation. Instead of raising the errors, they are collected and can be accessed via the dataframe.pandera.errors attribute, as shown in this example.
  Note: This design decision is based on the expectation that most use cases for pyspark SQL dataframes entail a production ETL setting. In these settings, pandera prioritizes completing the production load and saving the data quality issues for downstream rectification.
- Unlike the pandera pandas schemas, the default error-handling behaviour of the pyspark SQL version is lazy=True, i.e. all errors are collected instead of raising on the first error instance.
- There is no support for lambda-based vectorized checks, since in spark lambda checks need UDFs, which are inefficient. However, pyspark sql does support custom checks via the register_check_method() decorator. A custom check has to return a scalar boolean value instead of a series.
- In type annotations there is limited support for default python data types such as int, str, etc. When using the pandera.pyspark API, using pyspark.sql.types based datatypes such as StringType, IntegerType, etc. is highly recommended.
Basic Usage¶
In this section, let's look at an end-to-end example of how pandera works in a native pyspark implementation.
import pandera.pyspark as pa
import pyspark.sql.types as T
from decimal import Decimal
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pandera.pyspark import DataFrameModel
spark = SparkSession.builder.getOrCreate()
class PanderaSchema(DataFrameModel):
    id: T.IntegerType() = pa.Field(gt=5)
    product_name: T.StringType() = pa.Field(str_startswith="B")
    price: T.DecimalType(20, 5) = pa.Field()
    description: T.ArrayType(T.StringType()) = pa.Field()
    meta: T.MapType(T.StringType(), T.StringType()) = pa.Field()
data = [
(5, "Bread", Decimal(44.4), ["description of product"], {"product_category": "dairy"}),
(15, "Butter", Decimal(99.0), ["more details here"], {"product_category": "bakery"}),
]
spark_schema = T.StructType(
    [
        T.StructField("id", T.IntegerType(), False),
        T.StructField("product", T.StringType(), False),
        T.StructField("price", T.DecimalType(20, 5), False),
        T.StructField("description", T.ArrayType(T.StringType(), False), False),
        T.StructField(
            "meta", T.MapType(T.StringType(), T.StringType(), False), False
        ),
    ],
)
df = spark.createDataFrame(data, spark_schema)
df.show()
+---+-------+--------+--------------------+--------------------+
| id|product| price| description| meta|
+---+-------+--------+--------------------+--------------------+
| 5| Bread|44.40000|[description of p...|{product_category...|
| 15| Butter|99.00000| [more details here]|{product_category...|
+---+-------+--------+--------------------+--------------------+
In the example above, the PanderaSchema class inherits from the DataFrameModel base class. It has type annotations for 5 fields, with 2 of the fields having checks enforced, e.g. gt=5 and str_startswith="B".
To simulate some schema and data validation errors, we also defined a native spark schema, spark_schema, and enforced it on our dataframe df.
Next, you can use the validate()
function to validate
pyspark sql dataframes at runtime.
df_out = PanderaSchema.validate(check_obj=df)
df_out
DataFrame[id: int, product: string, price: decimal(20,5), description: array<string>, meta: map<string,string>]
After running validate()
, the returned object df_out
will be a pyspark
dataframe extended to hold validation results exposed via
a pandera
attribute.
Pandera Pyspark Error Report¶
new in 0.16.0
You can print the validation results as follows:
import json
df_out_errors = df_out.pandera.errors
print(json.dumps(dict(df_out_errors), indent=4))
{
"SCHEMA": {
"COLUMN_NOT_IN_DATAFRAME": [
{
"schema": "PanderaSchema",
"column": "PanderaSchema",
"check": "column_in_dataframe",
"error": "column 'product_name' not in dataframe Row(id=5, product='Bread', price=Decimal('44.40000'), description=['description of product'], meta={'product_category': 'dairy'})"
}
],
"WRONG_DATATYPE": [
{
"schema": "PanderaSchema",
"column": "description",
"check": "dtype('ArrayType(StringType(), True)')",
"error": "expected column 'description' to have type ArrayType(StringType(), True), got ArrayType(StringType(), False)"
},
{
"schema": "PanderaSchema",
"column": "meta",
"check": "dtype('MapType(StringType(), StringType(), True)')",
"error": "expected column 'meta' to have type MapType(StringType(), StringType(), True), got MapType(StringType(), StringType(), False)"
}
]
},
"DATA": {
"DATAFRAME_CHECK": [
{
"schema": "PanderaSchema",
"column": "id",
"check": "greater_than(5)",
"error": "column 'id' with type IntegerType() failed validation greater_than(5)"
}
]
}
}
As seen above, the error report is aggregated at two levels in a python dict object:
- The type of validation: SCHEMA or DATA
- The category of errors, such as DATAFRAME_CHECK or WRONG_DATATYPE
This error report is easily consumed by downstream applications, such as time-series visualization of errors.
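For instance, here is a minimal sketch (not part of pandera's API) that flattens the two-level report into one record per error, which could then be written to a monitoring table or log:
import json

# Hypothetical helper: flatten the nested error report into flat records.
def flatten_errors(report: dict) -> list:
    records = []
    for validation_type, categories in report.items():  # "SCHEMA" / "DATA"
        for category, errors in categories.items():      # e.g. "WRONG_DATATYPE"
            for error in errors:
                records.append(
                    {"validation_type": validation_type, "category": category, **error}
                )
    return records

flat = flatten_errors(dict(df_out.pandera.errors))
print(json.dumps(flat, indent=4))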
Important
It's critical to extract the error report from df_out.pandera.errors as any
further pyspark operations may reset the attribute.
Granular Control of Pandera’s Execution¶
new in 0.16.0
By default, error reports are generated for both schema and data level validation. Adding support for pyspark SQL also comes with more granular control over the execution of Pandera's validation flow.
This is achieved by introducing configurable settings using environment variables that allow you to control execution at three different levels:
- SCHEMA_ONLY: perform schema validations only. It checks that the data conforms to the schema definition, but does not perform any data-level validations on the dataframe.
- DATA_ONLY: perform data-level validations only. It validates that the data conforms to the defined checks, but does not validate the schema.
- SCHEMA_AND_DATA: (default) perform both schema and data-level validations. It runs the most exhaustive validation and can be compute intensive.
You can override the default behaviour by setting an environment variable in the terminal
before running the pandera process:
export PANDERA_VALIDATION_DEPTH=SCHEMA_ONLY
This will be picked up by pandera to enforce only schema-level validations.
Switching Validation On and Off¶
new in 0.16.0
It's very common in production to enable or disable certain services to save computing resources. For this reason, pandera provides a switch to enable or disable validation in production.
You can override the default behaviour by setting an environment variable in the terminal
before running the pandera process as follows:
export PANDERA_VALIDATION_ENABLED=False
This will be picked up by pandera
to disable all validations in the application.
By default, validations are enabled and depth is set to SCHEMA_AND_DATA
which
can be changed to SCHEMA_ONLY
or DATA_ONLY
as required by the use case.
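If you prefer to configure these settings from Python rather than the shell, a minimal sketch is to set the variables through os.environ; the assumption here is that they must be set before pandera reads its configuration (i.e. before importing pandera.pyspark):
import os

# Assumption: pandera reads these environment variables when its configuration
# is initialized, so set them before importing pandera.pyspark.
os.environ["PANDERA_VALIDATION_ENABLED"] = "True"
os.environ["PANDERA_VALIDATION_DEPTH"] = "SCHEMA_ONLY"

import pandera.pyspark as pa  # noqa: E402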
Caching control¶
new in 0.17.3
Given Spark's architecture and Pandera's internal implementation of the PySpark integration, which relies on filtering conditions and count commands, the PySpark DataFrame being validated by a Pandera schema may be reprocessed multiple times, as each count command triggers a new underlying Spark action. This processing overhead is directly related to the number of schema and data checks added to the Pandera schema.
To avoid such reprocessing time, Pandera allows you to cache the PySpark DataFrame before validation starts, through the use of two environment variables:
export PANDERA_CACHE_DATAFRAME=True # Default is False, do not `cache()` by default
export PANDERA_KEEP_CACHED_DATAFRAME=True # Default is False, `unpersist()` by default
The first controls whether the current DataFrame state should be cached in your Spark session before validation starts. The second controls whether that cached state should be kept after validation ends.
Note
To cache or not is a trade-off analysis: if you have enough memory to keep the dataframe cached, it will speed up the validation timings as the validation process will make use of this cached state.
Keeping the cached state instead of throwing it away when validation ends matters when the Pandera validation of a dataset is not an individual process but one step of a pipeline: if a pipeline, in a single Spark session, uses Pandera to evaluate all input dataframes before transforming them into a result that is written to disk, it may make sense not to throw away the cached states in that session. The already processed states of these dataframes will still be used after the validation ends, and keeping them in memory may be beneficial.
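As a rough illustration of the same trade-off, you can also manage the cache manually; the sketch below is not pandera-specific and simply wraps the validation call from the earlier example:
# Cache the dataframe so validation's repeated filter/count actions reuse it.
df.cache()
df_out = PanderaSchema.validate(check_obj=df)
errors = dict(df_out.pandera.errors)  # extract results while the cache is warm
df.unpersist()  # release the cached state once it is no longer needed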
Registering Custom Checks¶
pandera already offers an interface to register custom check functions so
that they're available in the Check namespace. See
the extensions document for more information.
Unlike the pandera pandas API, pyspark sql does not support lambda functions inside Check.
This is because lambda functions would require Spark UDFs, which are expensive operations
due to serialization, so it is better to create checks with native pyspark functions.
Note: The output of the function should be a boolean value: True for a pass and
False for a failure. This is unlike the pandas version, which expects a series
of boolean values.
from pandera.extensions import register_check_method
from pyspark.sql.functions import col
import pyspark.sql.types as T

@register_check_method
def new_pyspark_check(pyspark_obj, *, max_value) -> bool:
    """Ensure values of the column do not exceed a maximum value.

    :param max_value: Upper bound not to be exceeded. Must be
        a type comparable to the dtype of the pyspark column.
    """
    # Keep only rows that violate the condition; the check passes if none exist.
    cond = col(pyspark_obj.column_name) <= max_value
    return pyspark_obj.dataframe.filter(~cond).limit(1).count() == 0
class Schema(DataFrameModel):
    """Schema"""

    product: T.StringType()
    code: T.IntegerType() = pa.Field(
        new_pyspark_check={
            "max_value": 30
        }
    )
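A minimal usage sketch of this schema follows; the dataframe contents are illustrative and the spark session from the earlier example is reused:
# Hypothetical data: the second row violates the max_value=30 check.
df = spark.createDataFrame(
    [("prod-a", 10), ("prod-b", 42)], "product string, code int"
)
df_out = Schema.validate(df)
print(dict(df_out.pandera.errors))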
Adding Metadata at the Dataframe and Field level¶
new in 0.16.0
In real world use cases, we often need to embed additional information on objects.
Pandera allows users to store additional metadata at the Field and
Schema / Model levels. This feature is designed to provide greater context
and information about the data, which can be leveraged by other applications.
For example, by storing details about a specific column, such as data type, format, or units, developers can ensure that downstream applications are able to interpret and use the data correctly. Similarly, by storing information about which columns of a schema are needed for a specific use case, developers can optimize data processing pipelines, reduce storage costs, and improve query performance.
import pyspark.sql.types as T
class PanderaSchema(DataFrameModel):
    """Pandera Schema Class"""

    product_id: T.IntegerType() = pa.Field()
    product_class: T.StringType() = pa.Field(
        metadata={
            "search_filter": "product_pricing",
        },
    )
    product_name: T.StringType() = pa.Field()
    price: T.DecimalType(20, 5) = pa.Field()

    class Config:
        """Config of pandera class"""

        name = "product_info"
        strict = True
        coerce = True
        metadata = {"category": "product-details"}
As seen in the example above, the product_class field has additional embedded information
such as search_filter. This metadata can be leveraged to search and filter
multiple schemas for certain keywords.
This is clearly a very basic example, but the possibilities are endless when metadata
is available at the Field and DataFrame levels.
We also provide a helper function to extract metadata from a schema as follows:
PanderaSchema.get_metadata()
{'product_info': {'columns': {'product_id': None,
'product_class': {'search_filter': 'product_pricing'},
'product_name': None,
'price': None},
'dataframe': {'category': 'product-details'}}}
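As a small illustration, a sketch (not part of pandera) of searching a collection of schemas for a metadata keyword could look like this; the schemas list and keyword are illustrative:
# Hypothetical: find (schema, column) pairs carrying a given search_filter value.
schemas = [PanderaSchema]
keyword = "product_pricing"

matches = []
for schema in schemas:
    for schema_name, meta in schema.get_metadata().items():
        for column, column_meta in meta["columns"].items():
            if column_meta and column_meta.get("search_filter") == keyword:
                matches.append((schema_name, column))

print(matches)  # [('product_info', 'product_class')]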
Note
This feature is available for both pyspark.sql and pandas.
unique support¶
new in 0.17.3
Warning
The unique support for PySpark-based validations, which defines which columns must be
tested for unique values, may incur a performance hit, given Spark's distributed nature.
It only works with Config.
Use with caution.
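A minimal sketch of declaring unique columns via Config follows, assuming the Config-level unique option mirrors the pandas API; the schema and column names are illustrative:
import pandera.pyspark as pa
import pyspark.sql.types as T
from pandera.pyspark import DataFrameModel

class ProductSchema(DataFrameModel):
    product_id: T.IntegerType() = pa.Field()
    region: T.StringType() = pa.Field()

    class Config:
        """Assumption: `unique` lists the column combination that must be unique."""
        unique = ["product_id", "region"]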
Supported and Unsupported Functionality¶
Since the pandera-pyspark-sql integration is less mature than pandas support, some of the functionality offered by pandera with pandas DataFrames is not yet supported with pyspark sql DataFrames.
For a list of supported and unsupported features, refer to the supported features matrix, which shows which features are implemented in the pyspark-sql validation backend.