
java.lang.IllegalStateException: /checkpoint/round/sources/0/shard-commit/1 does not exist #57

Open
HariprasadAllaka1612 opened this issue Oct 30, 2019 · 27 comments

@HariprasadAllaka1612

Hey,

I am using the kinesis-sql repo, 2.4.0 branch, and when I try to write the checkpoint to S3 I run into this issue.

Initially I thought this might be caused by S3's eventual consistency, but that does not seem to be the case: it also happens locally.

There seems to be an issue with folder creation under shard-commit.

Code snippet:

val roundStreamDf = sc.readStream
  .format("kinesis")
  .option("streamName", streamName)
  .option("endpointUrl", s"https://kinesis.${region}.amazonaws.com")
  .option("awsAccessKeyId", acceskeyId)
  .option("awsSecretKey", secretAccessKey)
  .option("startingposition", START_POS)
  .option("kinesis.client.describeShardInterval", "3600")
  .load()

val roundStreamData1 = roundStreamDf
  .selectExpr("cast (data as STRING) jsonData")
  .select(from_json(col("jsonData"), ScalaReflection.schemaFor[Round].dataType.asInstanceOf[StructType]).as("round"))
  .select("round.*")

val query = roundStreamData1
  .writeStream
  .foreachBatch { (batchDf: DataFrame, _: Long) =>
    val RoundDf = commonRoundDataProcess(batchDf)
    RoundDf.persist()
    while (opNameIterator.hasNext) {
      val opName = opNameIterator.next()
      val finalRoundDf = RoundDf.filter(col("OperatorShortName") === opName)
      accessDataS3.writeDataToRefinedHudiS3(sc, finalRoundDf, extraOptions, opName, ROUND_TYPE_OF_DATA)
    }
    RoundDf.unpersist()
  }
  .outputMode("update")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .option("checkpointLocation", s"${checkPointBucket}/checkpoint/${ROUND_TYPE_OF_DATA}/")
  .start()

query.awaitTermination()
Log:
2019-10-30 15:06:44 ERROR MicroBatchExecution:91 - Query [id = 56565a9a-c8bc-4818-97ab-0cfe6b345b79, runId = e9623150-3306-4cc0-8b8e-d85edc811cff] terminated with error
java.lang.IllegalStateException: Gave up after 3 retries while fetching MetaData, last exception:
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$withRetry$2.apply(HDFSMetadataCommitter.scala:241)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$withRetry$2.apply(HDFSMetadataCommitter.scala:241)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.withRetry(HDFSMetadataCommitter.scala:240)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.get(HDFSMetadataCommitter.scala:150)
at org.apache.spark.sql.kinesis.KinesisSource.prevBatchShardInfo(KinesisSource.scala:272)
at org.apache.spark.sql.kinesis.KinesisSource.getOffset(KinesisSource.scala:157)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:344)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:341)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:341)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:554)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: java.lang.IllegalStateException: s3a://gat-datastreaming-resources-dev/checkpoint/round/sources/0/shard-commit/1 does not exist
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:163)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:151)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.withRetry(HDFSMetadataCommitter.scala:229)
... 30 more
org.apache.spark.sql.streaming.StreamingQueryException: Gave up after 3 retries while fetching MetaData, last exception:
=== Streaming Query ===
Identifier: [id = 56565a9a-c8bc-4818-97ab-0cfe6b345b79, runId = e9623150-3306-4cc0-8b8e-d85edc811cff]
Current Committed Offsets: {KinesisSource[gat-rounds-stream]: {"metadata":{"streamName":"gat-rounds-stream","batchId":"1"},"shardId-000000000000":{"iteratorType":"AFTER_SEQUENCE_NUMBER","iteratorPosition":"49600867143952561714236237710058640910088621183742246914"}}}
Current Available Offsets: {KinesisSource[gat-rounds-stream]: {"metadata":{"streamName":"gat-rounds-stream","batchId":"1"},"shardId-000000000000":{"iteratorType":"AFTER_SEQUENCE_NUMBER","iteratorPosition":"49600867143952561714236237710058640910088621183742246914"}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [round#17.Type AS Type#19, round#17.PId AS PId#20L, round#17.RId AS RId#21, round#17.UId AS UId#22, round#17.GId AS GId#23, round#17.GS AS GS#24, round#17.STS AS STS#25, round#17.FGExtId AS FGExtId#26, round#17.Dev AS Dev#27, round#17.CTS AS CTS#28, round#17.PCcy AS PCcy#29, round#17.PSysXR AS PSysXR#30, round#17.BetTotal AS BetTotal#31, round#17.BetBonus AS BetBonus#32, round#17.WinTotal AS WinTotal#33, round#17.WinBonus AS WinBonus#34, round#17.GGR AS GGR#35, round#17.JpContr AS JpContr#36, round#17.JpPO AS JpPO#37, round#17.JpSeed AS JpSeed#38, round#17.OpName AS OpName#39]
+- Project [jsontostructs(StructField(Type,StringType,true), StructField(PId,LongType,true), StructField(RId,StringType,true), StructField(UId,StringType,true), StructField(GId,StringType,true), StructField(GS,StringType,true), StructField(STS,StringType,true), StructField(FGExtId,StringType,true), StructField(Dev,StringType,true), StructField(CTS,StringType,true), StructField(PCcy,StringType,true), StructField(PSysXR,DoubleType,true), StructField(BetTotal,DoubleType,true), StructField(BetBonus,DoubleType,true), StructField(WinTotal,DoubleType,true), StructField(WinBonus,DoubleType,true), StructField(GGR,DoubleType,true), StructField(JpContr,DoubleType,true), StructField(JpPO,DoubleType,true), StructField(JpSeed,DoubleType,true), StructField(OpName,StringType,true), jsonData#15, Some(Europe/Berlin)) AS round#17]
+- Project [cast(data#5 as string) AS jsonData#15]
+- StreamingExecutionRelation KinesisSource[gat-rounds-stream], [data#5, streamName#6, partitionKey#7, sequenceNumber#8, approximateArrivalTimestamp#9]

at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)

Caused by: java.lang.IllegalStateException: Gave up after 3 retries while fetching MetaData, last exception:
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$withRetry$2.apply(HDFSMetadataCommitter.scala:241)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$withRetry$2.apply(HDFSMetadataCommitter.scala:241)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.withRetry(HDFSMetadataCommitter.scala:240)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.get(HDFSMetadataCommitter.scala:150)
at org.apache.spark.sql.kinesis.KinesisSource.prevBatchShardInfo(KinesisSource.scala:272)
at org.apache.spark.sql.kinesis.KinesisSource.getOffset(KinesisSource.scala:157)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:344)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:341)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:341)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:554)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
... 1 more
Caused by: java.lang.IllegalStateException: s3a://gat-datastreaming-resources-dev/checkpoint/round/sources/0/shard-commit/1 does not exist
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:163)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:151)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.withRetry(HDFSMetadataCommitter.scala:229)
... 30 more

Process finished with exit code 0

@itsvikramagr
Contributor

@HariprasadAllaka1612 - You need to check the executor logs to see why the checkpoint location is not created.

@HariprasadAllaka1612
Author

> @HariprasadAllaka1612 - You need to check the executor logs to see why the checkpoint location is not created.

The checkpoint location is getting created. When using foreachBatch:

  1. Batch 0 executes successfully and writes data to the offsets, shard-commit, and commits folders.
  2. When executing batch 1, it writes to offsets and commits but does not write anything to shard-commit.
  3. When batch 2 then fetches the previous batch info, it fails because shard-commit was never created for batch 1 (see the sketch below for a quick way to verify this).
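For reference, here is a minimal sketch (not part of the connector; it assumes an s3a:// checkpoint path and a Hadoop configuration that already carries the S3 credentials, and the bucket name is hypothetical) that lists which batches actually have a shard-commit file:

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical checkpoint root - replace with your own checkpointLocation value.
val checkpointRoot = "s3a://my-bucket/checkpoint/round"
val shardCommitDir = new Path(s"$checkpointRoot/sources/0/shard-commit")

val fs = FileSystem.get(new URI(checkpointRoot), new Configuration())

// Each successful batch should leave a file named after its batch id here;
// a missing id is what later surfaces as the "does not exist" IllegalStateException.
if (fs.exists(shardCommitDir)) {
  fs.listStatus(shardCommitDir).foreach(status => println(status.getPath.getName))
} else {
  println(s"$shardCommitDir does not exist yet")
}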

@itsvikramagr
Contributor

Sure. Let me reproduce the issue and provide a fix.

@HariprasadAllaka1612
Author

Hi @itsvikramagr, could you please let us know if there is any update on this issue? Sorry to bother you, I just wanted to know when we can change our code based on this fix :)

@itsvikramagr
Contributor

Hey @HariprasadAllaka1612 - I could not spend any cycles on it. Let me do it tomorrow.

@itsvikramagr
Contributor

@HariprasadAllaka1612 - I can reproduce the issue in certain scenarios, but I still haven't gotten to the bottom of it. I will start a PR as soon as we have a solution for it.

@HariprasadAllaka1612
Author

> @HariprasadAllaka1612 - I can reproduce the issue in certain scenarios, but I still haven't gotten to the bottom of it. I will start a PR as soon as we have a solution for it.

@itsvikramagr Thanks a lot for looking into it. Please let me know if you need any help from my side.

@3mlabs

3mlabs commented Feb 11, 2020

Hi, we are also facing a similar issue while consuming a Kinesis stream through Spark Structured Streaming. Please find the error below:
20/02/11 14:24:34 WARN HDFSMetadataCommitter: Error while fetching MetaData [attempt = 1]
java.lang.IllegalStateException: file:/tmp/tmp23/checkpoint/customer/sources/0/shard-commit/0 does not exist
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:163)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:151)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.withRetry(HDFSMetadataCommitter.scala:229)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.get(HDFSMetadataCommitter.scala:150)
at org.apache.spark.sql.kinesis.KinesisSource.prevBatchShardInfo(KinesisSource.scala:272)
at org.apache.spark.sql.kinesis.KinesisSource.getOffset(KinesisSource.scala:157)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:344)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:341)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:341)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:557)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)

It retries 3 times and then fails. I think this is the same issue, with one difference: our script fails on batch 0 itself and never reaches batch 1.

Please let us know if there is any workaround for this.

@HariprasadAllaka1612
Author

> Hi, we are also facing a similar issue while consuming a Kinesis stream through Spark Structured Streaming. [...] Please let us know if there is any workaround for this.

Hey,

As of now, I am streaming the data in micro-batches and not doing any foreachBatch operations. I separated my process into a streaming part and a batch part, where the batch processing is based on the time the data was written to S3 (roughly as sketched below).
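A minimal sketch of that stream-then-batch split, under stated assumptions (dataBucket, the landing path, and the date-based incremental filter are hypothetical; roundStreamDf, checkPointBucket, and the parsing/processing step come from the snippet above):

import org.apache.spark.sql.functions.{col, current_date, to_date}
import org.apache.spark.sql.streaming.Trigger

// Streaming part: land the raw Kinesis records in S3 with an ingest date,
// letting the plain file sink handle its own checkpointing (no foreachBatch).
val landed = roundStreamDf
  .selectExpr("cast(data as STRING) as jsonData", "approximateArrivalTimestamp")
  .withColumn("ingest_date", to_date(col("approximateArrivalTimestamp")))

landed.writeStream
  .format("json")
  .option("path", s"${dataBucket}/raw/round/")                  // hypothetical landing path
  .option("checkpointLocation", s"${checkPointBucket}/checkpoint/raw-round/")
  .partitionBy("ingest_date")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

// Batch part (run as a separate job): read what was landed since the last run
// and continue with the same parsing / per-operator processing as above.
val landedToday = sc.read.json(s"${dataBucket}/raw/round/")
  .filter(col("ingest_date") === current_date())                // hypothetical incremental filter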

@3mlabs

3mlabs commented Feb 12, 2020

@HariprasadAllaka1612, I was referring to the comment you added on the 'foreachBatch' support issue (#58). We are trying to do the same thing and facing the same issue you posted. Do you have any resolution for using the 'foreachBatch' function?

@HariprasadAllaka1612
Author

> @HariprasadAllaka1612, I was referring to the comment you added on the 'foreachBatch' support issue (#58). We are trying to do the same thing and facing the same issue you posted. Do you have any resolution for using the 'foreachBatch' function?

Unfortunately, no. @itsvikramagr is working on fixing the issue.

@itsvikramagr
Contributor

@HariprasadAllaka1612 and @3mlabs - Apologies, I am a little behind on this one. This looks like an involved issue: we have to understand how foreachBatch works and why it impacts us. We might have to revisit the design of the connector.

@itsvikramagr
Contributor

Assigning the issue to @abhishekd0907.

@abhishekd0907
Contributor

@HariprasadAllaka1612 and @3mlabs - Can you please give more details on the following points:

  1. Can you check whether new tasks are created by foreachBatch for every batch? Since the tasks write to checkpoint/round/sources/0/shard-commit, it is possible that the directory is not created if there were no tasks in the previous batch. I could reproduce this by writing a no-op such as
    .foreachBatch { (batchDf: DataFrame, batchId: Long) => /* no-op */ }

  2. Can you give more details about the operations inside your foreachBatch code? It is possible that foreachBatch leads to a no-op or does not trigger any tasks for some batches.

  3. Can you trigger an action on batchDf inside the foreachBatch code, e.g. batchDf.count, to trigger tasks?

@itsvikramagr
Contributor

@HariprasadAllaka1612 and @3mlabs - We debugged the issue and could reproduce it only when foreachBatch was a no-op, i.e. it wasn't generating any tasks. Can you please confirm whether this is true in your case?

We added an explicit action in the foreachBatch code, such as batchDf.count. Can you try this hack and see if it makes your application run without any issues?
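Applied to the snippet in the original report, the hack would look roughly like this (a sketch only: batchDf.count() simply forces at least one Spark job per micro-batch so the executor tasks get a chance to write the shard-commit metadata):

val query = roundStreamData1
  .writeStream
  .foreachBatch { (batchDf: DataFrame, _: Long) =>
    // Force an action even when the per-batch logic would otherwise be a no-op,
    // so that tasks are launched and the shard-commit metadata gets written.
    batchDf.persist()
    batchDf.count()

    // ... existing per-operator processing from the snippet above ...

    batchDf.unpersist()
  }
  .outputMode("update")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .option("checkpointLocation", s"${checkPointBucket}/checkpoint/${ROUND_TYPE_OF_DATA}/")
  .start()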

@HariprasadAllaka1612
Author

Hi @itsvikramagr @abhishekd0907, I will try this and let you know.

@arnehuang
Contributor

Hi, I ran into this as well writing to S3. Just wondering if there are any updates?

@itsvikramagr
Contributor

@arnehuang - are you using foreachBatch for writing into S3? If not, can you raise another ticket with a way to reproduce and a stack trace?

@arnehuang
Contributor

Sorry, after debugging I realized it has to do with S3 consistency and not this library. Using HDFS / S3Guard fixed it for me.

@sudeshnasengupta

Hi @itsvikramagr and @abhishekd0907: we're facing the same call-stack issue in our streaming job, and we are not using any "foreach" or "foreachBatch". Has there been any further progress on this issue, if not a fix? Is there any relationship between this error and the configuration option "kinesis.client.avoidEmptyBatches"?

Our streaming job does the following when it handles the StreamingQuery:

{
  org.apache.spark.sql.streaming.StreamingQuery query = eventsToWriteOut.writeStream()
      .format("parquet")
      .outputMode("append")
      .option("checkpointLocation", checkpointLocation)
      .option("path", path)
      .option("charset", "UTF-8")
      .partitionBy("solution_id", "env", "event_date")
      .trigger(Trigger.ProcessingTime(triggerInterval))
      .start();

  if (query.lastProgress() != null) {
    System.out.println("PROGRESS " + query.lastProgress().prettyJson());
  }
  try {
    query.awaitTermination();
  } catch (StreamingQueryException e) {
    LOGGER.error("Error terminating query", e);
  }
}

FYI, I'm pasting below a snippet of our specific error's call stack:

20/08/14 17:51:53 INFO IngressJob: In runJob Parameters Config(SimpleConfigObject({"checkpointDirectory":,"describeShardInterval":"5m","duplicateWindow":"0 minutes","endpointUrl":,"fetchBufferSize":"25gb","maxFetchTime":"30000","maxRecordsPerFetch":"25000","maxRecordsPerRead":"10000","path":,"region":,"runInterval":-1,"streamName":,"triggerInterval":"60 seconds"}))
20/08/14 17:51:53 INFO IngressJob: Stream Name , Trigger interval 60 seconds, EndpointRUL
20/08/14 17:51:53 INFO IngressJob: Continuous mode false, run for (-60000 millisecs)
20/08/14 17:51:53 WARN SparkConf: The configuration key 'spark.yarn.executor.memoryOverhead' has been deprecated as of Spark 2.3 and may be removed in the future. Please use the new key 'spark.executor.memoryOverhead' instead.
20/08/14 17:51:53 WARN SparkConf: The configuration key 'spark.yarn.driver.memoryOverhead' has been deprecated as of Spark 2.3 and may be removed in the future. Please use the new key 'spark.driver.memoryOverhead' instead.
20/08/14 17:51:53 WARN SparkContext: Using an existing SparkContext; some configuration may not take effect.
20/08/14 17:51:53 INFO SharedState: loading hive config file: file:/mnt/yarn/usercache/hadoop/appcache/application_1597427394593_0001/container_1597427394593_0001_01_000001/hive-site.xml
20/08/14 17:51:53 INFO JettyUtils: Adding filter org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter to /SQL.
20/08/14 17:51:53 INFO JettyUtils: Adding filter org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter to /SQL/json.
20/08/14 17:51:53 INFO JettyUtils: Adding filter org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter to /SQL/execution.
20/08/14 17:51:53 INFO JettyUtils: Adding filter org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter to /SQL/execution/json.
20/08/14 17:51:53 INFO JettyUtils: Adding filter org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter to /static/sql.
20/08/14 17:51:53 INFO AMRMClientImpl: Received new token for : ip-10-15-190-241.us-west-2.compute.internal:8041
20/08/14 17:51:54 INFO YarnAllocator: Launching container container_1597427394593_0001_01_000002 on host ip-10-15-190-241.us-west-2.compute.internal for executor with ID 1
20/08/14 17:51:54 INFO YarnAllocator: Received 1 containers from YARN, launching executors on 1 of them.
20/08/14 17:51:54 INFO ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0
20/08/14 17:51:54 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
20/08/14 17:51:55 INFO YarnAllocator: Launching container container_1597427394593_0001_01_000003 on host ip-10-15-190-241.us-west-2.compute.internal for executor with ID 2
20/08/14 17:51:55 INFO YarnAllocator: Received 1 containers from YARN, launching executors on 1 of them.
20/08/14 17:51:55 INFO ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0
20/08/14 17:51:56 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
20/08/14 17:51:56 INFO AMRMClientImpl: Received new token for : ip-10-15-190-228.us-west-2.compute.internal:8041
20/08/14 17:51:56 INFO YarnAllocator: Launching container container_1597427394593_0001_01_000004 on host ip-10-15-190-228.us-west-2.compute.internal for executor with ID 3
20/08/14 17:51:56 INFO YarnAllocator: Received 1 containers from YARN, launching executors on 1 of them.
20/08/14 17:51:56 INFO ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0
20/08/14 17:51:58 INFO S3NativeFileSystem: Opening for reading
20/08/14 17:51:59 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.15.190.241:40008) with ID 1
20/08/14 17:51:59 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.15.190.241:40010) with ID 2
20/08/14 17:51:59 INFO ExecutorAllocationManager: New executor 1 has registered (new total is 1)
20/08/14 17:51:59 INFO ExecutorAllocationManager: New executor 2 has registered (new total is 2)
20/08/14 17:51:59 INFO MicroBatchExecution: Starting [id = 0240d731-82b2-47b9-8cde-c7fd5a15ded0, runId = 900fc90e-5747-44d5-a209-89cc7399e210]. Use to store the query checkpoint.
20/08/14 17:51:59 INFO MicroBatchExecution: Using Source [KinesisSource[]] from DataSourceV2 named 'kinesis' [org.apache.spark.sql.kinesis.KinesisSourceProvider@48719d58]
20/08/14 17:51:59 INFO BlockManagerMasterEndpoint: Registering block manager ip-10-15-190-241.us-west-2.compute.internal:32799 with 5.8 GB RAM, BlockManagerId(2, ip-10-15-190-241.us-west-2.compute.internal, 32799, None)
20/08/14 17:51:59 INFO BlockManagerMasterEndpoint: Registering block manager ip-10-15-190-241.us-west-2.compute.internal:32867 with 5.8 GB RAM, BlockManagerId(1, ip-10-15-190-241.us-west-2.compute.internal, 32867, None)
20/08/14 17:51:59 INFO deprecation: io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum
20/08/14 17:51:59 INFO S3NativeFileSystem: Opening 's3:///checkpoint/offsets/157' for reading
20/08/14 17:51:59 INFO S3NativeFileSystem: Opening 's3:///checkpoint/offsets/156' for reading
20/08/14 17:51:59 INFO S3NativeFileSystem: Opening 's3:///checkpoint/commits/157' for reading
20/08/14 17:51:59 INFO KinesisSource: End Offset is {"shardId-000000000539":{"iteratorType":"SHARD_END","iteratorPosition":""},"metadata":{"streamName":,"batchId":"157"},"shardId-000000000535":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000538":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000534":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000537":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000536":{"iteratorType":"SHARD_END","iteratorPosition":""}}
20/08/14 17:51:59 INFO YarnAllocator: Launching container container_1597427394593_0001_01_000007 on host ip-10-15-190-228.us-west-2.compute.internal for executor with ID 4
20/08/14 17:51:59 INFO YarnAllocator: Received 1 containers from YARN, launching executors on 1 of them.
20/08/14 17:51:59 INFO KinesisSource: Processing 0 shards from Stream()
20/08/14 17:51:59 INFO ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0
20/08/14 17:52:00 INFO KinesisSource: GetBatch generating RDD of offset range:
20/08/14 17:52:00 INFO MicroBatchExecution: Resuming at batch 158 with committed offsets {KinesisSource[]: {"shardId-000000000539":{"iteratorType":"SHARD_END","iteratorPosition":""},"metadata":{"streamName":,"batchId":"157"},"shardId-000000000535":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000538":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000534":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000537":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000536":{"iteratorType":"SHARD_END","iteratorPosition":""}}} and available offsets {KinesisSource[]: {"shardId-000000000539":{"iteratorType":"SHARD_END","iteratorPosition":""},"metadata":{"streamName":,"batchId":"157"},"shardId-000000000535":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000538":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000534":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000537":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000536":{"iteratorType":"SHARD_END","iteratorPosition":""}}}
20/08/14 17:52:00 INFO MicroBatchExecution: Stream started from {KinesisSource[]: {"shardId-000000000539":{"iteratorType":"SHARD_END","iteratorPosition":""},"metadata":{"streamName":,"batchId":"157"},"shardId-000000000535":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000538":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000534":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000537":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000536":{"iteratorType":"SHARD_END","iteratorPosition":""}}}
20/08/14 17:52:00 WARN HDFSMetadataCommitter: Error while fetching MetaData [attempt = 1]
java.lang.IllegalStateException: s3:///checkpoint/sources/0/shard-commit/157 does not exist
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:163)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:151)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.withRetry(HDFSMetadataCommitter.scala:229)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.get(HDFSMetadataCommitter.scala:150)
at org.apache.spark.sql.kinesis.KinesisSource.prevBatchShardInfo(KinesisSource.scala:275)
at org.apache.spark.sql.kinesis.KinesisSource.getOffset(KinesisSource.scala:163)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:344)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:341)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:341)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:557)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
20/08/14 17:52:01 WARN HDFSMetadataCommitter: Error while fetching MetaData [attempt = 2]
java.lang.IllegalStateException: s3:///checkpoint/sources/0/shard-commit/157 does not exist
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:163)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:151)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.withRetry(HDFSMetadataCommitter.scala:229)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.get(HDFSMetadataCommitter.scala:150)
at org.apache.spark.sql.kinesis.KinesisSource.prevBatchShardInfo(KinesisSource.scala:275)
at org.apache.spark.sql.kinesis.KinesisSource.getOffset(KinesisSource.scala:163)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:344)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:341)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:341)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:557)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
20/08/14 17:52:01 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.15.190.228:52110) with ID 3
20/08/14 17:52:01 INFO ExecutorAllocationManager: New executor 3 has registered (new total is 3)
20/08/14 17:52:01 INFO BlockManagerMasterEndpoint: Registering block manager ip-10-15-190-228.us-west-2.compute.internal:40549 with 5.8 GB RAM, BlockManagerId(3, ip-10-15-190-228.us-west-2.compute.internal, 40549, None)
20/08/14 17:52:02 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.15.190.228:52118) with ID 4
20/08/14 17:52:02 INFO ExecutorAllocationManager: New executor 4 has registered (new total is 4)
20/08/14 17:52:03 INFO BlockManagerMasterEndpoint: Registering block manager ip-10-15-190-228.us-west-2.compute.internal:41259 with 5.8 GB RAM, BlockManagerId(4, ip-10-15-190-228.us-west-2.compute.internal, 41259, None)
20/08/14 17:52:03 WARN HDFSMetadataCommitter: Error while fetching MetaData [attempt = 3]
java.lang.IllegalStateException: s3:///checkpoint/sources/0/shard-commit/157 does not exist
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:163)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:151)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.withRetry(HDFSMetadataCommitter.scala:229)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.get(HDFSMetadataCommitter.scala:150)
at org.apache.spark.sql.kinesis.KinesisSource.prevBatchShardInfo(KinesisSource.scala:275)
at org.apache.spark.sql.kinesis.KinesisSource.getOffset(KinesisSource.scala:163)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:344)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:341)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:341)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:557)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
20/08/14 17:52:03 ERROR MicroBatchExecution: Query [id = 0240d731-82b2-47b9-8cde-c7fd5a15ded0, runId = 900fc90e-5747-44d5-a209-89cc7399e210] terminated with error
java.lang.IllegalStateException: Gave up after 3 retries while fetching MetaData, last exception:
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$withRetry$2.apply(HDFSMetadataCommitter.scala:241)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$withRetry$2.apply(HDFSMetadataCommitter.scala:241)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.withRetry(HDFSMetadataCommitter.scala:240)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.get(HDFSMetadataCommitter.scala:150)
at org.apache.spark.sql.kinesis.KinesisSource.prevBatchShardInfo(KinesisSource.scala:275)
at org.apache.spark.sql.kinesis.KinesisSource.getOffset(KinesisSource.scala:163)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:344)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:341)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:341)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:557)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: java.lang.IllegalStateException: s3:///checkpoint/sources/0/shard-commit/157 does not exist
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:163)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:151)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.withRetry(HDFSMetadataCommitter.scala:229)
... 30 more
20/08/14 17:52:03 ERROR IngressJob: Error terminating query
org.apache.spark.sql.streaming.StreamingQueryException: Gave up after 3 retries while fetching MetaData, last exception:
=== Streaming Query ===
Identifier: [id = 0240d731-82b2-47b9-8cde-c7fd5a15ded0, runId = 900fc90e-5747-44d5-a209-89cc7399e210]
Current Committed Offsets: {KinesisSource[]: {"shardId-000000000539":{"iteratorType":"SHARD_END","iteratorPosition":""},"metadata":{"streamName":,"batchId":"157"},"shardId-000000000535":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000538":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000534":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000537":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000536":{"iteratorType":"SHARD_END","iteratorPosition":""}}}
Current Available Offsets: {KinesisSource[]: {"shardId-000000000539":{"iteratorType":"SHARD_END","iteratorPosition":""},"metadata":{"streamName":,"batchId":"157"},"shardId-000000000535":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000538":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000534":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000537":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000536":{"iteratorType":"SHARD_END","iteratorPosition":""}}}

Thanks!

@itsvikramagr
Contributor

itsvikramagr commented Aug 17, 2020

@sudeshnasengupta - the issue comes from a design choice we made in this connector: shard metadata is updated by the executor tasks that read from Kinesis, on task completion. That works fine as long as every micro-batch has some shard to read from.

In your case, I can see that your existing shards are closed and the streaming job has already read all the data in those closed shards:

INFO KinesisSource: End Offset is {"shardId-000000000539":{"iteratorType":"SHARD_END","iteratorPosition":""},"metadata":{"streamName":,"batchId":"157"},"shardId-000000000535":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000538":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000534":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000537":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000536":{"iteratorType":"SHARD_END","iteratorPosition":""}}

There is no shard left to read from, so no tasks are launched for the micro-batch job:
20/08/14 17:51:59 INFO KinesisSource: Processing 0 shards from Stream()
The failure of the streaming job when there are no open shards can be handled by adding a few checks (a rough sketch follows below). But the core of the problem is that, for a micro-batch, we need at least one task of the KinesisRDD to run in order to update the metadata. We either have to force a task launch, or handle the job-completion event so that it takes care of the update (or add some RPC call).

Any fix based on an RPC call or on handling the job-completion event might require a change in Spark core, which cannot be shipped as a separate module/jar. We will add it to the Qubole distribution of Apache Spark and I can share the patch here once it's done, but then you will have to ask your vendor to include it in their Apache Spark distribution.
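To make the "adding a few checks" idea concrete, a minimal sketch of such a guard could look like the following. This is illustrative only, not the connector's actual code; ShardInfo, readShardCommit and lastKnownShardInfo are hypothetical names standing in for the real internals:

// Hypothetical sketch: when the previous micro-batch launched no tasks
// (e.g. all shards were closed), no executor ever wrote a shard-commit file,
// so fall back to the last known shard state instead of failing.
case class ShardInfo(shardId: String, iteratorType: String, iteratorPosition: String)

def prevBatchShardInfo(batchId: Long,
                       readShardCommit: Long => Option[Seq[ShardInfo]],
                       lastKnownShardInfo: Seq[ShardInfo]): Seq[ShardInfo] =
  readShardCommit(batchId) match {
    case Some(info) => info               // normal case: executors committed metadata for this batch
    case None       => lastKnownShardInfo // no shard-commit file: reuse the last known shard info
  }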

@sudeshnasengupta

Hi @itsvikramagr: indeed, the use case where this error happened is: we resharded the Kinesis stream while it was being queried by the Spark streaming job, and we continued to push events to the stream post-resharding, as well as to query the stream, without waiting out the 24-hour retention period after resharding.

@itsvikramagr
Contributor

@sudeshnasengupta - Your use-case can be handled within this connector. Let me start a PR for the same.

@sudeshnasengupta

That'll be great, @itsvikramagr! Just to clarify our desired use case:

  • While one user (user1) is running the Spark streaming job, another user (user2) reshards (up or down) the Kinesis stream and waits until the resharding completes successfully, before firing new events to the stream.
  • User user1 expects Spark streaming job to query the stream successfully for all events posted to the stream prior to the resharding.
  • User user2 expects to fire new events to post to the Kinesis stream without waiting for the stream's retention period to expire after resharding has completed.
  • After new events are posted to stream, user2 expects all events to be successfully consumed/queried by Spark streaming job already kicked off by user1.
Thanks!

@ghost

ghost commented Nov 7, 2020

I am also running into a similar error as others here:

java.lang.IllegalStateException: s3://......./sources/0/shard-commit/1630 does not exist
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:163)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:151)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.withRetry(HDFSMetadataCommitter.scala:229)
at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.get(HDFSMetadataCommitter.scala:150)
at org.apache.spark.sql.kinesis.KinesisSource.prevBatchShardInfo(KinesisSource.scala:275)
at org.apache.spark.sql.kinesis.KinesisSource.getOffset(KinesisSource.scala:163)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:344)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:341)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:341)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:557)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)

This is a Spark Structured Streaming job reading from a Kinesis stream. I did notice the error occurred around the time I made some shard adjustments on my stream.

It seems this path s3://......./sources/0/shard-commit is the checkpoint location. Would doing something like this - #93 - and persisting the last offset (or timestamp) after each successful batch work instead of relying on Spark checkpointing?
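For example, something along these lines is what I have in mind: a StreamingQueryListener that records each source's end offset after every micro-batch. This is only a rough sketch; the saveOffsets helper and wherever it writes to (DynamoDB, a database table, an S3 object, ...) are placeholders, not part of this connector:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Persist each source's end offset after every completed micro-batch.
class OffsetTrackingListener(saveOffsets: (String, String) => Unit)
    extends StreamingQueryListener {

  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.sources.foreach { source =>
      // source.endOffset is the JSON description of where this batch finished reading
      saveOffsets(source.description, source.endOffset)
    }
  }
}

// Register on the SparkSession before starting the query, e.g.:
// spark.streams.addListener(new OffsetTrackingListener((desc, off) => println(s"$desc -> $off")))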

@gabor-arki-epam

@WTa-hash this is unrelated to the original issue. You should not checkpoint on S3 because it is not an HDFS-compliant filesystem.

The checkpointing logic writes temporary files and renames them at the end, but S3 has no rename: the entire object is written again and the old one is deleted. Because this is not an atomic operation, the checkpoint can get stuck in a bad, inconsistent state, and a job trying to use it will fail due to missing files. The larger your checkpoint is, the more likely this is to happen. For reliable checkpointing you can either use HDFS or attach EFS. See:
https://cm.engineering/using-hdfs-to-store-spark-streaming-application-checkpoints-2146c1960d30
https://blog.yuvalitzchakov.com/improving-spark-streaming-checkpoint-performance-with-aws-efs/
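Concretely, moving the checkpoint off S3 only means pointing checkpointLocation at an HDFS (or EFS-mounted) path. A minimal sketch, where the streaming DataFrame, the console sink and the hdfs:// path are purely illustrative:

import org.apache.spark.sql.DataFrame

// Same stream, but checkpointing to HDFS (or an EFS mount) instead of S3.
def startWithHdfsCheckpoint(streamingDf: DataFrame): Unit = {
  streamingDf.writeStream
    .format("console") // sink choice is incidental here
    .outputMode("update")
    .option("checkpointLocation", "hdfs://namenode:8020/checkpoints/round/")
    .start()
    .awaitTermination()
}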

@dgoldenberg-audiomack

I'm seeing the same problem. Is the recommended workaround then to use HDFS or EFS for checkpointing?

Feels like the whole qubole package then is basically not in a working state...
