scala day 40 (spark cosine similarity)
Use Spark to find people related to a set of input persons by cosine similarity.
LookLikePerson.scala
package ght.mi.ml
import com.datastax.spark.connector._
import org.apache.spark.sql.SparkSession
//ght.mi.ml.LookLikePerson
object LookLikePerson {
  def main(args: Array[String]): Unit = {
    // input person ids to look up, and the total number of similar persons to return
    val inputIdList = "hash:5172647,hash:48474168,hash:54263647,hash:61311833,hash:63397014"
    val outputPersons = 10
    // spread the output quota across the input ids:
    // outputPersons / inputIdCount per id, plus the remainder (here 10 / 5 = 2, remainder 0)
    val inputIdCount = inputIdList.split(",").size
    val othercount = outputPersons % inputIdCount
    val takeCount = (outputPersons / inputIdCount) + othercount
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("testtest")
      .config("spark.cassandra.connection.host", "127.0.0.1")
      .getOrCreate()
    import spark.implicits._
    // get all persons from Cassandra; labels is stored as "{index:value,...}", so strip the braces
    val comparePersons = spark.sparkContext.cassandraTable("miks1", "twmperson")
      .select("id", "labels")
      .map(row => (row.get[String]("id"),
        row.get[String]("labels").replaceAll("[{]", "").replaceAll("[}]", "")))
      .toDF("id", "labels").cache()
    comparePersons.createOrReplaceTempView("comparePersons")
    val whereIds = inputIdList.split(",").map(s => "'" + s + "'").mkString(",")
    //val basePerson = spark.sql("select b.id,b.labels from inputIds a left join comparePersons b on a.baseid = b.id")
    val selectSql = new StringBuffer()
    selectSql.append("select a.id as baseid, a.labels as baselabels from comparePersons a ")
    selectSql.append(" where a.id in (" + whereIds + ") ")
    // get the query persons (the base ids we want look-alikes for)
    val inputIds = spark.sql(selectSql.toString)
    inputIds.createOrReplaceTempView("inputIds")
    // cross join all persons with the query persons and score every pair with cosine similarity
    val allPerson = spark.sql("select b.baseid, b.baselabels, a.id, a.labels from comparePersons a cross join inputIds b ").toDF().map {
      case (allRow) => {
        val pid = allRow.getAs[String]("id")
        val bid = allRow.getAs[String]("baseid")
        val alabels = allRow.getAs[String]("labels")
        val blabels = allRow.getAs[String]("baselabels")
        val alVector = LabelVectorUtil().labelToVector(alabels)
        val blVector = LabelVectorUtil().labelToVector(blabels)
        val csvalue = LabelVectorUtil().cosineSimilarity(alVector.toArray, blVector.toArray)
        (bid, (pid, csvalue))
      }
    }.rdd.groupByKey.map(personInfos => {
      // for each base id, keep the takeCount most similar persons (descending similarity, e.g. 11,8,7,4,3,2,1)
      val dataAry = personInfos._2
      dataAry.toList.sortWith(_._2 > _._2).take(takeCount)
    }).take(outputPersons).flatMap(l => l) // collect up to outputPersons groups to the driver and flatten into (id, score) pairs
    //allPerson.map(_._1).foreach(println(_))
    allPerson.foreach(println(_))
  }
}
LabelVectorUtil.scala
package ght.mi.ml
import org.apache.spark.mllib.linalg.{DenseVector, Matrices, Matrix, Vector, Vectors}
class LabelVectorUtil extends Serializable {
  // a label string is a comma-separated list of index:value pairs; build a 254-dimensional sparse vector from it
  def labelToVector(labelStr: String): Vector = {
    val linfo = labelStr.split(",").sortWith(_.split(":")(0).toInt < _.split(":")(0).toInt)
    val lindexs = linfo.map(_.split(":")(0).toInt).toArray
    val lvalues = linfo.map(_.split(":")(1).toDouble).toArray
    Vectors.sparse(254, lindexs, lvalues)
  }
  def labelToVectorValue(labelStr: String): Double = {
    //println("labelStr ---> " + labelStr)
    val linfo = labelStr.split(",").sortWith(_.split(":")(0).toInt < _.split(":")(0).toInt)
    val lindexs = linfo.map(_.split(":")(0).toInt).toArray
    val lvalues = linfo.map(_.split(":")(1).toDouble).toArray
    Vectors.sparse(254, lindexs, lvalues).toArray.sum
  }
  def parseLabelStr(labelStr: String): Matrix = {
    val linfo = labelStr.split(",").sortWith(_.split(":")(0).toInt < _.split(":")(0).toInt)
    val lindexs = linfo.map(_.split(":")(0).toInt).toArray
    val lvalues = linfo.map(_.split(":")(1).toDouble).toArray
    Matrices.sparse(254, 1, Array(0, lvalues.length), lindexs, lvalues)
  }
  def convertToMatrix(labelStr: String): Matrix = {
    val linfo = labelStr.split(",").sortWith(_.split(":")(0).toInt < _.split(":")(0).toInt)
    val lindexs = linfo.map(_.split(":")(0).toInt).toArray
    val lvalues = linfo.map(_.split(":")(1).toDouble).toArray
    Matrices.sparse(254, 1, Array(0, lvalues.length), lindexs, lvalues)
  }
  def convertToMatrix(baryLength: Int, paryLength: Int, lindexs: Array[Int], lvalues: Array[Double]): Matrix = {
    Matrices.sparse(254, 2, Array(0, baryLength, (baryLength + paryLength)), lindexs, lvalues)
  }
  def matrixToRDD(m: Matrix): Seq[Vector] = {
    val columns = m.toArray.grouped(m.numRows)
    val rows = columns.toSeq.transpose // skip this if you want a column-major layout
    val vectors = rows.map(row => new DenseVector(row.toArray))
    vectors
  }
  // cosine similarity = dot(x, y) / (|x| * |y|)
  def cosineSimilarity(x: Array[Double], y: Array[Double]): Double = {
    require(x.size == y.size)
    genDot(x, y) / (magnitude(x) * magnitude(y))
  }
  def genDot(x: Array[Double], y: Array[Double]): Double = {
    (for ((a, b) <- x.zip(y)) yield a * b).sum
  }
  def magnitude(x: Array[Double]): Double = {
    math.sqrt(x.map(i => i * i).sum)
  }
}
object LabelVectorUtil {
  def apply(): LabelVectorUtil = new LabelVectorUtil()
}
Execution result:
(hash:63397014,0.9999999999999998)
(hash:17911189,0.9460301837833882)
(hash:5172647,1.0000000000000002)
(hash:49371337,1.0000000000000002)
(hash:61311833,0.9999999999999998)
(hash:62352846,0.8238994537074561)
(hash:48474168,1.0)
(hash:57796913,0.9611306812888629)
(hash:54263647,1.0)
(hash:17385608,0.9676812174493781)
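Scores at essentially 1.0 come from pairs whose label vectors are identical or proportional (including each input id matched against itself); deviations such as 0.9999999999999998 or 1.0000000000000002 are floating-point rounding in the dot product and magnitudes. A quick check, with hypothetical values:
package ght.mi.ml
// hypothetical check object, not part of the original project
object CosineRoundingCheck {
  def main(args: Array[String]): Unit = {
    val v = Array(0.7, 0.1, 2.3)
    // mathematically 1.0 for identical vectors; floating-point rounding may print
    // something like 0.9999999999999998 instead
    println(LabelVectorUtil().cosineSimilarity(v, v))
  }
}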
Spark MinHashLSH :
test("labelMinHashLSHTest") {
val spark = SparkSession.builder().master("local[*]").appName("testtest").config("spark.cassandra.connection.host", "127.0.0.1").getOrCreate()
import spark.implicits._
val comparePersons = spark.sparkContext.cassandraTable("miks1","twmperson")
.select("id","labels").map(row => (row.get[String]("id") , row.get[String]("labels")
.replaceAll("[{]" , "").replaceAll("[}]","")))
.toDF("id","labels").cache()
comparePersons.createOrReplaceTempView("comparePersons")
//val basePerson = spark.sql("select b.id,b.labels from inputIds a left join comparePersons b on a.baseid = b.id")
val selectSql = new StringBuffer()
selectSql.append("select a.id as baseid ,a.labels as baselabels from comparePersons a ")
selectSql.append(" where a.id in ('hash:5172647' , 'hash:48474168' , 'hash:54263647' , 'hash:61311833' , 'hash:63397014') ")
val inputIds = spark.sql(selectSql.toString)
inputIds.createOrReplaceTempView("inputIds")
//inputIds.foreach(println(_))
val dfA = comparePersons.map(row => {
val personId = row.getAs[String]("id")
val labels = row.getAs[String]("labels")
(personId , LabelVectorUtil().labelToSparsemlVector(labels))
}).toDF("id" , "dflabel")
val dfB = inputIds.map(row => {
val personId = row.getAs[String]("baseid")
val labels = row.getAs[String]("baselabels")
(personId , LabelVectorUtil().labelToSparsemlVector(labels))
}).toDF("id" , "dflabel")
val mh = new MinHashLSH()
.setNumHashTables(150)
.setInputCol("dflabel")
.setOutputCol("hashes")
val model = mh.fit(dfA)
model.transform(dfA).show()
val allPerson = model.approxSimilarityJoin(dfA, dfB, 0.9, "JaccardDistance")
.select(col("datasetA.id").alias("ida"),
col("datasetB.id").alias("idb"),
col("JaccardDistance")).map(row => {
val ida = row.getAs[String]("ida")
val idb = row.getAs[String]("idb")
val jds = row.getAs[Double]("JaccardDistance")
(idb,(ida,jds))
}).rdd.groupByKey.map(personInfos => {
val dataAry = personInfos._2
dataAry.toList.sortWith(_._2 > _._2).take(3) // 11,8,7,4,3,2,1
})
allPerson.flatMap(l => l).foreach(println(_))
}
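The test calls LabelVectorUtil().labelToSparsemlVector, which is not in the LabelVectorUtil listing above; MinHashLSH expects the DataFrame-oriented org.apache.spark.ml vector type in its input column rather than the mllib one that labelToVector returns. A minimal sketch of what that helper could look like inside LabelVectorUtil, assuming the same index:value label format and 254-dimensional space (only the method name comes from the test; the body is an assumption):
import org.apache.spark.ml.linalg.{Vector => MLVector, Vectors => MLVectors}

// assumed helper: same parsing as labelToVector, but builds an org.apache.spark.ml
// sparse vector, which is what MinHashLSH's input column expects
def labelToSparsemlVector(labelStr: String): MLVector = {
  val linfo = labelStr.split(",").sortWith(_.split(":")(0).toInt < _.split(":")(0).toInt)
  val lindexs = linfo.map(_.split(":")(0).toInt).toArray
  val lvalues = linfo.map(_.split(":")(1).toDouble).toArray
  MLVectors.sparse(254, lindexs, lvalues)
}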