Add support to partitionBy and Graphx PartitionStrategy #221

Open: wants to merge 7 commits into base: master

@saikocat saikocat commented Jul 16, 2017

Addresses enhancement request #86. Only the Scala API is supported for now.

Adds a partitionBy method to the GraphFrame class (edge partitioning). It takes an optional numPartitions and any case object that extends the GraphX PartitionStrategy trait.

A new GraphFrame is constructed from the original vertices and the newly partitioned edges. Under the hood, a UDF calls PartitionStrategy.<Strategy>.getPartition to compute a partition ID for each edge and stores it in a new partitionId column. A custom low-level RDD partitioner then shuffles the data according to that partitionId. Attribute columns are unnested and then put back into the new DataFrame.
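The mechanism described above can be sketched roughly as follows. This is an illustration, not the code from this PR: the object name `EdgePartitioningSketch`, the helper `repartitionEdges`, and the class `KeyAsPartitionId` are hypothetical, the edge DataFrame is assumed to have `Long`-typed `src` and `dst` columns, and the unnesting of attribute columns is left out.

```scala
import org.apache.spark.Partitioner
import org.apache.spark.graphx.PartitionStrategy
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, udf}

// Hypothetical helper for illustration; names are not taken from the PR.
object EdgePartitioningSketch {

  // Partitioner that treats the integer key as the target partition id.
  class KeyAsPartitionId(override val numPartitions: Int) extends Partitioner {
    override def getPartition(key: Any): Int = key.asInstanceOf[Int]
  }

  // Assumes `edges` has Long-typed "src" and "dst" columns.
  def repartitionEdges(
      edges: DataFrame,
      numPartitions: Int,
      strategy: PartitionStrategy): DataFrame = {
    // 1. UDF that asks the GraphX strategy for the partition of each edge.
    val partitionIdOf = udf { (src: Long, dst: Long) =>
      strategy.getPartition(src, dst, numPartitions)
    }
    val withId = edges.withColumn("partition_id", partitionIdOf(col("src"), col("dst")))

    // 2. Key each row by its partition id and shuffle with the custom partitioner.
    val idIndex = withId.schema.fieldIndex("partition_id")
    val shuffled = withId.rdd
      .map(row => (row.getInt(idIndex), row))
      .partitionBy(new KeyAsPartitionId(numPartitions))
      .values

    // 3. Rebuild a DataFrame with the same schema and drop the helper column.
    edges.sparkSession.createDataFrame(shuffled, withId.schema).drop("partition_id")
  }
}
```

Calling `EdgePartitioningSketch.repartitionEdges(edges, 4, PartitionStrategy.EdgePartition2D)` would return the same edges laid out across four partitions chosen by the strategy.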

@felixcheung
Member

Could you check the test failure? It might be related to Python changes in Travis.

@saikocat
Author

The error when the Python test ran was "Exception: Java gateway process exited before sending the driver its port number". Hmph, any guidance on how to proceed, or on how to trigger a rebuild in CI without pushing a new commit, would be much appreciated! Otherwise I will push a dummy commit to trigger it.

codecov-io commented Sep 18, 2017

Codecov Report

Merging #221 into master will increase coverage by 0.39%.
The diff coverage is 100%.

@@            Coverage Diff             @@
##           master     #221      +/-   ##
==========================================
+ Coverage   89.18%   89.58%   +0.39%     
==========================================
  Files          20       20              
  Lines         740      768      +28     
  Branches       57       41      -16     
==========================================
+ Hits          660      688      +28     
  Misses         80       80
| Impacted Files | Coverage Δ |
|---|---|
| src/main/scala/org/graphframes/GraphFrame.scala | 88.2% <100%> (+1.64%) ⬆️ |

Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update ab857ed...361ce96.

@saikocat
Author

Alright, it seems to work now. Please follow up with reviews. Thanks!

* Default to the length of edges partitions
*/
def partitionBy(strategy: PartitionStrategy): GraphFrame = {
partitionBy(edges.rdd.partitions.length, strategy)
Member

edges.rdd.getNumPartitions?

}

/**
* Implements a watered down version of Graphx partitionBy.
Member

could you add to the description on why this is "watered down"?

@@ -265,6 +266,85 @@ class GraphFrame private(
edges.select(explode(array(SRC, DST)).as(ID)).groupBy(ID).agg(count("*").cast("int").as("degree"))
}

// ========================= Partition By ====================================
val PARTITION_ID: String = "partition_id"
Member

might want to make this private

* A [[org.apache.spark.Partitioner]] that use the key of PairRDD as partition
* id number.
*/
class ExactAsKeyPartitioner(partitions: Int) extends Partitioner {
Member

might need to make this private
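For reference, a partitioner of this kind can be quite small. The sketch below reuses the class name from the diff, but the body is an assumption: the range check and the `equals`/`hashCode` overrides are additions for illustration and may not match what the PR contains. Inside the `org.graphframes` package, the visibility requested here could be expressed with a package-qualified modifier such as `private[graphframes]`; it is omitted so the snippet compiles on its own.

```scala
import org.apache.spark.Partitioner

// Sketch only: the body is an assumption, not the PR's implementation.
class ExactAsKeyPartitioner(partitions: Int) extends Partitioner {
  require(partitions > 0, s"Number of partitions ($partitions) must be positive.")

  override def numPartitions: Int = partitions

  // The key is expected to already be a partition id in [0, partitions).
  override def getPartition(key: Any): Int = {
    val id = key.asInstanceOf[Int]
    require(id >= 0 && id < partitions, s"Partition id $id is out of range.")
    id
  }

  // Spark compares partitioners to decide whether two RDDs are co-partitioned,
  // so two instances with the same partition count are treated as equal.
  override def equals(other: Any): Boolean = other match {
    case p: ExactAsKeyPartitioner => p.numPartitions == numPartitions
    case _ => false
  }

  override def hashCode: Int = numPartitions
}
```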

* Implements a watered down version of Graphx partitionBy.
*
* @param numPartitions Number of partitions to be split by
* @param strategy any case object of Graphx's PartitionStrategy trait
Member

perhaps update the text to match http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph@partitionBy(partitionStrategy:org.apache.spark.graphx.PartitionStrategy,numPartitions:Int):org.apache.spark.graphx.Graph[VD,ED]

Repartitions the edges in the graph according to partitionStrategy.
partitionStrategy: the partitioning strategy to use when partitioning the edges in the graph.
numPartitions: the number of edge partitions in the new graph.


/**
* Another version of partitionBy without specifying the numPartitions params.
* Default to the length of edges partitions

nonemptyParts(mkGraph(canonicalEdges).partitionBy(CanonicalRandomVertexCut)).count === 1)

// partitionBy(EdgePartition2D) puts identical edges in the same partition
assert(nonemptyParts(mkGraph(identicalEdges).partitionBy(EdgePartition2D)).count === 1)
Member

is it possible to add a test where count != 1? or count > 1?
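One way to approach such a check, sketched here without the suite's `mkGraph` and `nonemptyParts` helpers, is to call the GraphX strategy directly and count the distinct partition ids it assigns. The object name `PartitionSpreadSketch` and the sample chain of edges are hypothetical; a real test in this PR would presumably go through `GraphFrame.partitionBy` instead.

```scala
import org.apache.spark.graphx.PartitionStrategy.EdgePartition2D

// Hypothetical illustration; not part of the PR's test suite.
object PartitionSpreadSketch {

  // Sample data: a chain 0 -> 1 -> 2 -> ... -> 100 of distinct edges.
  val chainEdges: Seq[(Long, Long)] = (0L until 100L).map(i => (i, i + 1))

  // Distinct partition ids that EdgePartition2D assigns to the given edges.
  def partitionIds(edges: Seq[(Long, Long)], numPartitions: Int): Set[Int] =
    edges.map { case (src, dst) =>
      EdgePartition2D.getPartition(src, dst, numPartitions)
    }.toSet
}
```

With four partitions, an assertion such as `PartitionSpreadSketch.partitionIds(PartitionSpreadSketch.chainEdges, 4).size > 1` covers the "count > 1" case, while repeating one edge many times should still yield a single id.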

val edgesWithPartitionIdColumns = Seq(
  Seq(col(SRC), col(DST)),
  unnestedAttrCols.map(c => col(ATTR + "." + c)),
  Seq(col(PARTITION_ID))).flatten
Member

is it possible to create a Seq without having to flatten one?

Author

I will use scala.collection.mutable.ListBuffer[Column] instead, then toSeq. My bad on this one.
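Besides a `ListBuffer`, plain concatenation with `++` is another way to avoid the nested `Seq` and the `flatten` call while staying immutable. The sketch below is an alternative for comparison, not what the author committed; the object name `EdgeColumnsSketch` is hypothetical and the constant values are assumed stand-ins for the ones used in the diff.

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col

// Hypothetical illustration; constant values are assumptions.
object EdgeColumnsSketch {
  val SRC = "src"
  val DST = "dst"
  val ATTR = "attr"
  val PARTITION_ID = "partition_id"

  // Builds src, dst, one column per unnested attribute, then the partition id.
  def edgeColumns(unnestedAttrCols: Seq[String]): Seq[Column] =
    Seq(col(SRC), col(DST)) ++
      unnestedAttrCols.map(c => col(ATTR + "." + c)) ++
      Seq(col(PARTITION_ID))
}
```

For example, `EdgeColumnsSketch.edgeColumns(Seq("weight", "label"))` yields five columns in the same order as the original expression.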

@saikocat
Author

Addressed the code review comments. Cheers.

@mafernandez-stratio

Hi, any progress in this new feature?

@sumersao

+1 on progress. This would be really helpful for my use case. I believe poor partitioning is what's making shuffle reads and writes skyrocket on my jobs.

@SemyonSinchenko
Collaborator

I'm seeing terrible performance from LabelPropagation, and I have a feeling that partitioning could help here. @rjurney I think we should try to resolve the conflicts and think about merging this one. What do you think?
