Using Spark to find similar people with cosine similarity

LookLikePerson.scala

package ght.mi.ml

import com.datastax.spark.connector._
import org.apache.spark.sql.SparkSession

//ght.mi.ml.LookLikePerson
object LookLikePerson {

  def main(args: Array[String]): Unit = {

    val inputIdList = "hash:5172647,hash:48474168,hash:54263647,hash:61311833,hash:63397014"
    val outputPersons = 10

    // Spread the requested output across the input ids: each base person keeps
    // outputPersons / inputIdCount matches plus the remainder.
    // Here that is 10 / 5 + 10 % 5 = 2 matches per base person.
    val inputIdCount = inputIdList.split(",").length
    val remainder = outputPersons % inputIdCount
    val takeCount = (outputPersons / inputIdCount) + remainder


    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("testtest")
      .config("spark.cassandra.connection.host", "127.0.0.1")
      .getOrCreate()
    import spark.implicits._

    //get all persons from Cassandra; the labels column looks like "{3:1.0,17:0.5}",
    //so strip the braces before parsing
    val comparePersons = spark.sparkContext.cassandraTable("miks1","twmperson")
      .select("id","labels").map(row => (row.get[String]("id"), row.get[String]("labels")
      .replaceAll("[{]", "").replaceAll("[}]","")))
      .toDF("id","labels").cache()
    comparePersons.createOrReplaceTempView("comparePersons")

    val whereIds = inputIdList.split(",").map(s => s"'$s'").mkString(",")
    val selectSql = s"select a.id as baseid, a.labels as baselabels from comparePersons a where a.id in ($whereIds)"

    //get query persons
    val inputIds = spark.sql(selectSql)
    inputIds.createOrReplaceTempView("inputIds")

    //cross join all persons with the query persons, then score every pair by cosine similarity
    val util = LabelVectorUtil()
    val allPerson = spark.sql("select b.baseid, b.baselabels, a.id, a.labels from comparePersons a cross join inputIds b").map { allRow =>
      val pid = allRow.getAs[String]("id")
      val bid = allRow.getAs[String]("baseid")
      val alVector = util.labelToVector(allRow.getAs[String]("labels"))
      val blVector = util.labelToVector(allRow.getAs[String]("baselabels"))
      val csvalue = util.cosineSimilarity(alVector.toArray, blVector.toArray)
      (bid, (pid, csvalue))
    }.rdd.groupByKey.map { case (_, matches) =>
      //sort each base person's matches by similarity, descending, and keep the top takeCount
      matches.toList.sortWith(_._2 > _._2).take(takeCount)
    }.take(outputPersons).flatMap(l => l)

    allPerson.foreach(println(_))
  }
}
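
To run it outside the IDE, a spark-submit along these lines should work. The code hard-codes master to local[*], so no --master flag is needed; the connector version and jar path below are placeholders for whatever the build actually produces:

spark-submit \
  --class ght.mi.ml.LookLikePerson \
  --packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.0 \
  target/scala-2.11/look-like-person.jar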

LabelVectorUtil.scala

package ght.mi.ml

import org.apache.spark.mllib.linalg.{DenseVector, Matrices, Matrix, Vector, Vectors}

class LabelVectorUtil extends Serializable {

  //labels are comma-separated "index:value" pairs (e.g. "3:1.0,17:0.5"),
  //mapped onto a 254-dimension sparse vector
  def labelToVector(labelStr:String): Vector = {
    val linfo = labelStr.split(",").sortWith(_.split(":")(0).toInt < _.split(":")(0).toInt)
    val lindexs = linfo.map(_.split(":")(0).toInt)
    val lvalues = linfo.map(_.split(":")(1).toDouble)
    Vectors.sparse(254, lindexs, lvalues)
  }

  //sum of all label values (the original sparse-to-dense round trip only added zeros)
  def labelToVectorValue(labelStr:String): Double = {
    val linfo = labelStr.split(",").sortWith(_.split(":")(0).toInt < _.split(":")(0).toInt)
    linfo.map(_.split(":")(1).toDouble).sum
  }

  //same behavior as convertToMatrix below
  def parseLabelStr(labelStr:String):Matrix = convertToMatrix(labelStr)

  //represent one label string as a single-column 254-row sparse matrix (CSC layout)
  def convertToMatrix(labelStr:String):Matrix = {
    val linfo = labelStr.split(",").sortWith(_.split(":")(0).toInt < _.split(":")(0).toInt)
    val lindexs = linfo.map(_.split(":")(0).toInt)
    val lvalues = linfo.map(_.split(":")(1).toDouble)
    Matrices.sparse(254, 1, Array(0, lvalues.length), lindexs, lvalues)
  }

  //pack two label vectors side by side as a 254 x 2 sparse matrix
  //(the Array is the CSC column-pointer list)
  def convertToMatrix(baryLength:Int,paryLength:Int,lindexs:Array[Int],lvalues:Array[Double]):Matrix = {
    Matrices.sparse(254, 2, Array(0, baryLength, baryLength + paryLength), lindexs, lvalues)
  }

  //despite the name this returns a Seq of row vectors; parallelize it to get an RDD
  def matrixToRDD(m: Matrix): Seq[Vector] = {
    val columns = m.toArray.grouped(m.numRows)
    val rows = columns.toSeq.transpose // skip the transpose for a column-major result
    rows.map(row => new DenseVector(row.toArray))
  }

  def cosineSimilarity(x: Array[Double], y: Array[Double]): Double = {
    require(x.length == y.length, "vectors must have the same dimension")
    genDot(x, y) / (magnitude(x) * magnitude(y))
  }

  def genDot(x: Array[Double], y: Array[Double]): Double = {
    (for((a, b) <- x.zip(y)) yield a * b).sum
  }

  def magnitude(x: Array[Double]): Double = {
    math.sqrt(x.map(i => i*i).sum)
  }
}

object LabelVectorUtil {
  def apply(): LabelVectorUtil = new LabelVectorUtil()
}
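
A quick sanity check of the utility. The label strings below are made up, but follow the "index:value" format the parser expects:

object LabelVectorUtilDemo {
  def main(args: Array[String]): Unit = {
    val util = LabelVectorUtil()
    //two hypothetical persons sharing label 3 but differing on labels 17 and 42
    val a = util.labelToVector("3:1.0,17:0.5")
    val b = util.labelToVector("3:1.0,42:2.0")
    println(util.cosineSimilarity(a.toArray, b.toArray)) //0.4
    println(util.cosineSimilarity(a.toArray, a.toArray)) //1.0 (up to floating-point rounding)
  }
}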

Execution result (the five input ids match themselves with similarity 1.0; values such as 0.9999999999999998 are floating-point rounding):

(hash:63397014,0.9999999999999998)
(hash:17911189,0.9460301837833882)
(hash:5172647,1.0000000000000002)
(hash:49371337,1.0000000000000002)
(hash:61311833,0.9999999999999998)
(hash:62352846,0.8238994537074561)
(hash:48474168,1.0)
(hash:57796913,0.9611306812888629)
(hash:54263647,1.0)
(hash:17385608,0.9676812174493781)

Spark MinHashLSH :
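
The test below calls LabelVectorUtil().labelToSparsemlVector, which the listing above does not define. MinHashLSH expects org.apache.spark.ml.linalg vectors rather than the mllib ones used so far, so presumably the method mirrors labelToVector with the ml types. A minimal sketch under that assumption:

import org.apache.spark.ml.linalg.{Vector => MLVector, Vectors => MLVectors}

//same "index:value" parsing as labelToVector, but producing the ml sparse vector
//that MinHashLSH's input column requires (non-zero entries are treated as set members)
def labelToSparsemlVector(labelStr:String): MLVector = {
  val linfo = labelStr.split(",").sortWith(_.split(":")(0).toInt < _.split(":")(0).toInt)
  val lindexs = linfo.map(_.split(":")(0).toInt)
  val lvalues = linfo.map(_.split(":")(1).toDouble)
  MLVectors.sparse(254, lindexs, lvalues)
}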

 test("labelMinHashLSHTest") {
    val spark = SparkSession.builder().master("local[*]").appName("testtest").config("spark.cassandra.connection.host", "127.0.0.1").getOrCreate()
    import spark.implicits._

    val comparePersons = spark.sparkContext.cassandraTable("miks1","twmperson")
      .select("id","labels").map(row => (row.get[String]("id") , row.get[String]("labels")
      .replaceAll("[{]" , "").replaceAll("[}]","")))
      .toDF("id","labels").cache()
    comparePersons.createOrReplaceTempView("comparePersons")

    val selectSql = "select a.id as baseid, a.labels as baselabels from comparePersons a " +
      "where a.id in ('hash:5172647', 'hash:48474168', 'hash:54263647', 'hash:61311833', 'hash:63397014')"

    val inputIds = spark.sql(selectSql)
    inputIds.createOrReplaceTempView("inputIds")

    val dfA = comparePersons.map(row => {
      val personId = row.getAs[String]("id")
      val labels = row.getAs[String]("labels")
      (personId , LabelVectorUtil().labelToSparsemlVector(labels))
    }).toDF("id" , "dflabel")

    val dfB = inputIds.map(row => {
      val personId = row.getAs[String]("baseid")
      val labels = row.getAs[String]("baselabels")
      (personId , LabelVectorUtil().labelToSparsemlVector(labels))
    }).toDF("id" , "dflabel")

    //150 hash tables is aggressive: more tables lower the false-negative rate
    //of the LSH approximation, at the cost of compute and memory
    val mh = new MinHashLSH()
      .setNumHashTables(150)
      .setInputCol("dflabel")
      .setOutputCol("hashes")

    val model = mh.fit(dfA)
    model.transform(dfA).show()

    val allPerson = model.approxSimilarityJoin(dfA, dfB, 0.9, "JaccardDistance")
      .select(col("datasetA.id").alias("ida"),
        col("datasetB.id").alias("idb"),
        col("JaccardDistance")).map(row => {
      val ida = row.getAs[String]("ida")
      val idb = row.getAs[String]("idb")
      val jds = row.getAs[Double]("JaccardDistance")
      (idb, (ida, jds))
    }).rdd.groupByKey.map(personInfos => {
      //JaccardDistance is a distance, not a similarity: smaller means closer,
      //so sort ascending to keep the three nearest matches per query person
      personInfos._2.toList.sortWith(_._2 < _._2).take(3)
    })

    allPerson.flatMap(l => l).foreach(println(_))
  }
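
MinHashLSHModel also has approxNearestNeighbors, which returns the k nearest rows for a single query vector directly, so the groupByKey-and-sort step is only needed when ranking many query persons in one pass. A sketch, with a made-up query label string:

    //hypothetical query: the label string here is illustrative, not real data
    val queryVector = LabelVectorUtil().labelToSparsemlVector("3:1.0,17:0.5,42:2.0")
    model.approxNearestNeighbors(dfA, queryVector, 3).show()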