Error Reports

new in 0.19.0

The pandera error report is a generalised, machine-readable summary of failures that occurred during schema validation. It is available for both pyspark.sql and pandas objects.

By default, error reports are generated for both schema- and data-level validation, but you can restrict validation to schema-only or data-only checks for more granular control.

This is achieved with a configurable environment variable that allows you to control validation at three different depths:

  1. SCHEMA_ONLY: perform schema validations only. It checks that data conforms to the schema definition, but does not perform any data-level validations on the dataframe.

  2. DATA_ONLY: perform data-level validations only. It validates that data conforms to the defined checks, but does not validate the schema.

  3. SCHEMA_AND_DATA: (default) perform both schema- and data-level validations. It runs the most exhaustive validation and can be compute-intensive.

You can override the default behaviour by setting the environment variable in your terminal before running the pandera process:

export PANDERA_VALIDATION_DEPTH=SCHEMA_ONLY

Pandera will pick this up and enforce only schema-level validations.
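
Equivalently, you can set the variable from Python before pandera is imported. This is a minimal sketch that assumes pandera reads PANDERA_VALIDATION_DEPTH when it is first imported:

import os

# assumption: pandera reads PANDERA_VALIDATION_DEPTH at import time,
# so it must be set before the import below
os.environ["PANDERA_VALIDATION_DEPTH"] = "SCHEMA_ONLY"

import pandera as pa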

Error reports with pandas

To create an error report with pandas, specify lazy=True so that all errors are aggregated and raised together in a SchemaErrors exception.

import pandas as pd
import pandera as pa
import json

pandas_schema = pa.DataFrameSchema(
    {
        "color": pa.Column(str, pa.Check.isin(["red", "green", "blue"])),
        "length": pa.Column(int, pa.Check.gt(10)),
    }
)

df = pd.DataFrame(
    {
        "color": ["red", "blue", "purple", "green"],
        "length": [4, 11, 15, 39],
    }
)

try:
    pandas_schema.validate(df, lazy=True)
except pa.errors.SchemaErrors as e:
    print(json.dumps(e.message, indent=2))
{
  "DATA": {
    "DATAFRAME_CHECK": [
      {
        "schema": null,
        "column": "color",
        "check": "isin(['red', 'green', 'blue'])",
        "error": "Column 'color' failed element-wise validator number 0: isin(['red', 'green', 'blue']) failure cases: purple"
      },
      {
        "schema": null,
        "column": "length",
        "check": "greater_than(10)",
        "error": "Column 'length' failed element-wise validator number 0: greater_than(10) failure cases: 4"
      }
    ]
  }
}
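
The SchemaErrors exception also exposes the collected failures in tabular form. A minimal sketch, assuming the failure_cases attribute (a pandas dataframe with one row per failing element):

try:
    pandas_schema.validate(df, lazy=True)
except pa.errors.SchemaErrors as e:
    # tabular view of the same failures shown in the JSON report above
    print(e.failure_cases)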

Error reports with pyspark.sql

Accessing the error report on a validated pyspark dataframe can be done via the errors attribute on the pandera accessor.

import pandera.pyspark as pa
import pyspark.sql.types as T
import json

from pyspark.sql import SparkSession
from pandera.pyspark import DataFrameModel

spark = SparkSession.builder.getOrCreate()

class PysparkPanderSchema(DataFrameModel):
    color: T.StringType() = pa.Field(isin=["red", "green", "blue"])
    length: T.IntegerType() = pa.Field(gt=10)

data = [("red", 4), ("blue", 11), ("purple", 15), ("green", 39)]

spark_schema = T.StructType(
    [
        T.StructField("color", T.StringType(), False),
        T.StructField("length", T.IntegerType(), False),
    ],
)

df = spark.createDataFrame(data, spark_schema)
df_out = PysparkPanderSchema.validate(check_obj=df)

print(json.dumps(dict(df_out.pandera.errors), indent=4))
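
When run against a working Spark installation, this prints the collected errors as a dictionary grouped by validation level (for example DATA) and check type, analogous to the pandas report above. Unlike the pandas workflow, validation here returns the dataframe instead of raising, so a common pattern is to inspect the accessor before continuing. A minimal sketch, assuming the errors accessor is empty when validation passes:

# an empty errors mapping means validation passed (assumption)
if dict(df_out.pandera.errors):
    print(json.dumps(dict(df_out.pandera.errors), indent=4))
else:
    print("validation passed")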