pandera.api.pyspark.model.DataFrameModel

class pandera.api.pyspark.model.DataFrameModel(*args, **kwargs)

Definition of a DataFrameSchema.

New in version 0.16.0.

See the User Guide for more.

Check if all columns in a dataframe have a column in the Schema.

Parameters:
  • check_obj – the DataFrame to be validated.

  • head – not used, since Spark has no concept of head or tail.

  • tail – not used, since Spark has no concept of head or tail.

  • sample – validate a random sample of rows, given as a fraction between 0 and 1; for example, 0.1 samples 10% of the rows. See the PySpark documentation: https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.sample.html

  • random_state – random seed for the sample argument.

  • lazy – if True, lazily evaluates the dataframe against all validation checks and raises SchemaErrors. Otherwise, raises SchemaError as soon as one occurs.

  • inplace – if True, applies coercion to the object of validation; otherwise creates a copy of the data.

Returns:

validated DataFrame

Raises:

SchemaError – when DataFrame violates built-in or custom checks.

Example:

Calling schema.validate returns the dataframe.

>>> import pandera.pyspark as psa
>>> from pyspark.sql import SparkSession
>>> import pyspark.sql.types as T
>>> spark = SparkSession.builder.getOrCreate()
>>>
>>> data = [("Bread", 9), ("Butter", 15)]
>>> spark_schema = T.StructType(
...         [
...             T.StructField("product", T.StringType(), False),
...             T.StructField("price", T.IntegerType(), False),
...         ],
...     )
>>> df = spark.createDataFrame(data=data, schema=spark_schema)
>>>
>>> schema_withchecks = psa.DataFrameSchema(
...         columns={
...             "product": psa.Column("str", checks=psa.Check.str_startswith("B")),
...             "price": psa.Column("int", checks=psa.Check.gt(5)),
...         },
...         name="product_schema",
...         description="schema for product info",
...         title="ProductSchema",
...     )
>>>
>>> schema_withchecks.validate(df).take(2)
[Row(product='Bread', price=9), Row(product='Butter', price=15)]

Methods

classmethod get_metadata()

Provide metadata at the column level and the schema level.

Return type:

Optional[dict]

classmethod pydantic_validate(schema_model)

Verify that the input is a compatible dataframe model.

Return type:

DataFrameModel

classmethod to_ddl()

Recover fields of DataFrameModel as a PySpark DDL string.

Return type:

str

Returns:

String with current model fields, in compact DDL format.

classmethod to_schema()

Create DataFrameSchema from the DataFrameModel.

Return type:

DataFrameSchema

classmethod to_structtype()

Recover fields of DataFrameModel as a PySpark StructType object.

Return type:

StructType

Returns:

StructType object with current model fields.

classmethod to_yaml(stream=None)

Convert the schema to YAML using io.to_yaml.

classmethod validate(check_obj, head=None, tail=None, sample=None, random_state=None, lazy=True, inplace=False)

Check if all columns in a dataframe have a column in the Schema.

Parameters:
  • check_obj (DataFrame) – the DataFrame to be validated.

  • head (Optional[int]) – not used, since Spark has no concept of head or tail.

  • tail (Optional[int]) – not used, since Spark has no concept of head or tail.

  • sample (Optional[int]) – validate a random sample of rows, given as a fraction between 0 and 1; for example, 0.1 samples 10% of the rows. See the PySpark documentation: https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.sample.html

  • random_state (Optional[int]) – random seed for the sample argument.

  • lazy (bool) – if True, lazily evaluates the dataframe against all validation checks and raises SchemaErrors. Otherwise, raises SchemaError as soon as one occurs.

  • inplace (bool) – if True, applies coercion to the object of validation; otherwise creates a copy of the data.

Return type:

DataFrame[~TDataFrameModel]

Returns:

validated DataFrame

Raises:

SchemaError – when DataFrame violates built-in or custom checks.

Example:

Calling schema.validate returns the dataframe.

>>> import pandera.pyspark as psa
>>> from pyspark.sql import SparkSession
>>> import pyspark.sql.types as T
>>> spark = SparkSession.builder.getOrCreate()
>>>
>>> data = [("Bread", 9), ("Butter", 15)]
>>> spark_schema = T.StructType(
...         [
...             T.StructField("product", T.StringType(), False),
...             T.StructField("price", T.IntegerType(), False),
...         ],
...     )
>>> df = spark.createDataFrame(data=data, schema=spark_schema)
>>>
>>> schema_withchecks = psa.DataFrameSchema(
...         columns={
...             "product": psa.Column("str", checks=psa.Check.str_startswith("B")),
...             "price": psa.Column("int", checks=psa.Check.gt(5)),
...         },
...         name="product_schema",
...         description="schema for product info",
...         title="ProductSchema",
...     )
>>>
>>> schema_withchecks.validate(df).take(2)
[Row(product='Bread', price=9), Row(product='Butter', price=15)]