@y-shimizu
Created December 2, 2014 08:54

2014-12-02 Spark Code Reading "mllib"

Package [mllib]

  • Python API [api.python]

  • Clustering [clustering]

    • K-means [KMeans]

    The computation is done by KMeans.train, which returns its result as a KMeansModel.

    val k = 3 // number of clusters
    val maxIterations = 100 // maximum number of K-means iterations
    val clusters = KMeans.train(rddData, k, maxIterations)
    
    Check which cluster each record of the RDD belongs to:
    // rddData is an RDD[Vector] (as train requires), so each element is passed to predict directly
    rddData.foreach(v => println(clusters.predict(v)))
    

    clusterCenters (the cluster center vectors)

    /**
     * A clustering model for K-means. Each point belongs to the cluster with the closest center.
     */
    class KMeansModel (val clusterCenters: Array[Vector]) extends Serializable {
    
      /** Total number of clusters. */
      def k: Int = clusterCenters.length
    

    Implementation of train

    /**
     * Trains a k-means model using specified parameters and the default values for unspecified.
     */
    def train(
        data: RDD[Vector],
        k: Int,
        maxIterations: Int): KMeansModel = {
      train(data, k, maxIterations, 1, K_MEANS_PARALLEL)
    }

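    Internals of run: the L2 norm of every point is computed with Breeze's norm, cached, and paired with its vector (the points themselves are not rescaled).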

      // Compute squared norms and cache them.
      val norms = data.map(v => breezeNorm(v.toBreeze, 2.0))
      norms.persist()
      val breezeData = data.map(_.toBreeze).zip(norms).map { case (v, norm) =>
        new BreezeVectorWithNorm(v, norm)
      }
      val model = runBreeze(breezeData)
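
A likely reason for caching the norms is that the squared Euclidean distance expands to ||a - b||^2 = ||a||^2 + ||b||^2 - 2 (a · b), so with the norms precomputed each distance costs only one dot product. Below is a minimal sketch of that identity in plain Scala. It is not Spark's code: `VectorWithNorm`, `withNorm` and `fastSquaredDistance` are illustrative names, and it skips the precision safeguards a production implementation would need.

```scala
object NormCacheSketch {
  // A point paired with its precomputed L2 norm
  // (illustrative stand-in for BreezeVectorWithNorm, not the real class).
  final case class VectorWithNorm(values: Array[Double], norm: Double)

  // Compute the L2 norm once, when the point is wrapped.
  def withNorm(values: Array[Double]): VectorWithNorm =
    VectorWithNorm(values, math.sqrt(values.map(x => x * x).sum))

  def dot(a: Array[Double], b: Array[Double]): Double = {
    require(a.length == b.length, "dimension mismatch")
    var s = 0.0
    var i = 0
    while (i < a.length) { s += a(i) * b(i); i += 1 }
    s
  }

  // ||a - b||^2 = ||a||^2 + ||b||^2 - 2 * (a . b), reusing the cached norms.
  def fastSquaredDistance(a: VectorWithNorm, b: VectorWithNorm): Double =
    a.norm * a.norm + b.norm * b.norm - 2.0 * dot(a.values, b.values)
}
```

For example, the points (0, 0) and (3, 4) have norms 0 and 5, so the squared distance is 0 + 25 - 0 = 25.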

    predict in KMeansModel

    /** Returns the cluster index that a given point belongs to. */
    def predict(point: Vector): Int = {
      KMeans.findClosest(clusterCentersWithNorm, new BreezeVectorWithNorm(point))._1
    }
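
What predict does can be sketched without Spark: scan the centers, keep the nearest one, and return its index. The version below uses plain arrays and a naive squared distance. Returning an (index, distance) pair mirrors the `._1` in the excerpt above, but the second element and the rest of the body are assumptions, not the actual KMeans.findClosest.

```scala
object ClosestCenterSketch {
  def squaredDistance(a: Array[Double], b: Array[Double]): Double = {
    require(a.length == b.length, "dimension mismatch")
    var s = 0.0
    var i = 0
    while (i < a.length) { val d = a(i) - b(i); s += d * d; i += 1 }
    s
  }

  // Returns (index of the closest center, squared distance to it).
  def findClosest(centers: Array[Array[Double]], point: Array[Double]): (Int, Double) = {
    require(centers.nonEmpty, "need at least one center")
    var bestIndex = 0
    var bestDistance = Double.PositiveInfinity
    var i = 0
    while (i < centers.length) {
      val d = squaredDistance(centers(i), point)
      if (d < bestDistance) { bestDistance = d; bestIndex = i }
      i += 1
    }
    (bestIndex, bestDistance)
  }

  // The predicted cluster is just the index part of findClosest.
  def predict(centers: Array[Array[Double]], point: Array[Double]): Int =
    findClosest(centers, point)._1
}
```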
  • Evaluation [evaluation]

  • feature

  • Linear algebra [linalg]

    • Matrices [Matrices, DenseMatrix, SparseMatrix]
    • Vectors [Vector, DenseVector, SparseVector]

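    From the outside it behaves much like Breeze's DenseMatrix, but the internal implementation is quite different. The familiar factory functions such as ones, eye and zeros live in Matrices.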

    // mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala#L100
    class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double]) extends Matrix {
    
    def zeros(numRows: Int, numCols: Int): Matrix =
      new DenseMatrix(numRows, numCols, new Array[Double](numRows * numCols))
    
    def ones(numRows: Int, numCols: Int): Matrix =
      new DenseMatrix(numRows, numCols, Array.fill(numRows * numCols)(1.0))
    
    def eye(n: Int): Matrix = {

    Use dense to create a new matrix. A matrix is held as numRows: Int, numCols: Int, values: Array[Double], with values in column-major order.

    // mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala#L214
    def dense(numRows: Int, numCols: Int, values: Array[Double]): Matrix = {
      new DenseMatrix(numRows, numCols, values)
    }
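
Because values is laid out column by column, entry (i, j) sits at index i + j * numRows. A small sketch of that indexing in plain Scala follows. `ColumnMajorSketch.at` is a hypothetical helper for illustration, not part of mllib.

```scala
object ColumnMajorSketch {
  // Entry (i, j) of a numRows x numCols matrix whose values are stored column by column.
  def at(numRows: Int, numCols: Int, values: Array[Double], i: Int, j: Int): Double = {
    require(values.length == numRows * numCols, "values has the wrong length")
    require(i >= 0 && i < numRows && j >= 0 && j < numCols, "index out of range")
    values(i + j * numRows)
  }
}
```

So the 2 x 3 matrix with rows (1, 3, 5) and (2, 4, 6) corresponds to values = Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0).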

    The computation is done by calling BLAS functions internally. The numerical routines are gathered in BLAS (DenseMatrix does not implement them itself).

    def multiply(y: DenseVector): DenseVector = {
      val output = new DenseVector(new Array[Double](numRows))
      BLAS.gemv(1.0, this, y, 0.0, output)
      output
    }
    /**
     * C := alpha * A * B + beta * C
     * For `DenseMatrix` A.
     */
    private def gemm(
    
    /**
     * y := alpha * A * x + beta * y
     * @param trans whether to use the transpose of matrix A (true), or A itself (false).
     * @param alpha a scalar to scale the multiplication A * x.
     * @param A the matrix A that will be left multiplied to x. Size of m x n.
     * @param x the vector x that will be left multiplied by A. Size of n x 1.
     * @param beta a scalar that can be used to scale vector y.
     * @param y the resulting vector y. Size of m x 1.
     */
    def gemv(
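
The contract y := alpha * A * x + beta * y can be spelled out with plain loops. The sketch below assumes a column-major A and drops the trans flag. It only illustrates what the operation computes and how multiply is expressed through it. It is not the mllib BLAS wrapper, and its signature is made up for the example.

```scala
object GemvSketch {
  // y := alpha * A * x + beta * y, with A (m x n) stored in column-major order.
  def gemv(alpha: Double, m: Int, n: Int, a: Array[Double],
           x: Array[Double], beta: Double, y: Array[Double]): Unit = {
    require(a.length == m * n && x.length == n && y.length == m, "dimension mismatch")
    // Scale the existing contents of y first.
    var i = 0
    while (i < m) { y(i) *= beta; i += 1 }
    // Then accumulate alpha * x(j) times column j of A.
    var j = 0
    while (j < n) {
      val ax = alpha * x(j)
      i = 0
      while (i < m) { y(i) += a(i + j * m) * ax; i += 1 }
      j += 1
    }
  }

  // Same shape as the multiply excerpt: alpha = 1, beta = 0, fresh output vector.
  def multiply(m: Int, n: Int, a: Array[Double], x: Array[Double]): Array[Double] = {
    val output = new Array[Double](m)
    gemv(1.0, m, n, a, x, 0.0, output)
    output
  }
}
```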

    // SparseMatrix keeps its entries in Compressed Sparse Column (CSC) format.
    // For example, the matrix
    //   1.0 0.0 4.0
    //   0.0 3.0 5.0
    //   2.0 0.0 6.0
    // is stored as values = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0],
    // rowIndices = [0, 2, 1, 0, 1, 2], colPtrs (colPointers) = [0, 2, 3, 6].
    class SparseMatrix(
      val numRows: Int,
      val numCols: Int,
      val colPtrs: Array[Int],
      val rowIndices: Array[Int],
      val values: Array[Double]) extends Matrix {
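
To read an entry back out of the CSC arrays, look only at the slice belonging to column j, that is positions colPtrs(j) until colPtrs(j + 1), and search it for row i. A rough sketch in plain Scala, where `CscSketch.at` is a hypothetical helper rather than a SparseMatrix method:

```scala
object CscSketch {
  // Entry (i, j) of a CSC matrix. Column j occupies positions
  // colPtrs(j) until colPtrs(j + 1) of rowIndices and values.
  def at(colPtrs: Array[Int], rowIndices: Array[Int], values: Array[Double],
         i: Int, j: Int): Double = {
    var k = colPtrs(j)
    val end = colPtrs(j + 1)
    while (k < end) {
      if (rowIndices(k) == i) return values(k)
      k += 1
    }
    0.0 // no stored entry at (i, j), so it is an implicit zero
  }
}
```

With the arrays from the comment above, column 1 covers positions 2 until 3, which holds the single entry 3.0 at row 1.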
    
    
  • Classification [classification]

    • Logistic regression [LogisticRegression]: the computation starts from train.
    class LogisticRegressionModel (
      override val weights: Vector,
      override val intercept: Double)
    extends GeneralizedLinearModel(weights, intercept) with ClassificationModel with Serializable {
    
    private var threshold: Option[Double] = Some(0.5)
    
    
    /**
     * Top-level methods for calling Logistic Regression using Stochastic Gradient Descent.
     * NOTE: Labels used in Logistic Regression should be {0, 1}
     */
    object LogisticRegressionWithSGD {
    
    def train(
        input: RDD[LabeledPoint],
        numIterations: Int,
        stepSize: Double,
        miniBatchFraction: Double,
        initialWeights: Vector): LogisticRegressionModel = {
      new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction)
        .run(input, initialWeights)
    }
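
How the weights, intercept and threshold fields plausibly combine at prediction time: take the margin w · x + intercept, pass it through the logistic function, then compare the score with the threshold. The sketch below is plain Scala written under that assumption. Returning the raw score when no threshold is set is also an assumption, and `LogisticPredictSketch` is not the model's real API.

```scala
object LogisticPredictSketch {
  def predict(weights: Array[Double], intercept: Double, features: Array[Double],
              threshold: Option[Double] = Some(0.5)): Double = {
    require(weights.length == features.length, "dimension mismatch")
    // margin = w . x + intercept
    var margin = intercept
    var i = 0
    while (i < weights.length) { margin += weights(i) * features(i); i += 1 }
    // Logistic (sigmoid) function maps the margin to a score in (0, 1).
    val score = 1.0 / (1.0 + math.exp(-margin))
    threshold match {
      case Some(t) => if (score < t) 0.0 else 1.0 // hard label in {0, 1}
      case None    => score                        // raw score when no threshold is set
    }
  }
}
```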
    
  • Optimization [optimization]

  • Random [random]

  • RDD[rdd]

  • [recommendation]

  • Regression [regression]

  • [stat]

  • [tree]

Package [ml]

  • Classification [classification]
    • Logistic regression [LogisticRegression]
  • Evaluation [evaluation]
  • feature
  • param
  • tuning