
OutOfMemoryError when uploading large partition to GCP snowflake #518

Open
kposborne opened this issue Jul 18, 2023 · 1 comment

Comments

@kposborne

Looking at the code, the GCP path appears to buffer the entire partition in memory before uploading it. We hit this while syncing a Postgres table to Snowflake. The workaround was to reduce the partition size by setting partitionColumn, lowerBound, and upperBound in the reader options (a sketch follows the code excerpt below), but this is not always convenient.

```scala
// For GCP, the rows are cached and then uploaded.
else if (fileTransferMetadata.isDefined) {
  // cache the data in buffer
  val outputStream = new ByteArrayOutputStream(4 * 1024 * 1024)
  while (rows.hasNext) {
    outputStream.write(rows.next.getBytes("UTF-8"))
    outputStream.write('\n')
    rowCount += 1
  }
  val data = outputStream.toByteArray
  dataSize = data.size
  outputStream.close()
```
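
For reference, a minimal sketch of the workaround described above: splitting the JDBC read into more, smaller partitions so that no single task buffers too much data. The URL, table name, bounds, and partition count here are hypothetical placeholders; note that Spark's JDBC source also requires numPartitions whenever partitionColumn is set.

```scala
// Hedged sketch of the workaround: partition the Postgres read so each
// Spark task (and hence each buffered GCP upload) handles a bounded slice.
// Connection URL, table, and bound values are illustrative placeholders.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://host:5432/mydb")
  .option("dbtable", "public.my_table")
  .option("partitionColumn", "id")    // numeric column to split the read on
  .option("lowerBound", "1")          // approximate min of the column
  .option("upperBound", "100000000")  // approximate max of the column
  .option("numPartitions", "64")      // more partitions => smaller buffers
  .load()
```

The executor failure we saw without the workaround: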
```
2023-07-13T16:05:54,485 ERROR executor.Executor: Exception in task 0.0 in stage 8.0 (TID 34)
java.lang.OutOfMemoryError: null
	at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:125) ~[?:?]
	at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:119) ~[?:?]
	at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:95) ~[?:?]
	at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:156) ~[?:?]
	at java.io.OutputStream.write(OutputStream.java:122) ~[?:?]
	at net.snowflake.spark.snowflake.io.CloudStorage.doUploadPartition(CloudStorageOperations.scala:717) ~[spark-snowflake_2.12-2.12.0-spark_3.4.jar:2.12.0-spark_3.4]
	at net.snowflake.spark.snowflake.io.CloudStorage.uploadPartition(CloudStorageOperations.scala:611) ~[spark-snowflake_2.12-2.12.0-spark_3.4.jar:2.12.0-spark_3.4]
	at net.snowflake.spark.snowflake.io.CloudStorage.uploadPartition$(CloudStorageOperations.scala:594) ~[spark-snowflake_2.12-2.12.0-spark_3.4.jar:2.12.0-spark_3.4]
	at net.snowflake.spark.snowflake.io.InternalGcsStorage.uploadPartition(CloudStorageOperations.scala:1726) ~[spark-snowflake_2.12-2.12.0-spark_3.4.jar:2.12.0-spark_3.4]
	at net.snowflake.spark.snowflake.io.InternalGcsStorage.$anonfun$upload$2(CloudStorageOperations.scala:1855) ~[spark-snowflake_2.12-2.12.0-spark_3.4.jar:2.12.0-spark_3.4]
	at net.snowflake.spark.snowflake.io.InternalGcsStorage.$anonfun$upload$2$adapted(CloudStorageOperations.scala:1839) ~[spark-snowflake_2.12-2.12.0-spark_3.4.jar:2.12.0-spark_3.4]
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:905) ~[spark-core.jar:?]
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:905) ~[spark-core.jar:?]
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core.jar:?]
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core.jar:?]
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core.jar:?]
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92) ~[spark-core.jar:?]
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) ~[spark-core.jar:?]
	at org.apache.spark.scheduler.Task.run(Task.scala:139) ~[spark-core.jar:?]
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554) ~[spark-core.jar:?]
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529) ~[spark-core.jar:?]
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557) [spark-core.jar:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at java.lang.Thread.run(Thread.java:829) [?:?]
```
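
One possible direction for a fix (a hedged sketch, not the connector's actual code or a proposed patch): spill the partition to a local temp file and stream the upload from disk, so heap usage stays bounded regardless of partition size. The helper below is hypothetical; rows mirrors the Iterator[String] in the excerpt above.

```scala
import java.io.{BufferedOutputStream, File, FileOutputStream}
import java.nio.file.Files

// Hypothetical helper, not connector code: write the partition to a temp file
// instead of a ByteArrayOutputStream, returning the file and the row count.
def spillPartitionToDisk(rows: Iterator[String]): (File, Long) = {
  val tempFile = Files.createTempFile("sf-gcp-partition-", ".csv").toFile
  val out = new BufferedOutputStream(new FileOutputStream(tempFile))
  var rowCount = 0L
  try {
    while (rows.hasNext) {
      out.write(rows.next().getBytes("UTF-8"))
      out.write('\n')
      rowCount += 1
    }
  } finally {
    out.close()
  }
  (tempFile, rowCount) // caller streams the file to GCS, then deletes it
}
```

Here tempFile.length() would stand in for dataSize, and the upload would read from disk rather than materializing the whole partition as a byte array.
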
@sixdimensionalarray

Any update on the status of this issue?
