项目采用ALS作为协同过滤算法,根据MongoDB中的用户评分表计算离线的用户商品推荐列表以及商品相似度矩阵。
通过ALS训练出来的Model来计算所有当前用户商品的推荐列表,主要思路如下:
1. userId和productId做笛卡尔积,产生(userId,productId)的元组
2. 通过模型预测(userId,productId)对应的评分。
3. 将预测结果通过预测分值进行排序。
4. 返回分值最大的K个商品,作为当前用户的推荐列表。
最后生成的数据结构如下:将数据保存到MongoDB的UserRecs表中
新建recommender的子项目OfflineRecommender,引入spark、scala、mongo和jblas的依赖:
<dependencies>
<dependency>
<groupId>org.scalanlp</groupId>
<artifactId>jblas</artifactId>
<version>${jblas.version}</version>
</dependency>
<!-- Spark的依赖引入 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
</dependency>
<!-- 引入Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<!-- 加入MongoDB的驱动 -->
<!-- 用于代码方式连接MongoDB -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah-core_2.11</artifactId>
<version>${casbah.version}</version>
</dependency>
<!-- 用于Spark和MongoDB的对接 -->
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>${mongodb-spark.version}</version>
</dependency>
</dependencies>
同样经过前期的构建样例类、声明配置、创建SparkSession等步骤,可以加载数据开始计算模型了。
核心代码如下:
case class ProductRating(userId: Int, productId: Int, score: Double, timestamp: Int)
case class MongoConfig(uri:String, db:String)
// 标准推荐对象,productId,score
case class Recommendation(productId: Int, score:Double)
// 用户推荐列表
case class UserRecs(userId: Int, recs: Seq[Recommendation])// 商品相似度(商品推荐)
case class ProductRecs(productId: Int, recs: Seq[Recommendation])
object OfflineRecommmeder {
// 定义常量
val MONGODB_RATING_COLLECTION = "Rating"
// 推荐表的名称
val USER_RECS = "UserRecs"
val PRODUCT_RECS = "ProductRecs"
val USER_MAX_RECOMMENDATION = 20
def main(args: Array[String]): Unit = {
// 定义配置
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "recommender"
)
// 创建spark session
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
implicit val mongoConfig = MongoConfig(config("mongo.uri"),config("mongo.db"))
import spark.implicits._
//读取mongoDB中的业务数据
val ratingRDD = spark
.read
.option("uri",mongoConfig.uri)
.option("collection",MONGODB_RATING_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[ProductRating]
.rdd
.map(rating=> (rating.userId, rating.productId, rating.score)).cache()
//用户的数据集 RDD[Int]
val userRDD = ratingRDD.map(_._1).distinct()
val prodcutRDD = ratingRDD.map(_._2).distinct()
//创建训练数据集
val trainData = ratingRDD.map(x => Rating(x._1,x._2,x._3))
// rank 是模型中隐语义因子的个数, iterations 是迭代的次数, lambda 是ALS的正则化参
val (rank,iterations,lambda) = (50, 5, 0.01)
//调用ALS算法训练隐语义模型
val model = ALS.train(trainData,rank,iterations,lambda)
//计算用户推荐矩阵
val userProducts = userRDD.cartesian(productRDD)
// model已训练好,把id传进去就可以得到预测评分列表RDD[Rating] (userId,productId,rating)
val preRatings = model.predict(userProducts)
val userRecs = preRatings
.filter(_.rating > 0)
.map(rating => (rating.user,(rating.product, rating.rating)))
.groupByKey()
.map{
case (userId,recs) => UserRecs(userId,recs.toList.sortWith(_._2 >
_._2).take(USER_MAX_RECOMMENDATION).map(x => Recommendation(x._1,x._2)))
}.toDF()
userRecs.write
.option("uri",mongoConfig.uri)
.option("collection",USER_RECS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
//TODO:计算商品相似度矩阵
// 关闭spark
spark.stop()
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。