.. currentmodule:: pandera

.. _scaling_fugue:

Data Validation with Fugue
==========================

Validation on big data comes in two forms. The first is performing one set of
validations on data that doesn't fit in memory. The second arises when a large
dataset is composed of multiple groups that require different validations. In
pandas semantics, this would be the equivalent of a ``groupby-validate``
operation. This section covers using ``pandera`` for both of these scenarios.

``Pandera`` has support for ``Spark`` and ``Dask`` DataFrames through ``Modin``
and ``PySpark Pandas``. Another option for running ``pandera`` on top of native
``Spark`` or ``Dask`` engines is `Fugue <https://github.com/fugue-project/fugue>`_.
``Fugue`` is an open source abstraction layer that ports ``Python``, ``pandas``,
and ``SQL`` code to ``Spark`` and ``Dask``. Operations will be applied on
DataFrames natively, minimizing overhead.

What is Fugue?
--------------

``Fugue`` serves as an interface to distributed computing. Because of its
non-invasive design, existing ``Python`` code can be scaled to a distributed
setting without significant changes.

To run the example, ``Fugue`` needs to be installed separately. Using pip:

.. code:: bash

    pip install fugue[spark]

This will also install ``PySpark`` because of the ``spark`` extra. ``Dask`` is
available with the ``dask`` extra.

Example
-------

In this example, a pandas ``DataFrame`` is created with ``state``, ``city`` and
``price`` columns. ``Pandera`` will be used to validate that the ``price``
column values are within a certain range.

.. testcode:: scaling_fugue

    import pandas as pd

    data = pd.DataFrame(
        {
            'state': ['FL', 'FL', 'FL', 'CA', 'CA', 'CA'],
            'city': [
                'Orlando', 'Miami', 'Tampa', 'San Francisco', 'Los Angeles', 'San Diego'
            ],
            'price': [8, 12, 10, 16, 20, 18],
        }
    )
    print(data)

.. testoutput:: scaling_fugue

      state           city  price
    0    FL        Orlando      8
    1    FL          Miami     12
    2    FL          Tampa     10
    3    CA  San Francisco     16
    4    CA    Los Angeles     20
    5    CA      San Diego     18

Validation is then applied using pandera. A ``price_validation`` function is
created that runs the validation. None of this should look new.

.. testcode:: scaling_fugue

    from pandera import Column, DataFrameSchema, Check

    price_check = DataFrameSchema(
        {"price": Column(int, Check.in_range(min_value=5, max_value=20))}
    )

    def price_validation(data: pd.DataFrame) -> pd.DataFrame:
        return price_check.validate(data)

The ``transform`` function in ``Fugue`` is the easiest way to use ``Fugue`` with
existing ``Python`` functions, as seen in the following code snippet. The first
two arguments are the ``DataFrame`` and the function to apply. The keyword
argument ``schema`` is required because schema is strictly enforced in
distributed settings. Here, the ``schema`` is simply ``"*"`` because no new
columns are added.

The last part of the ``transform`` function is the ``engine``. Here, a
``SparkSession`` object is used to run the code on top of ``Spark``. For Dask,
users can pass the string ``"dask"`` or a Dask ``Client``; passing nothing uses
the default pandas-based engine (see the sketch after the output below). Because
we passed a ``SparkSession`` in this example, the output is a Spark DataFrame.

.. testcode:: scaling_fugue
    :skipif: SKIP_SCALING

    from fugue import transform
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    spark_df = transform(data, price_validation, schema="*", engine=spark)
    spark_df.show()

.. testoutput:: scaling_fugue
    :skipif: SKIP_SCALING

    +-----+-------------+-----+
    |state|         city|price|
    +-----+-------------+-----+
    |   FL|      Orlando|    8|
    |   FL|        Miami|   12|
    |   FL|        Tampa|   10|
    |   CA|San Francisco|   16|
    |   CA|  Los Angeles|   20|
    |   CA|    San Diego|   18|
    +-----+-------------+-----+
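As noted above, the ``engine`` argument determines where the validation runs.
The snippet below is a minimal sketch, not part of the original example: it
assumes ``data`` and ``price_validation`` are defined as above, that the
``dask`` extra is installed, and that ``transform`` returns a Dask DataFrame
when run on the Dask engine.

.. code:: python

    from fugue import transform

    # No engine: runs locally on the default pandas-based engine and
    # returns a pandas DataFrame.
    local_df = transform(data, price_validation, schema="*")

    # Passing the string "dask" (or a dask.distributed.Client) runs the same
    # function on Dask; the result still needs to be computed.
    dask_df = transform(data, price_validation, schema="*", engine="dask")
    print(dask_df.compute())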
Validation by Partition
-----------------------

There is an interesting use case that arises with bigger datasets. Frequently,
there are logical groupings of data that require different validations. In the
earlier sample data, the price range for the records with ``state`` FL is lower
than the range for the ``state`` CA. Two
:class:`~pandera.api.pandas.container.DataFrameSchema` objects will be created
to reflect this. Notice that their ranges for the
:class:`~pandera.api.checks.Check` differ.

.. testcode:: scaling_fugue

    price_check_FL = DataFrameSchema({
        "price": Column(int, Check.in_range(min_value=7, max_value=13)),
    })

    price_check_CA = DataFrameSchema({
        "price": Column(int, Check.in_range(min_value=15, max_value=21)),
    })

    price_checks = {'CA': price_check_CA, 'FL': price_check_FL}

A slight modification is needed to our ``price_validation`` function. ``Fugue``
will partition the whole dataset into multiple pandas ``DataFrames``. Think of
this as a ``groupby``. By the time ``price_validation`` is invoked, the
``DataFrame`` it receives only holds the data for one ``state``. The
appropriate ``DataFrameSchema`` is looked up and then applied.

To partition our data by ``state``, all we need to do is pass it into the
``transform`` function through the ``partition`` argument. This splits up the
data across different workers before they each run the ``price_validation``
function. Again, this is like a groupby-validation.

.. testcode:: scaling_fugue
    :skipif: SKIP_SCALING

    def price_validation(df: pd.DataFrame) -> pd.DataFrame:
        location = df['state'].iloc[0]
        check = price_checks[location]
        check.validate(df)
        return df

    spark_df = transform(
        data, price_validation, schema="*", partition=dict(by="state"), engine=spark
    )
    spark_df.show()

.. testoutput:: scaling_fugue
    :skipif: SKIP_SCALING

    SparkDataFrame
    state:str|city:str                                                 |price:long
    ---------+---------------------------------------------------------+----------
    CA       |San Francisco                                            |16
    CA       |Los Angeles                                              |20
    CA       |San Diego                                                |18
    FL       |Orlando                                                  |8
    FL       |Miami                                                    |12
    FL       |Tampa                                                    |10
    Total count: 6

.. note::

    Because operations in a distributed setting are applied per partition,
    statistical validators will be applied on each partition rather than on the
    global dataset. If no partitioning scheme is specified, ``Spark`` and
    ``Dask`` use their default partitions. Be careful about using operations
    like mean, min, and max without partitioning beforehand; the sketch below
    illustrates this. All row-wise validations scale well with this setup.
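To make the caveat in the note concrete, here is a short sketch. The
``mean_price_check`` schema below is hypothetical and not part of the example
above; it uses a custom :class:`~pandera.api.checks.Check` on the column mean,
which is evaluated once per pandas ``DataFrame`` that ``Fugue`` hands to the
function.

.. code:: python

    # Hypothetical schema: the check receives the ``price`` series of a single
    # partition, so ``s.mean()`` is a per-partition mean, not the global mean.
    mean_price_check = DataFrameSchema(
        {"price": Column(int, Check(lambda s: s.mean() >= 10))}
    )

    def mean_validation(df: pd.DataFrame) -> pd.DataFrame:
        return mean_price_check.validate(df)

    # Partitioning by state validates the FL mean (10) and the CA mean (18)
    # separately; without the partition argument, each default Spark/Dask
    # partition is validated instead of the full dataset (global mean 14).
    transform(
        data, mean_validation, schema="*", partition=dict(by="state"), engine=spark
    ).show()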
Returning Errors
----------------

``Pandera`` raises a ``SchemaError`` by default, which gets buried in the Spark
error messages. To return the errors as a DataFrame instead, we can validate
lazily and catch ``SchemaErrors``, as in the following approach. If there are
no errors in the data, it will just return an empty DataFrame. To keep the
errors for each partition, you can attach the partition key as a column in the
returned DataFrame, as sketched at the end of this section.

.. testcode:: scaling_fugue
    :skipif: SKIP_SCALING

    from pandera.errors import SchemaErrors

    out_schema = "schema_context:str, column:str, check:str, \
    check_number:int, failure_case:str, index:int"

    out_columns = [
        "schema_context", "column", "check", "check_number", "failure_case", "index"
    ]

    price_check = DataFrameSchema(
        {"price": Column(int, Check.in_range(min_value=12, max_value=20))}
    )

    def price_validation(data: pd.DataFrame) -> pd.DataFrame:
        try:
            price_check.validate(data, lazy=True)
            return pd.DataFrame(columns=out_columns)
        except SchemaErrors as err:
            return err.failure_cases

    transform(data, price_validation, schema=out_schema, engine=spark).show()

.. testoutput:: scaling_fugue
    :skipif: SKIP_SCALING

    +--------------+------+----------------+------------+------------+-----+
    |schema_context|column|           check|check_number|failure_case|index|
    +--------------+------+----------------+------------+------------+-----+
    |        Column| price|in_range(12, 20)|           0|           8|    0|
    |        Column| price|in_range(12, 20)|           0|          10|    0|
    +--------------+------+----------------+------------+------------+-----+
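As mentioned above, the partition key can be attached to the failure cases so
that errors from different partitions can be told apart. The following is a
minimal sketch, assuming the same ``data``, ``price_check``, ``out_columns``,
and ``out_schema`` as above; the ``price_validation_by_state`` name is only for
illustration.

.. code:: python

    def price_validation_by_state(data: pd.DataFrame) -> pd.DataFrame:
        # All rows in this chunk share one state because of the partitioning.
        state = data["state"].iloc[0]
        try:
            price_check.validate(data, lazy=True)
            errors = pd.DataFrame(columns=out_columns)
        except SchemaErrors as err:
            errors = err.failure_cases
        # Append the partition key so each error can be traced back to its group.
        return errors.assign(state=state)

    transform(
        data,
        price_validation_by_state,
        schema=out_schema + ", state:str",
        partition=dict(by="state"),
        engine=spark,
    ).show()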