Data Validation with Pyspark Pandas
new in 0.10.0
Pyspark is a
distributed compute framework that offers a drop-in replacement for the pandas
dataframe via the pyspark.pandas API.
You can use pandera to validate pyspark.pandas DataFrame
and Series objects directly. First, install
pandera with the pyspark extra:
pip install 'pandera[pyspark]'
Then you can use pandera schemas to validate pyspark dataframes. In the example
below we’ll use the class-based API to define a
DataFrameModel for validation.
import pyspark.pandas as ps
import pandas as pd
import pandera.pandas as pa
from pandera.typing.pyspark import DataFrame, Series
class Schema(pa.DataFrameModel):
    """Schema for the example table with ``state``, ``city`` and ``price`` columns."""

    state: Series[str]
    city: Series[str]
    # price is checked with an in_range constraint bounded by 5 and 20
    price: Series[int] = pa.Field(in_range={"min_value": 5, "max_value": 20})
# Instantiating DataFrame[Schema] builds a pyspark.pandas dataframe and
# validates it against Schema as part of object initialization.
df = DataFrame[Schema](
    {
        "state": ["FL", "FL", "FL", "CA", "CA", "CA"],
        "city": [
            "Orlando",
            "Miami",
            "Tampa",
            "San Francisco",
            "Los Angeles",
            "San Diego",
        ],
        "price": [8, 12, 10, 16, 20, 18],
    }
)

print(df)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/22 19:06:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[Stage 0:> (0 + 2) / 2]
[Stage 5:> (0 + 2) / 2]
25/12/22 19:06:25 ERROR ArrowPythonRunner: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home/docs/checkouts/readthedocs.org/user_builds/pandera/envs/v0.27.1/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1225, in main
eval_type = read_int(infile)
^^^^^^^^^^^^^^^^
File "/home/docs/checkouts/readthedocs.org/user_builds/pandera/envs/v0.27.1/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 596, in read_int
raise EOFError
EOFError
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:118)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
at org.apache.spark.scheduler.Task.run(Task.scala:141)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:621)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:624)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
25/12/22 19:06:25 ERROR ArrowPythonRunner: This may have been caused by a prior exception:
java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
25/12/22 19:06:25 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 5)
java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
25/12/22 19:06:25 ERROR ArrowPythonRunner: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home/docs/checkouts/readthedocs.org/user_builds/pandera/envs/v0.27.1/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1225, in main
eval_type = read_int(infile)
^^^^^^^^^^^^^^^^
File "/home/docs/checkouts/readthedocs.org/user_builds/pandera/envs/v0.27.1/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 596, in read_int
raise EOFError
EOFError
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:118)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
at org.apache.spark.scheduler.Task.run(Task.scala:141)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:621)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:624)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
25/12/22 19:06:25 ERROR ArrowPythonRunner: This may have been caused by a prior exception:
java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
25/12/22 19:06:25 ERROR Executor: Exception in task 1.0 in stage 5.0 (TID 6)
java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
25/12/22 19:06:25 WARN TaskSetManager: Lost task 1.0 in stage 5.0 (TID 6) (localhost executor driver): java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
25/12/22 19:06:25 ERROR TaskSetManager: Task 1 in stage 5.0 failed 1 times; aborting job
state city price
0 FL Orlando 8
1 FL Miami 12
2 FL Tampa 10
3 CA San Francisco 16
4 CA Los Angeles 20
5 CA San Diego 18
You can also use the check_types() decorator to validate
pyspark pandas dataframes at runtime:
@pa.check_types
def function(df: DataFrame[Schema]) -> DataFrame[Schema]:
    """Return only the rows of ``df`` whose ``state`` column equals ``"CA"``.

    The ``check_types`` decorator uses the ``DataFrame[Schema]`` annotations
    to validate the dataframe at runtime.
    """
    return df[df["state"] == "CA"]


