Spark Streaming real-time streams: implementing a blacklist, the hands-on part. This article walks through two approaches to implementing a blacklist mechanism.
First, let's look at the requirements:
1. A real-time, dynamic blacklist: blacklist any user who clicks a given ad more than N times in a day.
2. Filter out invalid ad-click traffic from blacklisted users, based on that blacklist.
3. Real-time statistics of ad clicks per day, per province, per city, per ad.
4. The daily Top 3 hottest ads per province.
Note: to make testing easier, N = 5 here; adjust it to your own business needs.
Note: the complete code is at the end.
In the previous post we already had a Kafka producer generating log messages and a Spark Streaming application subscribing to and aggregating them; next, let's implement the blacklist mechanism.
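For reference, the log format assumed throughout is a space-separated line of `timestamp province city userID adID`. A minimal, hypothetical test producer emitting such lines might look like the sketch below; the broker address `hadoop:9092` and topic `day3` match the consumer code later in this post, everything else is made up for testing.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import scala.util.Random

// Hypothetical test producer: emits one "timestamp province city userID adID"
// line every 100 ms so the streaming job has data to chew on
object MockAdClickProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "hadoop:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    val rnd = new Random()
    while (true) {
      val line = s"${System.currentTimeMillis()} province${rnd.nextInt(3)} city${rnd.nextInt(3)} user${rnd.nextInt(10)} ad${rnd.nextInt(5)}"
      producer.send(new ProducerRecord[String, String]("day3", line))
      Thread.sleep(100)
    }
  }
}
```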
This is where the DStream produced by the updateStateByKey operator from the previous post comes in.
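As a reminder, `reduceDS` is the per-batch count keyed by `province,city,day,userID,adID` (the same code appears in the full listing at the end):

```scala
// Parse each log line into a composite key and count clicks within the batch
val userDS = mapDs.transform(x => x.map(line => {
  val arr = line.split(" ")
  val day = new SimpleDateFormat("yyyy-MM-dd").format(new Date(arr(0).toLong)) // timestamp -> date
  val province = arr(1)
  val city = arr(2)
  val userID = arr(3)
  val adID = arr(4)
  (province + "," + city + "," + day + "," + userID + "," + adID, 1)
}))
val reduceDS = userDS.reduceByKey(_ + _)
```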
With filter we can pull out the blacklist users, i.e. anyone who clicked a given ad more than 5 times in a day:
```scala
val totalDS = reduceDS.updateStateByKey((nowValue: Seq[Int], bfValue: Option[Int]) => {
  val now = nowValue.sum
  val bf = bfValue.getOrElse(0)
  Option(now + bf)
}).filter(_._2 > 5)
```
Then write the blacklisted users to MySQL:
```scala
totalDS.foreachRDD(rdd => {
  rdd.foreachPartition(itor => {
    val url = "jdbc:mysql://hadoop:3306/day03"
    val con = DriverManager.getConnection(url, "root", "1234")
    val sql = "insert into streaming values(?,?,?,?,?)"
    val driver = con.prepareStatement(sql)
    itor.foreach(x => {
      // key format: province,city,day,userID,adID
      val xs = x._1.split(",")
      driver.setString(1, xs(0))
      driver.setString(2, xs(1))
      driver.setString(3, xs(2))
      driver.setString(4, xs(3))
      driver.setString(5, xs(4))
      driver.addBatch()
    })
    driver.executeBatch()
    driver.close()
    con.close()
  })
})
```
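The article never shows the DDL for the `streaming` table; a one-off setup that matches the five setString calls above could look like this (the column names and lengths are assumptions):

```scala
// Hypothetical one-off setup: create the blacklist table the job inserts into
val setupCon = DriverManager.getConnection("jdbc:mysql://hadoop:3306/day03", "root", "1234")
setupCon.createStatement().execute(
  """create table if not exists streaming(
    |  province varchar(50), city varchar(50), day varchar(20),
    |  userID varchar(50), adID varchar(50))""".stripMargin)
setupCon.close()
```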
Next, read the blacklist back from MySQL (loading it dynamically) and filter against it:
```scala
// Dynamically load the blacklist from MySQL as an RDD
val sparkcontext = new SQLContext(sc)
val url = "jdbc:mysql://hadoop:3306/day03"
val properties = new Properties()
properties.setProperty("user", "root")
properties.setProperty("password", "1234")
// Pull out the blacklisted userIDs; the map turns each row into a pair
// keyed by userID for the subtractByKey below ("1" is just a placeholder value)
val mysql = sparkcontext.read.jdbc(url, "streaming", properties).rdd.map(x => {
  (x.getString(3), "1")
})
// mapDs holds the raw log lines; re-key them as (userId, line) for the same reason
val userIDkeyDS = mapDs.transform(x => x.map(line => {
  val arr = line.split(" ")
  val userId = arr(3)
  (userId, line)
}))
// Drop the blacklisted users' ad clicks from each batch RDD:
// subtractByKey removes every pair whose userId appears in the MySQL blacklist.
// Because the JDBC RDD is not cached, it is recomputed (re-read from MySQL)
// on every batch, which is what keeps the blacklist dynamic.
val filterDS = userIDkeyDS.transform(rdd => {
  rdd.subtractByKey(mysql)
})
```
Note that plain `subtract` compares whole elements, so a `(userId, line)` pair would never match the blacklist's `(userId, "1")` pairs and nothing would be filtered; `subtractByKey` is the operation we want here. Its source comment reads: Return an RDD with the pairs from `this` whose keys are not in `other`.
In other words, it keeps only the pairs whose key is absent from the other RDD.
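A tiny spark-shell example of the difference (the data is made up):

```scala
// subtract compares whole (key, value) pairs, so nothing matches;
// subtractByKey matches on the key alone and drops u2's click
val clicks    = sc.parallelize(Seq(("u1", "line-a"), ("u2", "line-b")))
val blacklist = sc.parallelize(Seq(("u2", "1")))
clicks.subtract(blacklist).collect()      // Array((u1,line-a), (u2,line-b)) -- u2 survives!
clicks.subtractByKey(blacklist).collect() // Array((u1,line-a))              -- u2 removed
```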
At this point we have completed the first three requirements (the per-province/per-city/per-ad statistics are simply the updateStateByKey aggregation from before, re-applied to the filtered stream; see the full code at the end).
That leaves the daily per-province Top 3 ads; here are two ways to do it.
Approach one: write the blacklist-filtered data straight to MySQL and build a separate application that processes the MySQL data, giving you a daily batch job.
Approach two: operate on the DStream directly; inside a transform, convert each RDD to a DataFrame, register a temporary view, and output the result of querying that view.
Approach one (code):
```scala
// Write the daily per-province/per-city ad click counts to MySQL
updateDS.foreachRDD(rdd => {
  rdd.foreachPartition(itor => {
    val url = "jdbc:mysql://hadoop:3306/day03"
    val con = DriverManager.getConnection(url, "root", "1234")
    val sql = "insert into result values(?,?,?,?,?,?)"
    val driver = con.prepareStatement(sql)
    itor.foreach(x => {
      val xs = x._1.split(",")
      driver.setString(1, xs(0))
      driver.setString(2, xs(1))
      driver.setString(3, xs(2))
      driver.setString(4, xs(3))
      driver.setString(5, xs(4))
      driver.setInt(6, x._2)
      driver.addBatch()
    })
    driver.executeBatch()
    driver.close()
    con.close()
  })
})
```
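As with `streaming`, the `result` table's DDL isn't shown in the article; a one-off setup matching the six parameters above might be (column names assumed):

```scala
// Hypothetical one-off setup: create the result table for approach one
val setupCon = DriverManager.getConnection("jdbc:mysql://hadoop:3306/day03", "root", "1234")
setupCon.createStatement().execute(
  """create table if not exists result(
    |  province varchar(50), city varchar(50), day varchar(20),
    |  userID varchar(50), adID varchar(50), click_count int)""".stripMargin)
setupCon.close()
```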
Then create a new application that groups the MySQL data by province and ad ID, and uses a window function (partitioned by province, ordered by click count descending) to take the top three rows per province:
```scala
import java.util.Properties

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object test {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkReceiver")
    val sc = new SparkContext(sparkConf)
    val sparkcontext = new SQLContext(sc)
    val url = "jdbc:mysql://hadoop:3306/day03"
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "1234")
    val mysql2 = sparkcontext.read.jdbc(url, "result", properties)
    mysql2.createOrReplaceTempView("mysql2")
    // Aggregate clicks per province and ad
    val view = sparkcontext.sql(
      "select province,adID,count(*) click_count from mysql2 group by province,adID")
    view.createOrReplaceTempView("end")
    // Show the Top 3 ads per province
    val result = "select * from (select *, row_number() over(partition by province order by click_count desc) rowNumber from end) s where rowNumber <= 3"
    sparkcontext.sql(result).show()
  }
}
```
Approach two (code):
Inside the transform we convert the RDD's records into Rows, attach a schema, register a temporary view, and run the same window query over it:
```scala
val perProvinceDs = updateDS.transform(rdd => {
  // Split the composite key back into its fields and build Rows
  val topN = rdd.map(x => {
    val line = x._1.split(",")
    val province = line(0)
    val city = line(1)
    val day = line(2)
    val userID = line(3)
    val adID = line(4)
    val click_count = x._2
    Row(province, city, day, userID, adID, click_count)
  })
  val schema = StructType(List(
    StructField("province", StringType),
    StructField("city", StringType),
    StructField("day", StringType),
    StructField("userID", StringType),
    StructField("adID", StringType),
    StructField("click_count", IntegerType)
  ))
  val joinedDF = sparkcontext.createDataFrame(topN, schema)
  joinedDF.createOrReplaceTempView("tmp_day_adid")
  // Top 3 ads per province by click count
  val sqlDf = sparkcontext.sql(
    "select * from (select *, row_number() over(partition by province order by click_count desc) row_number from tmp_day_adid) t where t.row_number <= 3")
  sqlDf.rdd
})
perProvinceDs.print()
```
Finally, the complete code:
```scala
import java.sql.DriverManager
import java.text.SimpleDateFormat
import java.util.{Date, Properties}

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkReceiver1 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkReceiver")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(2))
    ssc.checkpoint("E:\\checkpoint") // required by updateStateByKey

    // Kafka parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test"
    )
    // Topic(s) to subscribe to
    val topics = List("day3")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    val mapDs = stream.map(_.value())
    // mapDs.print()

    // transform modifies the discretized stream one RDD at a time
    val userDS = mapDs.transform(x => x.map(line => {
      val arr = line.split(" ")
      val day = new SimpleDateFormat("yyyy-MM-dd").format(new Date(arr(0).toLong)) // timestamp -> date
      val province = arr(1)
      val city = arr(2)
      val userID = arr(3)
      val adID = arr(4)
      (province + "," + city + "," + day + "," + userID + "," + adID, 1)
    }))
    val reduceDS = userDS.reduceByKey(_ + _)

    // Use filter to pull out users who click a given ad more than 5 times a day,
    // then write them to MySQL
    val totalDS = reduceDS.updateStateByKey((nowValue: Seq[Int], bfValue: Option[Int]) => {
      val now = nowValue.sum
      val bf = bfValue.getOrElse(0)
      Option(now + bf)
    }).filter(_._2 > 5)
    totalDS.print()
    totalDS.foreachRDD(rdd => {
      rdd.foreachPartition(itor => {
        val url = "jdbc:mysql://hadoop:3306/day03"
        val con = DriverManager.getConnection(url, "root", "1234")
        val sql = "insert into streaming values(?,?,?,?,?)"
        val driver = con.prepareStatement(sql)
        itor.foreach(x => {
          val xs = x._1.split(",")
          driver.setString(1, xs(0))
          driver.setString(2, xs(1))
          driver.setString(3, xs(2))
          driver.setString(4, xs(3))
          driver.setString(5, xs(4))
          driver.addBatch()
        })
        driver.executeBatch()
        driver.close()
        con.close()
      })
    })

    // Dynamically load the blacklist from MySQL as an RDD
    val sparkcontext = new SQLContext(sc)
    val url = "jdbc:mysql://hadoop:3306/day03"
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "1234")
    val mysql = sparkcontext.read.jdbc(url, "streaming", properties).rdd.map(x => {
      (x.getString(3), "1")
    })
    val userIDkeyDS = mapDs.transform(x => x.map(line => {
      val arr = line.split(" ")
      val userId = arr(3)
      (userId, line)
    }))
    // Drop the blacklisted users' ad clicks from each batch RDD
    val filterDS = userIDkeyDS.transform(rdd => {
      rdd.subtractByKey(mysql)
    })
    val dayDs = filterDS.transform(rdd => rdd.map(line => {
      val arr = line._2.split(" ")
      val day = new SimpleDateFormat("yyyy-MM-dd").format(new Date(arr(0).toLong)) // timestamp -> date
      val province = arr(1)
      val city = arr(2)
      val userID = arr(3)
      val adID = arr(4)
      (province + "," + city + "," + day + "," + userID + "," + adID, 1)
    }))
    val reduceDS2 = dayDs.reduceByKey(_ + _)

    // updateStateByKey: running daily click count per province/city/ad
    val updateDS = reduceDS2.updateStateByKey((nowValue: Seq[Int], bfValue: Option[Int]) => {
      val now = nowValue.sum
      val bf = bfValue.getOrElse(0)
      Option(now + bf)
    })
    updateDS.print()

    val perProvinceDs = updateDS.transform(rdd => {
      val topN = rdd.map(x => {
        val line = x._1.split(",")
        val province = line(0)
        val city = line(1)
        val day = line(2)
        val userID = line(3)
        val adID = line(4)
        val click_count = x._2
        Row(province, city, day, userID, adID, click_count)
      })
      val schema = StructType(List(
        StructField("province", StringType),
        StructField("city", StringType),
        StructField("day", StringType),
        StructField("userID", StringType),
        StructField("adID", StringType),
        StructField("click_count", IntegerType)
      ))
      val joinedDF = sparkcontext.createDataFrame(topN, schema)
      joinedDF.createOrReplaceTempView("tmp_day_adid")
      val sqlDf = sparkcontext.sql(
        "select * from (select *, row_number() over(partition by province order by click_count desc) row_number from tmp_day_adid) t where t.row_number <= 3")
      sqlDf.rdd
    })
    perProvinceDs.print()

    /* // Write the daily per-province/per-city ad click counts to MySQL (approach one)
    updateDS.foreachRDD(rdd => {
      rdd.foreachPartition(itor => {
        val url = "jdbc:mysql://hadoop:3306/day03"
        val con = DriverManager.getConnection(url, "root", "1234")
        val sql = "insert into result values(?,?,?,?,?,?)"
        val driver = con.prepareStatement(sql)
        itor.foreach(x => {
          val xs = x._1.split(",")
          driver.setString(1, xs(0))
          driver.setString(2, xs(1))
          driver.setString(3, xs(2))
          driver.setString(4, xs(3))
          driver.setString(5, xs(4))
          driver.setInt(6, x._2)
          driver.addBatch()
        })
        driver.executeBatch()
        driver.close()
        con.close()
      })
    }) */

    ssc.start()
    ssc.awaitTermination()
  }
}
```
The batch application for approach one was already shown above, so it isn't repeated here.
With that, all four blacklist requirements are done. Of course, the final results really ought to be saved to MySQL as well; the write-out follows the same foreachRDD/JDBC pattern used above, so it isn't covered in detail, but see the sketch below.
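If you do want to persist the Top 3 output from approach two, a hedged sketch might look like this. The `top3` table name and its columns are assumptions; the Row layout follows the schema built above (province, city, day, userID, adID, click_count, plus the appended row_number column).

```scala
// Hypothetical: persist the Top 3 rows produced by approach two
perProvinceDs.foreachRDD(rdd => {
  rdd.foreachPartition(itor => {
    val con = DriverManager.getConnection("jdbc:mysql://hadoop:3306/day03", "root", "1234")
    val stmt = con.prepareStatement("insert into top3(province, adID, click_count) values(?,?,?)")
    itor.foreach(row => {
      stmt.setString(1, row.getString(0)) // province
      stmt.setString(2, row.getString(4)) // adID
      stmt.setInt(3, row.getInt(5))       // click_count
      stmt.addBatch()
    })
    stmt.executeBatch()
    stmt.close()
    con.close()
  })
})
```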
✿✿ヽ(°▽°)ノ✿✿✿ヽ(°▽°)ノ✿✿ (The end, throw the confetti!)
Reposted from: http://maazi.baihongyu.com/