Partition error writing from S3 to DynamoDB with Spark

I need to write into DynamoDB from S3 using Spark, and I am getting a write error in the middle of the job. I have the feeling the problem comes from changing partitions, because it actually writes SOME of the rows. My code looks like this:

SQL_DISTANCE = """
    select 
        origin||destination `primary_key`,
        origin, 
        destination, 
        distance 
    from distance
    """

df_haul = sqlc.read.csv(os.environ["S3_PATH"], schema=customSchema)
df_haul.createOrReplaceTempView("distance")
df_haul = sqlc.sql(SQL_DISTANCE)
df_haul.write \
    .option("tableName", os.environ["DYNAMODB_TABLE"]) \
    .option("region", "eu-west-1") \
    .format("dynamodb") \
    .save()
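
For reference, sqlc and customSchema come from a setup roughly like this (simplified sketch, the app name and exact field types are placeholders; the DynamoDB data source is the audienceproject spark-dynamodb connector, which is the one that shows up in the stack trace below):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Session; the spark-dynamodb connector jar is already on the classpath
# of the Docker image (added via --packages at submit time).
spark = SparkSession.builder.appName("s3-to-dynamodb").getOrCreate()
sqlc = spark  # used above for read.csv / sql

# Schema of the CSV files in S3 (types simplified)
customSchema = StructType([
    StructField("origin", StringType(), True),
    StructField("destination", StringType(), True),
    StructField("distance", DoubleType(), True),
])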

I run it in Docker, and it seems to work at first (I can see many items added if I scan the table), but then the pod breaks:

2021-10-06 08:12:22 INFO  TaskSchedulerImpl:54 - Adding task set 1.0 with 4 tasks
2021-10-06 08:12:22 INFO  TaskSetManager:54 - Starting task 0.0 in stage 1.0 (TID 1, localhost, executor driver, partition 0, PROCESS_LOCAL, 8355 bytes)
2021-10-06 08:12:22 INFO  TaskSetManager:54 - Starting task 1.0 in stage 1.0 (TID 2, localhost, executor driver, partition 1, PROCESS_LOCAL, 8355 bytes)
2021-10-06 08:12:22 INFO  TaskSetManager:54 - Starting task 2.0 in stage 1.0 (TID 3, localhost, executor driver, partition 2, PROCESS_LOCAL, 8355 bytes)
2021-10-06 08:12:22 INFO  TaskSetManager:54 - Starting task 3.0 in stage 1.0 (TID 4, localhost, executor driver, partition 3, PROCESS_LOCAL, 8355 bytes)
2021-10-06 08:12:22 INFO  Executor:54 - Running task 0.0 in stage 1.0 (TID 1)
2021-10-06 08:12:22 INFO  Executor:54 - Running task 2.0 in stage 1.0 (TID 3)
2021-10-06 08:12:22 INFO  Executor:54 - Running task 3.0 in stage 1.0 (TID 4)
2021-10-06 08:12:22 INFO  Executor:54 - Running task 1.0 in stage 1.0 (TID 2)


2021-10-06 08:12:23 INFO  FileScanRDD:54 - Reading File path: s3a://my-bucket/origin_location_dist_part/0003_part_00, range: 0-24975329, partition values: [empty row]
2021-10-06 08:12:23 INFO  FileScanRDD:54 - Reading File path: s3a://my-bucket/origin_location_dist_part/0000_part_00, range: 0-24974189, partition values: [empty row]
2021-10-06 08:12:23 INFO  FileScanRDD:54 - Reading File path: s3a://my-bucket/origin_location_dist_part/0002_part_00, range: 0-24974688, partition values: [empty row]
2021-10-06 08:12:23 INFO  FileScanRDD:54 - Reading File path: s3a://my-bucket/origin_location_dist_part/0001_part_00, range: 0-24975233, partition values: [empty row]
2021-10-06 08:12:51 ERROR Utils:91 - Aborting task
java.lang.NullPointerException
    at org.apache.spark.sql.catalyst.InternalRow.getString(InternalRow.scala:34)
    at com.audienceproject.spark.dynamodb.catalyst.JavaConverter$.convertRowValue(JavaConverter.scala:39)
    at com.audienceproject.spark.dynamodb.connector.TableConnector$$anonfun$putItems$1.apply(TableConnector.scala:124)
    at com.audienceproject.spark.dynamodb.connector.TableConnector$$anonfun$putItems$1.apply(TableConnector.scala:118)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at com.audienceproject.spark.dynamodb.connector.TableConnector.putItems(TableConnector.scala:118)
    at com.audienceproject.spark.dynamodb.datasource.DynamoBatchWriter.flush(DynamoBatchWriter.scala:56)
    at com.audienceproject.spark.dynamodb.datasource.DynamoBatchWriter.write(DynamoBatchWriter.scala:43)
    at com.audienceproject.spark.dynamodb.datasource.DynamoBatchWriter.write(DynamoBatchWriter.scala:31)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:118)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2021-10-06 08:12:51 ERROR DataWritingSparkTask:70 - Aborting commit for partition 2 (task 3, attempt 0stage 1.0)
2021-10-06 08:12:51 ERROR DataWritingSparkTask:70 - Aborted commit for partition 2 (task 3, attempt 0stage 1.0)
2021-10-06 08:12:51 ERROR Executor:91 - Exception in task 2.0 in stage 1.0 (TID 3)

I don’t understand where this error comes from, because it actually writes some of the rows. From the error alone it looks like a partition problem, but I don’t know how to solve it.
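
Could the problem actually be null values rather than the partition change itself? The NullPointerException is raised in InternalRow.getString inside the connector's convertRowValue, which is what would happen if some rows have a null origin or destination (and therefore a null primary_key, since || returns null when either side is null). A quick check like this, run on df_haul before the .save(), should show whether that is the case (just a sketch, not verified):

# Rows with a null origin/destination produce a null primary_key,
# and the connector cannot read a null value as a string.
nulls = df_haul.filter(
    "origin is null or destination is null or primary_key is null"
).count()
print("rows with null key columns:", nulls)

If that count is non-zero, filtering those rows out (or filling a default key) before the write would presumably avoid the aborted task.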
Thank you very much in advance.