print(function(df))
25/12/22 19:06:26 ERROR ArrowPythonRunner: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home/docs/checkouts/readthedocs.org/user_builds/pandera/envs/v0.27.1/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1225, in main
eval_type = read_int(infile)
^^^^^^^^^^^^^^^^
File "/home/docs/checkouts/readthedocs.org/user_builds/pandera/envs/v0.27.1/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 596, in read_int
raise EOFError
EOFError
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:118)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
at org.apache.spark.scheduler.Task.run(Task.scala:141)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:621)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:624)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
25/12/22 19:06:26 ERROR ArrowPythonRunner: This may have been caused by a prior exception:
java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
25/12/22 19:06:26 ERROR ArrowPythonRunner: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home/docs/checkouts/readthedocs.org/user_builds/pandera/envs/v0.27.1/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1225, in main
eval_type = read_int(infile)
^^^^^^^^^^^^^^^^
File "/home/docs/checkouts/readthedocs.org/user_builds/pandera/envs/v0.27.1/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 596, in read_int
raise EOFError
EOFError
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:118)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
at org.apache.spark.scheduler.Task.run(Task.scala:141)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:621)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:624)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
25/12/22 19:06:26 ERROR ArrowPythonRunner: This may have been caused by a prior exception:
java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
25/12/22 19:06:26 ERROR Executor: Exception in task 0.0 in stage 13.0 (TID 14)
java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
25/12/22 19:06:26 ERROR Executor: Exception in task 1.0 in stage 13.0 (TID 15)
java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
25/12/22 19:06:26 WARN TaskSetManager: Lost task 1.0 in stage 13.0 (TID 15) (localhost executor driver): java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
25/12/22 19:06:26 ERROR TaskSetManager: Task 1 in stage 13.0 failed 1 times; aborting job
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
Cell In[2], line 5
1 @pa.check_types
2 def function(df: DataFrame[Schema]) -> DataFrame[Schema]:
3 return df[df["state"] == "CA"]
----> 5 print(function(df))
File ~/checkouts/readthedocs.org/user_builds/pandera/checkouts/v0.27.1/pandera/decorators.py:835, in check_types.<locals>._wrapper(*args, **kwargs)
833 out = validate_arguments(wrapped)(*args, **kwargs)
834 else:
--> 835 validated_pos, validated_kwd = validate_inputs(args, kwargs)
836 out = wrapped(*validated_pos, **validated_kwd)
837 return _check_arg("return", out)
File ~/checkouts/readthedocs.org/user_builds/pandera/checkouts/v0.27.1/pandera/decorators.py:811, in check_types.<locals>.validate_inputs(args, kwargs)
807 def validate_inputs(
808 args: tuple[Any, ...],
809 kwargs: dict[str, Any],
810 ) -> tuple[list[Any], dict[str, Any]]:
--> 811 validated_pos = validate_args(sig.bind_partial(*args).arguments, args)
812 validated_kwd = validate_kwargs(
813 sig.bind_partial(**kwargs).arguments, kwargs
814 )
815 return validated_pos, validated_kwd
File ~/checkouts/readthedocs.org/user_builds/pandera/checkouts/v0.27.1/pandera/decorators.py:762, in check_types.<locals>.validate_args(named_arguments, arguments)
759 return list((*explicit_args_tuple, *star_args_tuple))
761 else:
--> 762 return list(
763 _check_arg(arg_name, arg_value)
764 for arg_name, arg_value in named_arguments.items()
765 )
File ~/checkouts/readthedocs.org/user_builds/pandera/checkouts/v0.27.1/pandera/decorators.py:763, in <genexpr>(.0)
759 return list((*explicit_args_tuple, *star_args_tuple))
761 else:
762 return list(
--> 763 _check_arg(arg_name, arg_value)
764 for arg_name, arg_value in named_arguments.items()
765 )
File ~/checkouts/readthedocs.org/user_builds/pandera/checkouts/v0.27.1/pandera/decorators.py:683, in check_types.<locals>._check_arg(arg_name, arg_value)
675 if (
676 not hasattr(arg_value, "pandera")
677 or arg_value.pandera.schema is None
(...) 680 or arg_value.pandera.schema != schema
681 ):
682 try:
--> 683 arg_value = schema.validate(
684 arg_value,
685 head,
686 tail,
687 sample,
688 random_state,
689 lazy,
690 inplace,
691 )
692 except errors.SchemaError as e:
693 error_handler.collect_error(
694 validation_type(
695 errors.SchemaErrorReason.INVALID_TYPE
(...) 705 ),
706 )
File ~/checkouts/readthedocs.org/user_builds/pandera/checkouts/v0.27.1/pandera/api/pandas/container.py:117, in DataFrameSchema.validate(self, check_obj, head, tail, sample, random_state, lazy, inplace)
105 check_obj = check_obj.map_partitions( # type: ignore [operator]
106 self._validate,
107 head=head,
(...) 113 meta=check_obj,
114 )
115 return check_obj.pandera.add_schema(self)
--> 117 return self._validate(
118 check_obj=check_obj,
119 head=head,
120 tail=tail,
121 sample=sample,
122 random_state=random_state,
123 lazy=lazy,
124 inplace=inplace,
125 )
File ~/checkouts/readthedocs.org/user_builds/pandera/checkouts/v0.27.1/pandera/api/pandas/container.py:137, in DataFrameSchema._validate(self, check_obj, head, tail, sample, random_state, lazy, inplace)
127 def _validate(
128 self,
129 check_obj: pd.DataFrame,
(...) 135 inplace: bool = False,
136 ) -> pd.DataFrame:
--> 137 return self.get_backend(check_obj).validate(
138 check_obj,
139 schema=self,
140 head=head,
141 tail=tail,
142 sample=sample,
143 random_state=random_state,
144 lazy=lazy,
145 inplace=inplace,
146 )
File ~/checkouts/readthedocs.org/user_builds/pandera/checkouts/v0.27.1/pandera/backends/pandas/container.py:105, in DataFrameSchemaBackend.validate(self, check_obj, schema, head, tail, sample, random_state, lazy, inplace)
100 components = self.collect_schema_components(
101 check_obj, schema, column_info
102 )
104 # run the checks
--> 105 error_handler = self.run_checks_and_handle_errors(
106 error_handler,
107 schema,
108 check_obj,
109 column_info,
110 sample,
111 components,
112 lazy,
113 head,
114 tail,
115 random_state,
116 )
118 if error_handler.collected_errors:
119 if getattr(schema, "drop_invalid_rows", False):
120 # if the failure cases are a string, it means the error is
121 # a schema-level error.
File ~/checkouts/readthedocs.org/user_builds/pandera/checkouts/v0.27.1/pandera/backends/pandas/container.py:171, in DataFrameSchemaBackend.run_checks_and_handle_errors(self, error_handler, schema, check_obj, column_info, sample, components, lazy, head, tail, random_state)
160 core_checks = [
161 (self.check_column_names_are_unique, (check_obj, schema)),
162 (self.check_column_presence, (check_obj, schema, column_info)),
(...) 168 (self.run_checks, (sample, schema)),
169 ]
170 for check, args in core_checks:
--> 171 results = check(*args)
172 if isinstance(results, CoreCheckResult):
173 results = [results]
File ~/checkouts/readthedocs.org/user_builds/pandera/checkouts/v0.27.1/pandera/backends/pandas/container.py:227, in DataFrameSchemaBackend.run_schema_component_checks(self, check_obj, schema, schema_components, lazy)
223 # disable coercion at the schema component level since the
224 # dataframe-level schema already coerced it.
225 schema_component.coerce = False # type: ignore
--> 227 result = schema_component.validate(
228 check_obj, lazy=lazy, inplace=True
229 )
231 check_passed.append(is_table(result))
232 except SchemaError as err:
File ~/checkouts/readthedocs.org/user_builds/pandera/checkouts/v0.27.1/pandera/api/dataframe/components.py:148, in ComponentSchema.validate(self, check_obj, head, tail, sample, random_state, lazy, inplace)
120 def validate(
121 self,
122 check_obj,
(...) 128 inplace: bool = False,
129 ):
130 """Validate a series or specific column in dataframe.
131
132 :check_obj: data object to validate.
(...) 146
147 """
--> 148 return self.get_backend(check_obj).validate(
149 check_obj,
150 schema=self,
151 head=head,
152 tail=tail,
153 sample=sample,
154 random_state=random_state,
155 lazy=lazy,
156 inplace=inplace,
157 )
File ~/checkouts/readthedocs.org/user_builds/pandera/checkouts/v0.27.1/pandera/backends/pandas/components.py:144, in ColumnBackend.validate(self, check_obj, schema, head, tail, sample, random_state, lazy, inplace)
138 if getattr(schema, "drop_invalid_rows", False):
139 # replace the check_obj with the validated
140 check_obj = validate_column(
141 check_obj, column_name, return_check_obj=True
142 )
--> 144 validated_column = validate_column(
145 check_obj,
146 column_name,
147 return_check_obj=True,
148 )
149 if schema.parsers:
150 check_obj[column_name] = validated_column
File ~/checkouts/readthedocs.org/user_builds/pandera/checkouts/v0.27.1/pandera/backends/pandas/components.py:80, in ColumnBackend.validate.<locals>.validate_column(check_obj, column_name, return_check_obj)
76 try:
77 # make sure the schema component mutations are reverted after
78 # validation
79 _orig_name = schema.name
---> 80 validated_check_obj = super(ColumnBackend, self).validate(
81 check_obj,
82 schema.set_name(column_name),
83 head=head,
84 tail=tail,
85 sample=sample,
86 random_state=random_state,
87 lazy=lazy,
88 inplace=inplace,
89 )
90 # revert the schema component mutations
91 schema.name = _orig_name
File ~/checkouts/readthedocs.org/user_builds/pandera/checkouts/v0.27.1/pandera/backends/pandas/array.py:73, in ArraySchemaBackend.validate(self, check_obj, schema, head, tail, sample, random_state, lazy, inplace)
66 error_handler.collect_error(
67 validation_type(exc.reason_code),
68 exc.reason_code,
69 exc,
70 )
72 # run the core checks
---> 73 error_handler = self.run_checks_and_handle_errors(
74 error_handler,
75 schema,
76 check_obj,
77 head=head,
78 tail=tail,
79 sample=sample,
80 random_state=random_state,
81 )
83 if lazy and error_handler.collected_errors:
84 if getattr(schema, "drop_invalid_rows", False):
File ~/checkouts/readthedocs.org/user_builds/pandera/checkouts/v0.27.1/pandera/backends/pandas/array.py:116, in ArraySchemaBackend.run_checks_and_handle_errors(self, error_handler, schema, check_obj, **subsample_kwargs)
107 core_checks = [
108 (self.check_name, (field_obj_subsample, schema)),
109 (self.check_nullable, (field_obj_subsample, schema)),
(...) 112 (self.run_checks, (check_obj_subsample, schema)),
113 ]
115 for check, args in core_checks:
--> 116 results = check(*args)
117 if isinstance(results, CoreCheckResult):
118 results = [results]
File ~/checkouts/readthedocs.org/user_builds/pandera/checkouts/v0.27.1/pandera/validation_depth.py:68, in validate_scope.<locals>._wrapper.<locals>.wrapper(self, check_obj, *args, **kwargs)
62 logger.debug(
63 f"Skipping execution of check {func.__name__} since "
64 "validation depth is set to DATA_ONLY.",
65 stacklevel=2,
66 )
67 return CoreCheckResult(passed=True)
---> 68 return func(self, check_obj, *args, **kwargs)
70 elif scope == ValidationScope.DATA:
71 if config.validation_depth == ValidationDepth.SCHEMA_ONLY:
File ~/checkouts/readthedocs.org/user_builds/pandera/checkouts/v0.27.1/pandera/backends/pandas/array.py:296, in ArraySchemaBackend.check_dtype(self, check_obj, schema)
291 msg = (
292 f"expected series '{check_obj.name}' to have type "
293 f"{schema.dtype}, got {check_obj.dtype}"
294 )
295 else:
--> 296 passed = dtype_check_results.all()
297 failure_cases = reshape_failure_cases(
298 check_obj[~dtype_check_results.astype(bool)],
299 ignore_na=False,
300 )
301 msg = (
302 f"expected series '{check_obj.name}' to have type "
303 f"{schema.dtype}:\nfailure cases:\n{failure_cases}"
304 )
File ~/checkouts/readthedocs.org/user_builds/pandera/envs/v0.27.1/lib/python3.11/site-packages/pyspark/pandas/base.py:1128, in IndexOpsMixin.all(self, axis, skipna)
1123 # `any` and `every` was added as of Spark 3.0.
1124 # ret = sdf.select(F.expr("every(CAST(`%s` AS BOOLEAN))" % sdf.columns[0])).collect()[0][0]
1125 # We use min as its alternative as below.
1126 if isinstance(self.spark.data_type, NumericType) or skipna:
1127 # np.nan takes no effect to the result; None takes no effect if `skipna`
-> 1128 ret = sdf.select(F.min(F.coalesce(col.cast("boolean"), F.lit(True)))).collect()[0][0]
1129 else:
1130 # Take None as False when not `skipna`
1131 ret = sdf.select(
1132 F.min(F.when(col.isNull(), F.lit(False)).otherwise(col.cast("boolean")))
1133 ).collect()[0][0]
File ~/checkouts/readthedocs.org/user_builds/pandera/envs/v0.27.1/lib/python3.11/site-packages/pyspark/sql/dataframe.py:1263, in DataFrame.collect(self)
1243 """Returns all the records as a list of :class:`Row`.
1244
1245 .. versionadded:: 1.3.0
(...) 1260 [Row(age=14, name='Tom'), Row(age=23, name='Alice'), Row(age=16, name='Bob')]
1261 """
1262 with SCCallSiteSync(self._sc):
-> 1263 sock_info = self._jdf.collectToPython()
1264 return list(_load_from_socket(sock_info, BatchedSerializer(CPickleSerializer())))
File ~/checkouts/readthedocs.org/user_builds/pandera/envs/v0.27.1/lib/python3.11/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
1316 command = proto.CALL_COMMAND_NAME +\
1317 self.command_header +\
1318 args_command +\
1319 proto.END_COMMAND_PART
1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
1323 answer, self.gateway_client, self.target_id, self.name)
1325 for temp_arg in temp_args:
1326 if hasattr(temp_arg, "_detach"):
File ~/checkouts/readthedocs.org/user_builds/pandera/envs/v0.27.1/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py:179, in capture_sql_exception.<locals>.deco(*a, **kw)
177 def deco(*a: Any, **kw: Any) -> Any:
178 try:
--> 179 return f(*a, **kw)
180 except Py4JJavaError as e:
181 converted = convert_exception(e.java_exception)
File ~/checkouts/readthedocs.org/user_builds/pandera/envs/v0.27.1/lib/python3.11/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332 format(target_id, ".", name, value))
Py4JJavaError: An error occurred while calling o393.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 13.0 failed 1 times, most recent failure: Lost task 1.0 in stage 13.0 (TID 15) (localhost executor driver): java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2898)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2834)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2833)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2833)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1253)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1253)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1253)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3102)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3036)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3025)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
And of course, you can use the object-based API to validate pyspark.pandas dataframes:
schema = pa.DataFrameSchema({
"state": pa.Column(str),
"city": pa.Column(str),
"price": pa.Column(int, pa.Check.in_range(min_value=5, max_value=20))
})
schema(df)