Last active
August 29, 2015 14:10
-
-
Save ankurdave/cb89391101e4e87497ae to your computer and use it in GitHub Desktop.
Interface between GraphX and SampleClean
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.graphx._ | |
import org.apache.spark.rdd.RDD | |
import org.apache.spark.sql.catalyst.expressions.Row | |
import scala.reflect.ClassTag | |
/**
 * Take edges and build a graph.
 *
 * The edges carry no payload, so every edge is given the unit value `()` as its attribute.
 *
 * @param vertices vertex ids paired with their `Row` attribute
 * @param edges    (source id, destination id) pairs, one per edge
 * @return a graph built from `vertices` and one attribute-less edge per pair in `edges`
 */
def a(vertices: RDD[(Long, Row)], edges: RDD[(Long, Long)]): Graph[Row, Unit] =
  // Use the unit value `()`; a bare `Unit` names the companion object of the type instead.
  Graph(vertices, edges.map { case (srcId, dstId) => Edge(srcId, dstId, ()) })
/**
 * Run connected components on the graph.
 *
 * Delegates to `CustomConnectedComponents.run`, passing `compare` through unchanged.
 *
 * @param graph   the graph to process
 * @param compare comparison function over `Row` vertex attributes
 * @return the (vertex id, `Row`) pairs returned by `CustomConnectedComponents.run`
 */
def b(graph: Graph[Row, Unit], compare: (Row, Row) => Int): RDD[(Long, Row)] = {
  val components: RDD[(Long, Row)] = CustomConnectedComponents.run[Row, Unit](graph, compare)
  components
}
/**
 * Add edges to the graph.
 *
 * Returns a new graph built from the vertices of `graph` and the union of its existing
 * edges with `newEdges`; `graph` itself is not modified.
 *
 * Precondition: `newEdges` must refer to existing vertices.
 *
 * @param graph    the graph to extend
 * @param newEdges (source id, destination id) pairs for the edges to add
 * @return a graph with the same vertices as `graph` and the combined edge set
 */
def c(graph: Graph[Row, Unit], newEdges: RDD[(Long, Long)]): Graph[Row, Unit] = {
  // Use the unit value `()`; a bare `Unit` names the companion object of the type instead.
  val addedEdges: RDD[Edge[Unit]] =
    newEdges.map { case (srcId, dstId) => Edge(srcId, dstId, ()) }
  // TODO: Incrementally update the routing tables
  Graph(graph.vertices, graph.edges.union(addedEdges))
}
/**
 * Run connected components with a custom component comparison function.
 *
 * Vertex attributes act as component labels: along every edge the smaller endpoint
 * attribute (according to `compare`) is offered to the other endpoint, and a vertex keeps
 * the smallest attribute it has seen.
 */
object CustomConnectedComponents {

  /**
   * @tparam VD vertex attribute type
   * @tparam ED edge attribute type (edge attributes are never read here)
   * @param graph   the graph whose vertex attributes are used as the starting labels
   * @param compare comparison over vertex attributes: a negative result means the first
   *                argument is smaller, a positive result means it is larger, and zero
   *                means neither endpoint needs to be updated. It is called once per
   *                triplet and is assumed to be deterministic.
   * @return the vertices of the graph produced by the Pregel computation
   */
  def run[VD: ClassTag, ED: ClassTag](
      graph: Graph[VD, ED], compare: (VD, VD) => Int): VertexRDD[VD] = {

    // Offer the smaller endpoint attribute to the endpoint holding the larger one.
    def sendMessage(edge: EdgeTriplet[VD, ED]): Iterator[(VertexId, VD)] = {
      // Compare once per triplet; the comparison may be expensive (e.g. on whole rows).
      val order = compare(edge.srcAttr, edge.dstAttr)
      if (order < 0) {
        Iterator((edge.dstId, edge.srcAttr))
      } else if (order > 0) {
        Iterator((edge.srcId, edge.dstAttr))
      } else {
        Iterator.empty
      }
    }

    // On a tie (compare == 0) the second argument is returned.
    def min(a: VD, b: VD): VD = if (compare(a, b) < 0) a else b

    // `null` is the initial-message sentinel: vprog keeps the vertex's own attribute
    // when it sees it, so `compare` is never invoked on the sentinel.
    Pregel[VD, ED, VD](graph, null.asInstanceOf[VD], activeDirection = EdgeDirection.Either)(
      vprog = (id, attr, msg) => if (msg == null) attr else min(attr, msg),
      sendMsg = sendMessage,
      mergeMsg = min).vertices
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment