Add support to partitionBy and GraphX PartitionStrategy #221
base: master
Conversation
Could you check the test failure? It might be related to Python changes in Travis.
Codecov Report

@@            Coverage Diff             @@
##           master     #221      +/-   ##
==========================================
+ Coverage   89.18%   89.58%   +0.39%
==========================================
  Files          20       20
  Lines         740      768      +28
  Branches       57       41      -16
==========================================
+ Hits          660      688      +28
  Misses         80       80
Alright, it seems to work now. Please help follow up with reviews. Thanks!
   * Default to the length of edges partitions
   */
  def partitionBy(strategy: PartitionStrategy): GraphFrame = {
    partitionBy(edges.rdd.partitions.length, strategy)
edges.rdd.getNumPartitions?
  }
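A minimal sketch of the overload with the reviewer's suggestion applied (the body is otherwise as in the diff above; this is illustrative, not the merged code):

    /** Overload that defaults to the current number of edge partitions. */
    def partitionBy(strategy: PartitionStrategy): GraphFrame = {
      // getNumPartitions is the standard RDD accessor for the partition count
      partitionBy(edges.rdd.getNumPartitions, strategy)
    }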
  /**
   * Implements a watered down version of Graphx partitionBy.
Could you add to the description why this is "watered down"?
@@ -265,6 +266,85 @@ class GraphFrame private(
    edges.select(explode(array(SRC, DST)).as(ID)).groupBy(ID).agg(count("*").cast("int").as("degree"))
  }
  // ========================= Partition By ====================================
  val PARTITION_ID: String = "partition_id"
might want to make this private
 * A [[org.apache.spark.Partitioner]] that uses the key of a PairRDD as the partition
 * id number.
 */
class ExactAsKeyPartitioner(partitions: Int) extends Partitioner {
might need to make this private
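For context, a minimal sketch of how such a partitioner could look, assuming the key of the pair RDD is already the target partition id (the class body is not shown in this diff, so this is an assumption, not the PR's exact code):

    import org.apache.spark.Partitioner

    private class ExactAsKeyPartitioner(partitions: Int) extends Partitioner {
      require(partitions > 0, "Number of partitions must be positive")

      override def numPartitions: Int = partitions

      // The key is assumed to already be the partition id, so it is returned
      // directly instead of being hashed.
      override def getPartition(key: Any): Int = key.asInstanceOf[Int]
    }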
   * Implements a watered down version of Graphx partitionBy.
   *
   * @param numPartitions Number of partitions to be split by
   * @param strategy any case object of Graphx's PartitionStrategy trait
Perhaps update the text to match http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph@partitionBy(partitionStrategy:org.apache.spark.graphx.PartitionStrategy,numPartitions:Int):org.apache.spark.graphx.Graph[VD,ED]:

"Repartitions the edges in the graph according to partitionStrategy.
partitionStrategy - the partitioning strategy to use when partitioning the edges in the graph.
numPartitions - the number of edge partitions in the new graph."
  /**
   * Another version of partitionBy without specifying the numPartitions params.
   * Default to the length of edges partitions
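As a usage sketch of the proposed API (the graph name and partition count below are hypothetical, and the PR was not merged, so this shows intent rather than released behavior):

    import org.apache.spark.graphx.PartitionStrategy.EdgePartition2D

    // g is an existing GraphFrame; both calls return a new GraphFrame whose
    // edges have been shuffled according to the chosen GraphX strategy.
    val repartitioned        = g.partitionBy(EdgePartition2D)     // defaults to the current edge partition count
    val repartitionedToEight = g.partitionBy(8, EdgePartition2D)  // explicit number of partitions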
    assert(nonemptyParts(mkGraph(canonicalEdges).partitionBy(CanonicalRandomVertexCut)).count === 1)

    // partitionBy(EdgePartition2D) puts identical edges in the same partition
    assert(nonemptyParts(mkGraph(identicalEdges).partitionBy(EdgePartition2D)).count === 1)
is it possible to add a test where count != 1? or count > 1?
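One possible shape for such a test, reusing the nonemptyParts and mkGraph helpers from the snippet above (their exact signatures are assumed here, and whether more than one partition is hit depends on how the sample edges hash under the strategy):

    // Edges with many distinct endpoints should be spread across partitions,
    // so more than one partition is expected to be non-empty.
    val spreadEdges = (1L to 100L).map(i => (i, i + 100L)).toList
    assert(nonemptyParts(mkGraph(spreadEdges).partitionBy(EdgePartition2D)).count > 1)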
    val edgesWithPartitionIdColumns = Seq(
      Seq(col(SRC), col(DST)),
      unnestedAttrCols.map(c => col(ATTR + "." + c)),
      Seq(col(PARTITION_ID))).flatten
Is it possible to create a Seq without having to flatten one?
I will use scala.collection.mutable.ListBuffer[Column] instead, then toSeq. My bad on this one.
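For illustration, two ways the column list could be built without flatten (a sketch only; SRC, DST, ATTR, PARTITION_ID, and unnestedAttrCols are the names from the snippet above):

    import org.apache.spark.sql.Column
    import org.apache.spark.sql.functions.col
    import scala.collection.mutable.ListBuffer

    // Option 1: concatenate and append instead of flattening nested Seqs.
    val edgesWithPartitionIdColumns: Seq[Column] =
      (Seq(col(SRC), col(DST)) ++ unnestedAttrCols.map(c => col(ATTR + "." + c))) :+ col(PARTITION_ID)

    // Option 2: the ListBuffer approach mentioned by the author.
    val buffer = ListBuffer[Column](col(SRC), col(DST))
    buffer ++= unnestedAttrCols.map(c => col(ATTR + "." + c))
    buffer += col(PARTITION_ID)
    val edgesWithPartitionIdColumnsViaBuffer = buffer.toSeq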
Addressed the code review comments. Cheers.
Hi, any progress on this new feature?
+1 on progress. This would be really helpful for my use case -- I believe poor partitioning is what's skyrocketing read and write shuffles on my jobs.
I'm facing terrible performance of
Addressing enhancement request #86. Only the Scala API is supported for now.

Adds a partitionBy method to the GraphFrame class (edge partitioning), which takes an optional numPartitions and any case object extending GraphX's PartitionStrategy trait. A new GraphFrame is constructed from the original vertices and the newly partitioned edges. Under the hood, a UDF that calls the PartitionStrategy.<Strategy>.getPartition method adds a partition-id column, and a custom low-level RDD partitioner shuffles the data according to that partition id. Attribute columns are unnested and stuffed back into the new DataFrame.
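A rough sketch of the mechanism described above, assuming edge columns named src and dst; the helper and variable names are illustrative and not the exact PR code:

    import org.apache.spark.graphx.PartitionStrategy
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{col, udf}

    def partitionEdges(edges: DataFrame, numPartitions: Int, strategy: PartitionStrategy): DataFrame = {
      // 1. Compute a partition id for every edge with the chosen GraphX strategy.
      val getPartition = udf { (src: Long, dst: Long) =>
        strategy.getPartition(src, dst, numPartitions)
      }
      val withPid = edges.withColumn("partition_id", getPartition(col("src"), col("dst")))

      // 2. Shuffle rows so each one lands in exactly the partition it was assigned,
      //    using the PR's ExactAsKeyPartitioner (key == partition id).
      val keyed = withPid.rdd.map(row => (row.getAs[Int]("partition_id"), row))
      val partitioned = keyed.partitionBy(new ExactAsKeyPartitioner(numPartitions)).values

      // 3. Rebuild a DataFrame with the original schema plus the partition id column.
      edges.sparkSession.createDataFrame(partitioned, withPid.schema)
    }

The new GraphFrame would then be constructed from the original vertices DataFrame and this repartitioned edges DataFrame.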