pandera.api.pyspark.container.DataFrameSchema

class pandera.api.pyspark.container.DataFrameSchema(columns=None, checks=None, dtype=None, coerce=False, strict=False, name=None, ordered=False, unique=None, report_duplicates='all', unique_column_names=False, title=None, description=None, metadata=None)[source]

A light-weight PySpark DataFrame validator.

Initialize DataFrameSchema validator.

Parameters:
  • columns (mapping of column names to column schema components) – a dict where keys are column names and values are Column objects specifying the datatypes and properties of each column.

  • checks (Optional[CheckList]) – dataframe-wide checks.

  • dtype (PySparkDtypeInputTypes) – datatype of the dataframe. This overrides the data types specified in any of the columns. If a string is specified, it is assumed to be one of the valid pyspark type strings: https://spark.apache.org/docs/latest/sql-ref-datatypes.html.

  • coerce (bool) – whether or not to coerce all of the columns on validation. This has no effect on columns where dtype=None

  • strict (StrictType) – ensure that all and only the columns defined in the schema are present in the dataframe. If set to 'filter', only the columns in the schema will be passed to the validated dataframe. If set to 'filter' and columns defined in the schema are not present in the dataframe, an error will be raised.

  • name (Optional[str]) – name of the schema.

  • ordered (bool) – whether or not to validate the columns order.

  • unique (Optional[Union[str, List[str]]]) – a list of columns that should be jointly unique.

  • report_duplicates (UniqueSettings) – how to report unique errors: exclude_first – report all duplicates except the first occurrence; exclude_last – report all duplicates except the last occurrence; all – (default) report all duplicates.

  • unique_column_names (bool) – whether or not column names must be unique.

  • title (Optional[str]) – A human-readable label for the schema.

  • description (Optional[str]) – An arbitrary textual description of the schema.

  • metadata (Optional[dict]) – optional key-value metadata for the schema.

Raises:

SchemaInitError – if impossible to build schema from parameters

Examples:

>>> import pandera.pyspark as psa
>>> import pyspark.sql.types as pt
>>>
>>> schema = psa.DataFrameSchema({
...     "str_column": psa.Column(str),
...     "float_column": psa.Column(float),
...     "int_column": psa.Column(int),
...     "date_column": psa.Column(pt.DateType),
... })

Use the pyspark API to define checks, which take a function with the signature ps.DataFrame -> bool, i.e. the output contains boolean values.

>>> schema_withchecks = psa.DataFrameSchema({
...     "probability": psa.Column(
...         pt.DoubleType(), psa.Check.greater_than(0)),
...
...     # check that entries in the "category" column
...     # start with "B"
...     "category": psa.Column(
...         pt.StringType(), psa.Check.str_startswith("B"),
...     ),
... })
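
The schema-wide options documented above can be combined with the column definitions. A minimal sketch, assuming the imports from the examples above (the column names and schema name are illustrative):

>>> schema_with_options = psa.DataFrameSchema(
...     columns={
...         "id": psa.Column(pt.IntegerType()),
...         "name": psa.Column(pt.StringType()),
...     },
...     coerce=True,      # coerce columns to the declared datatypes on validation
...     strict=True,      # reject dataframes with columns not declared in the schema
...     unique=["id"],    # "id" values must be unique
...     name="example_schema",
... )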

See here for more usage details.

Attributes

BACKEND_REGISTRY

coerce

Whether to coerce series to specified type.

dtype

Get the dtype property.

dtypes

A dict where the keys are column names and values are DataType objects for the columns.

properties

Get the properties of the schema for serialization purposes.

unique

List of columns that should be jointly unique.

Methods

__init__(columns=None, checks=None, dtype=None, coerce=False, strict=False, name=None, ordered=False, unique=None, report_duplicates='all', unique_column_names=False, title=None, description=None, metadata=None)[source]

Initialize DataFrameSchema validator.

Parameters:
  • columns (mapping of column names to column schema components) – a dict where keys are column names and values are Column objects specifying the datatypes and properties of each column.

  • checks (Optional[CheckList]) – dataframe-wide checks.

  • dtype (PySparkDtypeInputTypes) – datatype of the dataframe. This overrides the data types specified in any of the columns. If a string is specified, it is assumed to be one of the valid pyspark type strings: https://spark.apache.org/docs/latest/sql-ref-datatypes.html.

  • coerce (bool) – whether or not to coerce all of the columns on validation. This has no effect on columns where dtype=None

  • strict (StrictType) – ensure that all and only the columns defined in the schema are present in the dataframe. If set to 'filter', only the columns in the schema will be passed to the validated dataframe. If set to 'filter' and columns defined in the schema are not present in the dataframe, an error will be raised.

  • name (Optional[str]) – name of the schema.

  • ordered (bool) – whether or not to validate the columns order.

  • unique (Optional[Union[str, List[str]]]) – a list of columns that should be jointly unique.

  • report_duplicates (UniqueSettings) – how to report unique errors: exclude_first – report all duplicates except the first occurrence; exclude_last – report all duplicates except the last occurrence; all – (default) report all duplicates.

  • unique_column_names (bool) – whether or not column names must be unique.

  • title (Optional[str]) – A human-readable label for the schema.

  • description (Optional[str]) – An arbitrary textual description of the schema.

  • metadata (Optional[dict]) – optional key-value metadata for the schema.

Raises:

SchemaInitError – if impossible to build schema from parameters

Examples:

>>> import pandera.pyspark as psa
>>> import pyspark.sql.types as pt
>>>
>>> schema = psa.DataFrameSchema({
...     "str_column": psa.Column(str),
...     "float_column": psa.Column(float),
...     "int_column": psa.Column(int),
...     "date_column": psa.Column(pt.DateType),
... })

Use the pyspark API to define checks, which take a function with the signature ps.DataFrame -> bool, i.e. the output contains boolean values.

>>> schema_withchecks = psa.DataFrameSchema({
...     "probability": psa.Column(
...         pt.DoubleType(), psa.Check.greater_than(0)),
...
...     # check that entries in the "category" column
...     # start with "B"
...     "category": psa.Column(
...         pt.StringType(), psa.Check.str_startswith("B"),
...     ),
... })

See here for more usage details.

coerce_dtype(check_obj)[source]

Coerce object to the expected type.

Return type:

DataFrame
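
A hedged sketch of calling this directly, assuming a SparkSession spark and the imports from the examples above (the column name and data are illustrative):

>>> df = spark.createDataFrame([("9.5",), ("15.0",)], ["price"])
>>> coercing_schema = psa.DataFrameSchema(
...     {"price": psa.Column(pt.DoubleType())},
...     coerce=True,
... )
>>> coerced_df = coercing_schema.coerce_dtype(df)  # columns cast to the declared types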

classmethod from_json(source)[source]

Create DataFrameSchema from json file.

Parameters:

source – str, Path to json schema, or serialized yaml string.

Return type:

DataFrameSchema

Returns:

dataframe schema.
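
A minimal sketch, where "schema.json" is a hypothetical path to a schema previously produced by to_json():

>>> import pandera.pyspark as psa
>>> loaded_schema = psa.DataFrameSchema.from_json("schema.json")  # hypothetical path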

classmethod from_yaml(yaml_schema)[source]

Create DataFrameSchema from yaml file.

Parameters:

yaml_schema – str, Path to yaml schema, or serialized yaml string.

Return type:

DataFrameSchema

Returns:

dataframe schema.
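
A minimal sketch, where "schema.yaml" is a hypothetical path to a schema previously produced by to_yaml():

>>> import pandera.pyspark as psa
>>> loaded_schema = psa.DataFrameSchema.from_yaml("schema.yaml")  # hypothetical path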

get_dtypes(dataframe)[source]

Same as the dtype property, but expands columns where regex == True based on the supplied dataframe.

Return type:

Dict[str, DataType]

Returns:

dictionary of columns and their associated dtypes.
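
For illustration, a sketch assuming a SparkSession spark and the imports from the examples above (the column definition and data are illustrative):

>>> schema = psa.DataFrameSchema({"product": psa.Column(pt.StringType())})
>>> df = spark.createDataFrame([("Bread",), ("Butter",)], ["product"])
>>> dtype_map = schema.get_dtypes(df)  # dict mapping column names to pandera DataTypes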

get_metadata()[source]

Provide metadata at the column and schema levels.

Return type:

Optional[dict]
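
A minimal sketch using the schema-level metadata argument from the constructor, assuming the imports from the examples above (the metadata contents are illustrative):

>>> schema = psa.DataFrameSchema(
...     {"price": psa.Column(pt.IntegerType())},
...     name="product_schema",
...     metadata={"owner": "data-team"},  # illustrative schema-level metadata
... )
>>> meta = schema.get_metadata()  # metadata attached to the schema and its columns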

static register_default_backends(check_obj_cls)[source]

Register default backends.

This method is invoked in the get_backend method so that the appropriate validation backend is loaded at validation time instead of schema-definition time.

This method needs to be implemented by the schema subclass.

to_ddl()[source]

Recover fields of DataFrameSchema as a Pyspark DDL string.

Return type:

str

Returns:

String with current schema fields, in compact DDL format.
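
For example (a sketch assuming the imports from the examples above; the exact string depends on the declared column types):

>>> schema = psa.DataFrameSchema({
...     "product": psa.Column(pt.StringType()),
...     "price": psa.Column(pt.IntegerType()),
... })
>>> ddl = schema.to_ddl()  # a compact DDL string, e.g. along the lines of "product STRING, price INT"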

to_json(target: None = None, **kwargs) → str[source]
to_json(target: PathLike, **kwargs) → None

Write DataFrameSchema to json file.

Parameters:

target (Optional[PathLike]) – file target to write to. If None, dumps to string.

Return type:

Optional[str]

Returns:

json string if target is None, otherwise returns None.
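
A minimal sketch, assuming the imports from the examples above ("schema.json" is a hypothetical output path):

>>> schema = psa.DataFrameSchema({"price": psa.Column(pt.IntegerType())})
>>> json_str = schema.to_json()    # target is None, so the json string is returned
>>> schema.to_json("schema.json")  # writes to the hypothetical file path instead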

to_script(fp=None)[source]

Write DataFrameSchema to a Python script.

Parameters:

fp – str, Path to write the script to.

Return type:

DataFrameSchema

Returns:

dataframe schema.

to_structtype()[source]

Recover fields of DataFrameSchema as a Pyspark StructType object.

Because the output of this method is typically used to specify a read schema in Pyspark (avoiding automatic schema inference), nullable=False properties are ignored; nullability is instead checked by the pandera validations after the dataset is read.

Return type:

StructType

Returns:

StructType object with current schema fields.
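
A sketch of the read-schema use case described above, assuming a SparkSession spark, the imports from the examples above, and a hypothetical input file "products.csv":

>>> schema = psa.DataFrameSchema({
...     "product": psa.Column(pt.StringType()),
...     "price": psa.Column(pt.IntegerType()),
... })
>>> read_schema = schema.to_structtype()
>>> df = spark.read.schema(read_schema).csv("products.csv", header=True)
>>> validated = schema.validate(df)  # nullability and checks are enforced here, not at read time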

to_yaml(stream=None)[source]

Write DataFrameSchema to yaml file.

Parameters:

stream (Optional[PathLike]) – file stream to write to. If None, dumps to string.

Return type:

Optional[str]

Returns:

yaml string if stream is None, otherwise returns None.
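
A minimal sketch, assuming the imports from the examples above, including a round trip through from_yaml() (which accepts a serialized yaml string):

>>> schema = psa.DataFrameSchema({"price": psa.Column(pt.IntegerType())})
>>> yaml_str = schema.to_yaml()  # stream is None, so the yaml string is returned
>>> round_tripped = psa.DataFrameSchema.from_yaml(yaml_str)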

validate(check_obj, head=None, tail=None, sample=None, random_state=None, lazy=True, inplace=False)[source]

Validate a dataframe against the schema, checking that every column in the dataframe has a corresponding column in the schema.

Parameters:
  • check_obj (DataFrame) – DataFrame object i.e. the dataframe to be validated.

  • head (Optional[int]) – Not used since spark has no concept of head or tail

  • tail (Optional[int]) – Not used since spark has no concept of head or tail

  • sample (Optional[int]) – validate a random sample of rows, expressed as a fraction between 0 and 1; for example, 0.1 samples 10% of the rows. See the PySpark documentation: https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.sample.html

  • random_state (Optional[int]) – random seed for the sample argument.

  • lazy (bool) – if True, lazily evaluates the dataframe against all validation checks and raises a SchemaErrors exception. Otherwise, raises a SchemaError as soon as one occurs.

  • inplace (bool) – if True, applies coercion to the object of validation, otherwise creates a copy of the data.

Returns:

validated DataFrame

Raises:

SchemaError – when DataFrame violates built-in or custom checks.

Example:

Calling schema.validate returns the dataframe.

>>> import pandera.pyspark as psa
>>> from pyspark.sql import SparkSession
>>> import pyspark.sql.types as T
>>> spark = SparkSession.builder.getOrCreate()
>>>
>>> data = [("Bread", 9), ("Butter", 15)]
>>> spark_schema = T.StructType(
...         [
...             T.StructField("product", T.StringType(), False),
...             T.StructField("price", T.IntegerType(), False),
...         ],
...     )
>>> df = spark.createDataFrame(data=data, schema=spark_schema)
>>>
>>> schema_withchecks = psa.DataFrameSchema(
...         columns={
...             "product": psa.Column("str", checks=psa.Check.str_startswith("B")),
...             "price": psa.Column("int", checks=psa.Check.gt(5)),
...         },
...         name="product_schema",
...         description="schema for product info",
...         title="ProductSchema",
...     )
>>>
>>> schema_withchecks.validate(df).take(2)
[Row(product='Bread', price=9), Row(product='Butter', price=15)]

__call__(dataframe, head=None, tail=None, sample=None, random_state=None, lazy=True, inplace=False)[source]

Alias for DataFrameSchema.validate() method.

Parameters:
  • dataframe (DataFrame) – DataFrame object i.e. the dataframe to be validated.

  • head (int) – Not used since spark has no concept of head or tail.

  • tail (int) – Not used since spark has no concept of head or tail.

  • sample (Optional[int]) – validate a random sample of rows, expressed as a fraction between 0 and 1; for example, 0.1 samples 10% of the rows. See the PySpark documentation: https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.sample.html

  • random_state (Optional[int]) – random seed for the sample argument.

  • lazy (bool) – if True, lazily evaluates the dataframe against all validation checks and raises a SchemaErrors exception. Otherwise, raises a SchemaError as soon as one occurs.

  • inplace (bool) – if True, applies coercion to the object of validation, otherwise creates a copy of the data.
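
Example:

Since this is an alias for validate(), the schema and dataframe from the validate() example above can be applied directly:

>>> validated_df = schema_withchecks(df)  # equivalent to schema_withchecks.validate(df)
>>> validated_df.take(2)
[Row(product='Bread', price=9), Row(product='Butter', price=15)]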