This repository has been archived by the owner on Oct 19, 2020. It is now read-only.

Sort partitions to comply with spark internal checks #6

Open · wants to merge 1 commit into master

Conversation


@spajus commented Sep 21, 2017

See the following for more details:

https://issues.apache.org/jira/browse/SPARK-13021
apache/spark@32f7411

Thanks to @ljank for help implementing this fix.

/cc @koertkuipers
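For context, SPARK-13021 (apache/spark@32f7411) made `RDD.partitions` fail fast when a custom RDD returns partitions whose `index` field does not match their position in the array. A minimal sketch of that check, paraphrased rather than copied from the Spark source (`Partition` here is a stand-in for `org.apache.spark.Partition`):

```scala
// Paraphrase of the sanity check added by SPARK-13021.
// Partition is a stand-in for org.apache.spark.Partition.
trait Partition extends Serializable { def index: Int }

def checkPartitions(parts: Array[Partition]): Array[Partition] = {
  parts.zipWithIndex.foreach { case (partition, i) =>
    // Spark requires that the i-th partition reports index i.
    require(partition.index == i,
      s"partitions($i).index == ${partition.index}, but it should equal $i")
  }
  parts
}
```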

@@ -192,7 +192,7 @@ class KafkaRDD private (sc: SparkContext, val topic: String, val offsets: Map[In
     "missing offsets for partition(s) " + (leaders.keySet -- offsets.keySet).toList.sorted.mkString(", ")
   )

-  protected def getPartitions: Array[Partition] = leaders.zipWithIndex.map{ case ((partition, leader), index) =>
+  protected def getPartitions: Array[Partition] = leaders.toSeq.sortBy(_._1).zipWithIndex.map{ case ((partition, leader), index) =>
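A minimal sketch of why the sort matters: iteration order of a Scala `Map` is not guaranteed to follow its keys, so `zipWithIndex` over an unsorted map can hand index 0 to an arbitrary Kafka partition, tripping Spark's partitions check (SPARK-13021). The `KafkaPartitionStub` case class and broker names below are illustrative, not the project's actual types:

```scala
// Illustrative only: KafkaPartitionStub stands in for the RDD's partition class.
case class KafkaPartitionStub(index: Int, kafkaPartition: Int)

val leaders: Map[Int, String] =
  Map(2 -> "broker-b", 0 -> "broker-a", 1 -> "broker-c")

// Unsorted: index assignment depends on the Map's iteration order,
// so index 0 may be paired with Kafka partition 2.
val unsorted = leaders.zipWithIndex.map {
  case ((partition, _), index) => KafkaPartitionStub(index, partition)
}

// Sorted by Kafka partition id: indices are assigned deterministically.
val sorted = leaders.toSeq.sortBy(_._1).zipWithIndex.map {
  case ((partition, _), index) => KafkaPartitionStub(index, partition)
}
// sorted contains KafkaPartitionStub(0,0), KafkaPartitionStub(1,1), KafkaPartitionStub(2,2)
```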
A Contributor commented on this change:

Does this mean that with this change `partition` is now always equal to `index`? If so, do we still need both?
