Data Validation with Pyspark Pandas¶
new in 0.10.0
Pyspark is a
distributed compute framework that offers a pandas drop-in replacement dataframe
implementation via the pyspark.pandas API .
You can use pandera to validate DataFrame()
and Series() objects directly. First, install
pandera with the pyspark extra:
pip install 'pandera[pyspark]'
Then you can use pandera schemas to validate pyspark dataframes. In the example
below we’ll use the class-based API to define a
DataFrameModel for validation.
import pyspark.pandas as ps
import pandas as pd
import pandera as pa
from pandera.typing.pyspark import DataFrame, Series
class Schema(pa.DataFrameModel):
state: Series[str]
city: Series[str]
price: Series[int] = pa.Field(in_range={"min_value": 5, "max_value": 20})
# create a pyspark.pandas dataframe that's validated on object initialization
df = DataFrame[Schema](
{
'state': ['FL','FL','FL','CA','CA','CA'],
'city': [
'Orlando',
'Miami',
'Tampa',
'San Francisco',
'Los Angeles',
'San Diego',
],
'price': [8, 12, 10, 16, 20, 18],
}
)
print(df)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/01 19:03:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[Stage 0:> (0 + 2) / 2]
[Stage 5:> (0 + 2) / 2]
25/03/01 19:03:41 ERROR ArrowPythonRunner: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home/docs/checkouts/readthedocs.org/user_builds/pandera/envs/v0.23.0/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1225, in main
eval_type = read_int(infile)
^^^^^^^^^^^^^^^^
File "/home/docs/checkouts/readthedocs.org/user_builds/pandera/envs/v0.23.0/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 596, in read_int
raise EOFError
EOFError
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:118)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
at org.apache.spark.scheduler.Task.run(Task.scala:141)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
25/03/01 19:03:41 ERROR ArrowPythonRunner: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home/docs/checkouts/readthedocs.org/user_builds/pandera/envs/v0.23.0/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1225, in main
eval_type = read_int(infile)
^^^^^^^^^^^^^^^^
File "/home/docs/checkouts/readthedocs.org/user_builds/pandera/envs/v0.23.0/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 596, in read_int
raise EOFError
EOFError
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:118)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
at org.apache.spark.scheduler.Task.run(Task.scala:141)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
25/03/01 19:03:41 ERROR ArrowPythonRunner: This may have been caused by a prior exception:
java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
25/03/01 19:03:41 ERROR ArrowPythonRunner: This may have been caused by a prior exception:
java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
25/03/01 19:03:41 ERROR Executor: Exception in task 1.0 in stage 5.0 (TID 6)
java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
25/03/01 19:03:41 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 5)
java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
25/03/01 19:03:41 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 5) (build-27359557-project-456848-pandera executor driver): java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
25/03/01 19:03:41 ERROR TaskSetManager: Task 0 in stage 5.0 failed 1 times; aborting job
state city price
0 FL Orlando 8
1 FL Miami 12
2 FL Tampa 10
3 CA San Francisco 16
4 CA Los Angeles 20
5 CA San Diego 18
You can also use the check_types() decorator to validate
pyspark pandas dataframes at runtime:
@pa.check_types
def function(df: DataFrame[Schema]) -> DataFrame[Schema]:
return df[df["state"] == "CA"]
print(function(df))
25/03/01 19:03:43 ERROR ArrowPythonRunner: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home/docs/checkouts/readthedocs.org/user_builds/pandera/envs/v0.23.0/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1225, in main
eval_type = read_int(infile)
^^^^^^^^^^^^^^^^
File "/home/docs/checkouts/readthedocs.org/user_builds/pandera/envs/v0.23.0/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 596, in read_int
raise EOFError
EOFError
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:118)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
at org.apache.spark.scheduler.Task.run(Task.scala:141)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
25/03/01 19:03:43 ERROR ArrowPythonRunner: This may have been caused by a prior exception:
java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
25/03/01 19:03:43 ERROR Executor: Exception in task 1.0 in stage 13.0 (TID 15)
java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
25/03/01 19:03:43 WARN TaskSetManager: Lost task 1.0 in stage 13.0 (TID 15) (build-27359557-project-456848-pandera executor driver): java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
25/03/01 19:03:43 ERROR TaskSetManager: Task 1 in stage 13.0 failed 1 times; aborting job
25/03/01 19:03:43 WARN TaskSetManager: Lost task 0.0 in stage 13.0 (TID 14) (build-27359557-project-456848-pandera executor driver): TaskKilled (Stage cancelled: Job aborted due to stage failure: Task 1 in stage 13.0 failed 1 times, most recent failure: Lost task 1.0 in stage 13.0 (TID 15) (build-27359557-project-456848-pandera executor driver): java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
Driver stacktrace:)
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
Cell In[2], line 5
1 @pa.check_types
2 def function(df: DataFrame[Schema]) -> DataFrame[Schema]:
3 return df[df["state"] == "CA"]
----> 5 print(function(df))
File ~/checkouts/readthedocs.org/user_builds/pandera/checkouts/v0.23.0/pandera/decorators.py:838, in check_types.<locals>._wrapper(*args, **kwargs)
836 out = validate_arguments(wrapped)(*args, **kwargs)
837 else:
--> 838 validated_pos, validated_kwd = validate_inputs(args, kwargs)
839 out = wrapped(*validated_pos, **validated_kwd)
840 return _check_arg("return", out)
File ~/checkouts/readthedocs.org/user_builds/pandera/checkouts/v0.23.0/pandera/decorators.py:814, in check_types.<locals>.validate_inputs(args, kwargs)
810 def validate_inputs(
811 args: Tuple[Any, ...],
812 kwargs: Dict[str, Any],
813 ) -> Tuple[List[Any], Dict[str, Any]]:
--> 814 validated_pos = validate_args(sig.bind_partial(*args).arguments, args)
815 validated_kwd = validate_kwargs(
816 sig.bind_partial(**kwargs).arguments, kwargs
817 )
818 return validated_pos, validated_kwd
File ~/checkouts/readthedocs.org/user_builds/pandera/checkouts/v0.23.0/pandera/decorators.py:765, in check_types.<locals>.validate_args(named_arguments, arguments)
762 return list((*explicit_args_tuple, *star_args_tuple))
764 else:
--> 765 return list(
766 _check_arg(arg_name, arg_value)
767 for arg_name, arg_value in named_arguments.items()
768 )
File ~/checkouts/readthedocs.org/user_builds/pandera/checkouts/v0.23.0/pandera/decorators.py:766, in <genexpr>(.0)
762 return list((*explicit_args_tuple, *star_args_tuple))
764 else:
765 return list(
--> 766 _check_arg(arg_name, arg_value)
767 for arg_name, arg_value in named_arguments.items()
768 )
File ~/checkouts/readthedocs.org/user_builds/pandera/checkouts/v0.23.0/pandera/decorators.py:682, in check_types.<locals>._check_arg(arg_name, arg_value)
674 if (
675 not hasattr(arg_value, "pandera")
676 or arg_value.pandera.schema is None
(...) 679 or arg_value.pandera.schema != schema
680 ):
681 try:
--> 682 arg_value = schema.validate(
683 arg_value,
684 head,
685 tail,
686 sample,
687 random_state,
688 lazy,
689 inplace,
690 )
691 except errors.SchemaError as e:
692 error_handler.collect_error(
693 validation_type(
694 errors.SchemaErrorReason.INVALID_TYPE
(...) 704 ),
705 )
File ~/checkouts/readthedocs.org/user_builds/pandera/checkouts/v0.23.0/pandera/api/pandas/container.py:126, in DataFrameSchema.validate(self, check_obj, head, tail, sample, random_state, lazy, inplace)
114 check_obj = check_obj.map_partitions( # type: ignore [operator]
115 self._validate,
116 head=head,
(...) 122 meta=check_obj,
123 )
124 return check_obj.pandera.add_schema(self)
--> 126 return self._validate(
127 check_obj=check_obj,
128 head=head,
129 tail=tail,
130 sample=sample,
131 random_state=random_state,
132 lazy=lazy,
133 inplace=inplace,
134 )
File ~/checkouts/readthedocs.org/user_builds/pandera/checkouts/v0.23.0/pandera/api/pandas/container.py:147, in DataFrameSchema._validate(self, check_obj, head, tail, sample, random_state, lazy, inplace)
136 def _validate(
137 self,
138 check_obj: pd.DataFrame,
(...) 144 inplace: bool = False,
145 ) -> pd.DataFrame:
--> 147 return self.get_backend(check_obj).validate(
148 check_obj,
149 schema=self,
150 head=head,
151 tail=tail,
152 sample=sample,
153 random_state=random_state,
154 lazy=lazy,
155 inplace=inplace,
156 )
File ~/checkouts/readthedocs.org/user_builds/pandera/checkouts/v0.23.0/pandera/backends/pandas/container.py:104, in DataFrameSchemaBackend.validate(self, check_obj, schema, head, tail, sample, random_state, lazy, inplace)
99 components = self.collect_schema_components(
100 check_obj, schema, column_info
101 )
103 # run the checks
--> 104 error_handler = self.run_checks_and_handle_errors(
105 error_handler,
106 schema,
107 check_obj,
108 column_info,
109 sample,
110 components,
111 lazy,
112 head,
113 tail,
114 random_state,
115 )
117 if error_handler.collected_errors:
118 if getattr(schema, "drop_invalid_rows", False):
File ~/checkouts/readthedocs.org/user_builds/pandera/checkouts/v0.23.0/pandera/backends/pandas/container.py:161, in DataFrameSchemaBackend.run_checks_and_handle_errors(self, error_handler, schema, check_obj, column_info, sample, components, lazy, head, tail, random_state)
150 core_checks = [
151 (self.check_column_names_are_unique, (check_obj, schema)),
152 (self.check_column_presence, (check_obj, schema, column_info)),
(...) 158 (self.run_checks, (sample, schema)),
159 ]
160 for check, args in core_checks:
--> 161 results = check(*args) # type: ignore [operator]
162 if isinstance(results, CoreCheckResult):
163 results = [results]
File ~/checkouts/readthedocs.org/user_builds/pandera/checkouts/v0.23.0/pandera/backends/pandas/container.py:217, in DataFrameSchemaBackend.run_schema_component_checks(self, check_obj, schema, schema_components, lazy)
213 # disable coercion at the schema component level since the
214 # dataframe-level schema already coerced it.
215 schema_component.coerce = False # type: ignore
--> 217 result = schema_component.validate(
218 check_obj, lazy=lazy, inplace=True
219 )
221 # revert the schema component mutations
222 schema_component.dtype = _orig_dtype
File ~/checkouts/readthedocs.org/user_builds/pandera/checkouts/v0.23.0/pandera/api/dataframe/components.py:149, in ComponentSchema.validate(self, check_obj, head, tail, sample, random_state, lazy, inplace)
120 def validate(
121 self,
122 check_obj,
(...) 129 ):
130 # pylint: disable=too-many-locals,too-many-branches,too-many-statements
131 """Validate a series or specific column in dataframe.
132
133 :check_obj: data object to validate.
(...) 147
148 """
--> 149 return self.get_backend(check_obj).validate(
150 check_obj,
151 schema=self,
152 head=head,
153 tail=tail,
154 sample=sample,
155 random_state=random_state,
156 lazy=lazy,
157 inplace=inplace,
158 )
File ~/checkouts/readthedocs.org/user_builds/pandera/checkouts/v0.23.0/pandera/backends/pandas/components.py:138, in ColumnBackend.validate(self, check_obj, schema, head, tail, sample, random_state, lazy, inplace)
132 if getattr(schema, "drop_invalid_rows", False):
133 # replace the check_obj with the validated
134 check_obj = validate_column(
135 check_obj, column_name, return_check_obj=True
136 )
--> 138 validated_column = validate_column(
139 check_obj,
140 column_name,
141 return_check_obj=True,
142 )
143 if schema.parsers:
144 check_obj[column_name] = validated_column
File ~/checkouts/readthedocs.org/user_builds/pandera/checkouts/v0.23.0/pandera/backends/pandas/components.py:74, in ColumnBackend.validate.<locals>.validate_column(check_obj, column_name, return_check_obj)
69 try:
70 # pylint: disable=super-with-arguments
71 # make sure the schema component mutations are reverted after
72 # validation
73 _orig_name = schema.name
---> 74 validated_check_obj = super(ColumnBackend, self).validate(
75 check_obj,
76 schema.set_name(column_name),
77 head=head,
78 tail=tail,
79 sample=sample,
80 random_state=random_state,
81 lazy=lazy,
82 inplace=inplace,
83 )
84 # revert the schema component mutations
85 schema.name = _orig_name
File ~/checkouts/readthedocs.org/user_builds/pandera/checkouts/v0.23.0/pandera/backends/pandas/array.py:77, in ArraySchemaBackend.validate(self, check_obj, schema, head, tail, sample, random_state, lazy, inplace)
71 check_obj = self.run_parsers(
72 schema,
73 check_obj,
74 )
76 # run the core checks
---> 77 error_handler = self.run_checks_and_handle_errors(
78 error_handler,
79 schema,
80 check_obj,
81 head=head,
82 tail=tail,
83 sample=sample,
84 random_state=random_state,
85 )
87 if lazy and error_handler.collected_errors:
88 if getattr(schema, "drop_invalid_rows", False):
File ~/checkouts/readthedocs.org/user_builds/pandera/checkouts/v0.23.0/pandera/backends/pandas/array.py:120, in ArraySchemaBackend.run_checks_and_handle_errors(self, error_handler, schema, check_obj, **subsample_kwargs)
111 core_checks = [
112 (self.check_name, (field_obj_subsample, schema)),
113 (self.check_nullable, (field_obj_subsample, schema)),
(...) 116 (self.run_checks, (check_obj_subsample, schema)),
117 ]
119 for core_check, args in core_checks:
--> 120 results = core_check(*args)
121 if isinstance(results, CoreCheckResult):
122 results = [results]
File ~/checkouts/readthedocs.org/user_builds/pandera/checkouts/v0.23.0/pandera/validation_depth.py:69, in validate_scope.<locals>._wrapper.<locals>.wrapper(self, check_obj, *args, **kwargs)
63 logger.debug(
64 f"Skipping execution of check {func.__name__} since "
65 "validation depth is set to DATA_ONLY.",
66 stacklevel=2,
67 )
68 return CoreCheckResult(passed=True)
---> 69 return func(self, check_obj, *args, **kwargs)
71 elif scope == ValidationScope.DATA:
72 if config.validation_depth == ValidationDepth.SCHEMA_ONLY:
File ~/checkouts/readthedocs.org/user_builds/pandera/checkouts/v0.23.0/pandera/backends/pandas/array.py:302, in ArraySchemaBackend.check_dtype(self, check_obj, schema)
297 msg = (
298 f"expected series '{check_obj.name}' to have type "
299 f"{schema.dtype}, got {check_obj.dtype}"
300 )
301 else:
--> 302 passed = dtype_check_results.all()
303 failure_cases = reshape_failure_cases(
304 check_obj[~dtype_check_results.astype(bool)],
305 ignore_na=False,
306 )
307 msg = (
308 f"expected series '{check_obj.name}' to have type "
309 f"{schema.dtype}:\nfailure cases:\n{failure_cases}"
310 )
File ~/checkouts/readthedocs.org/user_builds/pandera/envs/v0.23.0/lib/python3.11/site-packages/pyspark/pandas/base.py:1128, in IndexOpsMixin.all(self, axis, skipna)
1123 # `any` and `every` was added as of Spark 3.0.
1124 # ret = sdf.select(F.expr("every(CAST(`%s` AS BOOLEAN))" % sdf.columns[0])).collect()[0][0]
1125 # We use min as its alternative as below.
1126 if isinstance(self.spark.data_type, NumericType) or skipna:
1127 # np.nan takes no effect to the result; None takes no effect if `skipna`
-> 1128 ret = sdf.select(F.min(F.coalesce(col.cast("boolean"), F.lit(True)))).collect()[0][0]
1129 else:
1130 # Take None as False when not `skipna`
1131 ret = sdf.select(
1132 F.min(F.when(col.isNull(), F.lit(False)).otherwise(col.cast("boolean")))
1133 ).collect()[0][0]
File ~/checkouts/readthedocs.org/user_builds/pandera/envs/v0.23.0/lib/python3.11/site-packages/pyspark/sql/dataframe.py:1263, in DataFrame.collect(self)
1243 """Returns all the records as a list of :class:`Row`.
1244
1245 .. versionadded:: 1.3.0
(...) 1260 [Row(age=14, name='Tom'), Row(age=23, name='Alice'), Row(age=16, name='Bob')]
1261 """
1262 with SCCallSiteSync(self._sc):
-> 1263 sock_info = self._jdf.collectToPython()
1264 return list(_load_from_socket(sock_info, BatchedSerializer(CPickleSerializer())))
File ~/checkouts/readthedocs.org/user_builds/pandera/envs/v0.23.0/lib/python3.11/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
1316 command = proto.CALL_COMMAND_NAME +\
1317 self.command_header +\
1318 args_command +\
1319 proto.END_COMMAND_PART
1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
1323 answer, self.gateway_client, self.target_id, self.name)
1325 for temp_arg in temp_args:
1326 if hasattr(temp_arg, "_detach"):
File ~/checkouts/readthedocs.org/user_builds/pandera/envs/v0.23.0/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py:179, in capture_sql_exception.<locals>.deco(*a, **kw)
177 def deco(*a: Any, **kw: Any) -> Any:
178 try:
--> 179 return f(*a, **kw)
180 except Py4JJavaError as e:
181 converted = convert_exception(e.java_exception)
File ~/checkouts/readthedocs.org/user_builds/pandera/envs/v0.23.0/lib/python3.11/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332 format(target_id, ".", name, value))
Py4JJavaError: An error occurred while calling o393.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 13.0 failed 1 times, most recent failure: Lost task 1.0 in stage 13.0 (TID 15) (build-27359557-project-456848-pandera executor driver): java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
And of course, you can use the object-based API to validate dask dataframes:
schema = pa.DataFrameSchema({
"state": pa.Column(str),
"city": pa.Column(str),
"price": pa.Column(int, pa.Check.in_range(min_value=5, max_value=20))
})
schema(df)