Spark Streaming Real-Time Streams: Implementing a Blacklist (Part 3)
Published: 2019-05-24


This is the hands-on part of the Spark Streaming blacklist series; it walks through two approaches to implementing the blacklist in detail.

First, let's look at the requirements:

1. Implement a real-time, dynamic blacklist: blacklist every user who clicks a given ad more than N times in one day
2. Filter the ad-click traffic, dropping the illegitimate clicks from blacklisted users
3. Count the ad clicks per province, city, and ad in real time, for each day
4. Compute the daily Top 3 hottest ads for each province

Note: to make testing easier, N = 5 here; change it to match your own business requirements.

Note: the complete code is at the end of the post.

In the previous part we already had a Kafka producer generating log messages and the Spark Streaming application subscribing to them and aggregating the data. Next, let's build the blacklist mechanism.
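
As a reminder, the parsing code in this post assumes each Kafka message is a single space-separated line of the form "timestamp province city userID adID". The concrete values in the sketch below are made up purely to illustrate the shape:

// Hypothetical sample of the assumed log format (space-separated):
//   timestamp province city userID adID
object SampleLogLine {
  def main(args: Array[String]): Unit = {
    val line = s"${System.currentTimeMillis()} Hebei Shijiazhuang user3 ad7"
    val arr = line.split(" ")
    println(s"userID = ${arr(3)}, adID = ${arr(4)}")
  }
}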

This is where the result DStream produced by the updateStateByKey operator from the previous part comes in.

With filter we can extract the blacklist users, i.e. anyone who clicks a given ad more than 5 times in one day:

val totalDS = reduceDS.updateStateByKey((nowValue: Seq[Int], bfValue: Option[Int]) => {
  val now = nowValue.sum
  val bf = bfValue.getOrElse(0)
  Option(now + bf)
}).filter(_._2 > 5)
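
Note that updateStateByKey keeps state across micro-batches, so the StreamingContext must have a checkpoint directory configured, exactly as the full code at the end does (the path is just an example; any local or HDFS directory works):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Same setup as in the full code at the end of the post
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkReceiver")
val ssc = new StreamingContext(sparkConf, Seconds(2))
// Stateful operators such as updateStateByKey require a checkpoint directory
ssc.checkpoint("E:\\checkpoint")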

Then write the blacklisted users to MySQL:

totalDS.foreachRDD(rdd => {
  rdd.foreachPartition(itor => {
    // one JDBC connection per partition
    val url = "jdbc:mysql://hadoop:3306/day03"
    val con = DriverManager.getConnection(url, "root", "1234")
    val sql = "insert into streaming values(?,?,?,?,?)"
    val driver = con.prepareStatement(sql)
    itor.foreach(x => {
      val xs = x._1.split(",")
      driver.setString(1, xs(0))
      driver.setString(2, xs(1))
      driver.setString(3, xs(2))
      driver.setString(4, xs(3))
      driver.setString(5, xs(4))
      driver.addBatch()
    })
    driver.executeBatch()
    driver.close()
    con.close()
  })
})
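
The post never shows the DDL of the streaming table. A hypothetical definition that matches the five-column insert above (the comma-separated key splits into province, city, day, userID, adID; the column names are my own guess) could be created like this:

import java.sql.DriverManager

object CreateBlacklistTable {
  def main(args: Array[String]): Unit = {
    val con = DriverManager.getConnection("jdbc:mysql://hadoop:3306/day03", "root", "1234")
    val stmt = con.createStatement()
    // Hypothetical schema for the blacklist table used by the insert above
    stmt.execute(
      """create table if not exists streaming(
        |  province varchar(50),
        |  city     varchar(50),
        |  day      varchar(20),
        |  userID   varchar(50),
        |  adID     varchar(50)
        |)""".stripMargin)
    stmt.close()
    con.close()
  }
}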

Read the blacklist back from MySQL (dynamic loading) and filter the click stream against it:

// Dynamically load the blacklist from MySQL into an RDD
val sparkcontext = new SQLContext(sc)
val url = "jdbc:mysql://hadoop:3306/day03"
val properties = new Properties()
properties.setProperty("user", "root")
properties.setProperty("password", "1234")
// Take the blacklisted userIDs; the map turns each row into a tuple so subtract can be used later
val mysql = sparkcontext.read.jdbc(url, "streaming", properties).rdd.map(x => {
  (x.getString(3), "1")
})
// mapDs holds the original log data; it is converted to (userId, line) tuples for the subtract step
val userIDkeyDS = mapDs.transform(x => x.map(line => {
  val arr = line.split(" ")
  val userId = arr(3)
  (userId, line)
}))
// Filter the blacklisted users' ad-click records out of each batch RDD:
// subtract is meant to drop the source records whose userId is present in MySQL
val filterDS = userIDkeyDS.transform(rdd => {
  rdd.subtract(mysql)
})

The Scaladoc of subtract reads: "Return an RDD with the elements from `this` that are not in `other`."

In other words, it returns an RDD containing the elements of `this` that do not appear in `other`.
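
One subtlety worth keeping in mind: subtract compares whole elements, so for the (userId, line) pairs above a record is only removed when both the key and the value are identical to an element of the blacklist RDD. If the intent is to drop every record whose userId appears in the blacklist regardless of the value, subtractByKey from PairRDDFunctions does exactly that. A minimal sketch with made-up data showing the difference:

import org.apache.spark.{SparkConf, SparkContext}

object SubtractVsSubtractByKey {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("subtract-demo"))
    val clicks    = sc.parallelize(Seq(("user1", "some log line"), ("user2", "another log line")))
    val blacklist = sc.parallelize(Seq(("user1", "1")))
    // subtract compares the whole (key, value) tuple, so both records survive here
    println(clicks.subtract(blacklist).collect().mkString(", "))
    // subtractByKey removes every pair whose key appears in the other RDD
    println(clicks.subtractByKey(blacklist).collect().mkString(", "))
    sc.stop()
  }
}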

At this point, the first three requirements are done.

For the fourth one, the daily Top 3 hottest ads per province, here are two ways to do it.

Option 1: write the blacklist-filtered data straight into MySQL, then create a separate application that processes the data in MySQL to produce the daily report.

Option 2: work on the DStream directly; inside a transform, reshape the RDD, register a temporary view, and finally output the query over that view.

Option 1 (code)

// Write the per-day, per-province, per-city ad click counts to MySQL
updateDS.foreachRDD(rdd => {
  rdd.foreachPartition(itor => {
    val url = "jdbc:mysql://hadoop:3306/day03"
    val con = DriverManager.getConnection(url, "root", "1234")
    val sql = "insert into result values(?,?,?,?,?,?)"
    val driver = con.prepareStatement(sql)
    itor.foreach(x => {
      val xs = x._1.split(",")
      driver.setString(1, xs(0))
      driver.setString(2, xs(1))
      driver.setString(3, xs(2))
      driver.setString(4, xs(3))
      driver.setString(5, xs(4))
      driver.setInt(6, x._2)
      driver.addBatch()
    })
    driver.executeBatch()
    driver.close()
    con.close()
  })
})
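
As with the streaming table, the schema of the result table is not shown in the post. province and adID must exist as columns because the follow-up query selects them; the remaining names below are assumptions that simply mirror the six-column insert:

import java.sql.DriverManager

object CreateResultTable {
  def main(args: Array[String]): Unit = {
    val con = DriverManager.getConnection("jdbc:mysql://hadoop:3306/day03", "root", "1234")
    val stmt = con.createStatement()
    // province and adID are referenced by the follow-up query; the other column names are guesses
    stmt.execute(
      """create table if not exists result(
        |  province    varchar(50),
        |  city        varchar(50),
        |  day         varchar(20),
        |  userID      varchar(50),
        |  adID        varchar(50),
        |  click_count int
        |)""".stripMargin)
    stmt.close()
    con.close()
  }
}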

Create a new application that groups the MySQL data by province and ad ID, then uses a window function partitioned by province and ordered by click count descending to take the top three rows per province and display the result:

import java.util.Properties
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object test {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkReceiver")
    val sc = new SparkContext(sparkConf)
    val sparkcontext = new SQLContext(sc)
    val url = "jdbc:mysql://hadoop:3306/day03"
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "1234")
    val mysql2 = sparkcontext.read.jdbc(url, "result", properties)
    mysql2.createOrReplaceTempView("mysql2")
    val view = sparkcontext.sql(
      "select province,adID,count(*) click_count from mysql2 group by province,adID")
    view.createOrReplaceTempView("end")
    // Show the result: Top 3 ads per province by click count
    val result = "select * from (select *,row_number() over(partition by province order by click_count desc) rowNumber from end) s where rowNumber<=3"
    sparkcontext.sql(result).show()
  }
}

Option 2 (code)

Inside a transform, reshape the RDD, register a temporary view, and output the query over that view:

val perProvinceDs = updateDS.transform(rdd => {
  val topN = rdd.map(x => {
    val line = x._1.split(",")
    val province = line(0)
    val city = line(1)
    val day = line(2)
    val userID = line(3)
    val adID = line(4)
    val click_count = x._2
    Row(province, city, day, userID, adID, click_count)
  })
  val schema = StructType(List(
    StructField("province", StringType),
    StructField("city", StringType),
    StructField("day", StringType),
    StructField("userID", StringType),
    StructField("adID", StringType),
    StructField("click_count", IntegerType)
  ))
  val joinedDF = sparkcontext.createDataFrame(topN, schema)
  joinedDF.createOrReplaceTempView("tmp_day_adid")
  val sqlDf = sparkcontext.sql("select * from (select *,row_number() over(partition by province sort by click_count desc) row_number from tmp_day_adid) t where t.row_number<=3")
  sqlDf.rdd
})
perProvinceDs.print()

Finally, the complete code:

import java.sql.DriverManager
import java.text.SimpleDateFormat
import java.util.{Date, Locale, Properties}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkReceiver1 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkReceiver")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(2))
    val session = SparkSession.builder().master("local[2]").getOrCreate()
    ssc.checkpoint("E:\\checkpoint")
    // Main parameter 1: Kafka consumer configuration
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test"
    )
    // Main parameter 2: topics to subscribe to
    val topics = List("day3")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    val mapDs = stream.map(_.value())
    // mapDs.print()
    // transform operates on the discretized stream one RDD at a time
    val userDS = mapDs.transform(x => x.map(line => {
      val arr = line.split(" ")
      val day = new SimpleDateFormat("yyyy-MM-dd").format(new Date(arr(0).toLong)) // timestamp
      val province = arr(1)
      val city = arr(2)
      val userID = arr(3)
      val adID = arr(4)
      (province + "," + city + "," + day + "," + userID + "," + adID, 1)
    }))
    val reduceDS = userDS.reduceByKey(_ + _)
    // Use filter to pick out the blacklisted users (more than 5 clicks on one ad per day) and write them to MySQL
    val totalDS = reduceDS.updateStateByKey((nowValue: Seq[Int], bfValue: Option[Int]) => {
      val now = nowValue.sum
      val bf = bfValue.getOrElse(0)
      Option(now + bf)
    }).filter(_._2 > 5)
    totalDS.print()
    totalDS.foreachRDD(rdd => {
      rdd.foreachPartition(itor => {
        val url = "jdbc:mysql://hadoop:3306/day03"
        val con = DriverManager.getConnection(url, "root", "1234")
        val sql = "insert into streaming values(?,?,?,?,?)"
        val driver = con.prepareStatement(sql)
        itor.foreach(x => {
          val xs = x._1.split(",")
          driver.setString(1, xs(0))
          driver.setString(2, xs(1))
          driver.setString(3, xs(2))
          driver.setString(4, xs(3))
          driver.setString(5, xs(4))
          driver.addBatch()
        })
        driver.executeBatch()
        driver.close()
        con.close()
      })
    })
    // Dynamically load the blacklist from MySQL into an RDD
    val sparkcontext = new SQLContext(sc)
    val url = "jdbc:mysql://hadoop:3306/day03"
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "1234")
    val mysql = sparkcontext.read.jdbc(url, "streaming", properties).rdd.map(x => {
      (x.getString(3), "1")
    })
    val userIDkeyDS = mapDs.transform(x => x.map(line => {
      val arr = line.split(" ")
      val userId = arr(3)
      (userId, line)
    }))
    // Filter the blacklisted users' ad-click records out of each batch RDD
    val filterDS = userIDkeyDS.transform(rdd => {
      rdd.subtract(mysql)
    })
    val dayDs = filterDS.transform(rdd => rdd.map(line => {
      val arr = line._2.split(" ")
      val day = new SimpleDateFormat("yyyy-MM-dd").format(new Date(arr(0).toLong)) // timestamp
      val province = arr(1)
      val city = arr(2)
      val userID = arr(3)
      val adID = arr(4)
      (province + "," + city + "," + day + "," + userID + "," + adID, 1)
    }))
    val reduceDS2 = dayDs.reduceByKey(_ + _)
    // updateStateByKey: real-time click counts per day, province, city, and ad
    val updateDS = reduceDS2.updateStateByKey((nowValue: Seq[Int], bfValue: Option[Int]) => {
      val now = nowValue.sum
      val bf = bfValue.getOrElse(0)
      Option(now + bf)
    })
    updateDS.print()
    val perProvinceDs = updateDS.transform(rdd => {
      val topN = rdd.map(x => {
        val line = x._1.split(",")
        val province = line(0)
        val city = line(1)
        val day = line(2)
        val userID = line(3)
        val adID = line(4)
        val click_count = x._2
        Row(province, city, day, userID, adID, click_count)
      })
      val schema = StructType(List(
        StructField("province", StringType),
        StructField("city", StringType),
        StructField("day", StringType),
        StructField("userID", StringType),
        StructField("adID", StringType),
        StructField("click_count", IntegerType)
      ))
      //val sqlContext = new SQLContext(rdd.context)
      val joinedDF = sparkcontext.createDataFrame(topN, schema)
      joinedDF.createOrReplaceTempView("tmp_day_adid")
      val sqlDf = sparkcontext.sql("select * from (select *,row_number() over(partition by province sort by click_count desc) row_number from tmp_day_adid) t where t.row_number<=3")
      sqlDf.rdd
    })
    perProvinceDs.print()
    /*
    // Write the per-day, per-province, per-city ad click counts to MySQL
    updateDS.foreachRDD(rdd => {
      rdd.foreachPartition(itor => {
        val url = "jdbc:mysql://hadoop:3306/day03"
        val con = DriverManager.getConnection(url, "root", "1234")
        val sql = "insert into result values(?,?,?,?,?,?)"
        val driver = con.prepareStatement(sql)
        itor.foreach(x => {
          val xs = x._1.split(",")
          driver.setString(1, xs(0))
          driver.setString(2, xs(1))
          driver.setString(3, xs(2))
          driver.setString(4, xs(3))
          driver.setString(5, xs(4))
          driver.setInt(6, x._2)
          driver.addBatch()
        })
        driver.executeBatch()
        driver.close()
        con.close()
      })
    })
    */
    ssc.start()
    ssc.awaitTermination()
  }
}

The separate application for option 1 was already shown above, so it is not repeated here.

With that, all four requirements of the blacklist exercise are done. In practice the results should of course be stored in MySQL; the loading steps are not repeated here.

✿✿ヽ(°▽°)ノ✿✿✿ヽ(°▽°)ノ✿✿✿ヽ(°▽°)ノ✿✿✿ヽ(°▽°)ノ✿ (The end, confetti!)

Reposted from: http://maazi.baihongyu.com/
