潍坊市论坛

首页 » 分类 » 定义 » spark学习04核心编程
TUhjnbcbe - 2021/7/30 16:49:00

1.RDD依赖关系----依赖血缘关系

相邻的两个RDD的关系称之为依赖关系。

多个连续的RDD的依赖关系,称之为血缘关系。

RDD不会保存数据。

RDD为了提供容错性,需要将RDD间的关系保存下来,一旦出现错误,可以根据血缘关系将数据源重新读取进行计算。

打印血缘关系toDebugString

valwords:RDD[String]=lines.flatMap(_.split(""))println(words.toDebugString)//打印血缘关系

valwords:RDD[String]=lines.flatMap(_.split(""))println(words.dependencies)//打印依赖关系

新的RDD的一个分区的数据依赖于旧的RDD的一个分区的数据,这个依赖称之为OneToOne依赖(窄依赖)。

新的RDD的一个分区的数据依赖于旧的RDD的多个分区的数据,这个依赖称之为Shuffle依赖(宽依赖)。

2.依赖关系---阶段分区任务

shuffle前后,任务发生了改变。

shuffle会打乱数据,出现等待,所以会划分阶段。

一个阶段的task任务都执行完毕才能执行下一个阶段的任务。

当RDD中存在Shuffle依赖时,阶段会自动增加一个

阶段的数量=shuffle依赖的数量+1(为ResultStage),ResultStage只有一个,最后需要执行的阶段。

RDD的任务划分:

RDD任务切分,中间分为:Application、Job、Stage、Task

Application:初始化一个SparkContext即生成一个Application

Job:一个Action算子就会生成一个Job

Stage:Stage等于宽依赖(ShuffleDependency)的个数加1

Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数。

注意:Application-Job-Stage-Task每一层都是1对n的关系。

handleJobSubmitted

submitStage----submitMissingTasks(只有一个stage时)---submitMissingTasks

valpartitionsToCompute:Seq[Int]=stage.findMissingPartitions()overridedeffindMissingPartitions():Seq[Int]={valjob=activeJob.get(0untiljob.numPartitions).filter(id=!job.finished(id))}

任务数计算是根据分区数决定的。

3.RDD的持久化

1)RDDCache缓存

RDD通过Cache或Persist方法将前面的计算结果缓存,默认情况下会把数据缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action算子时,该RDD将会被缓存在计算节点的内存中,并供后面重用(在数据执行时间较长或数据比较重要的场合也可以持久化)。

cache操作会增加血缘关系(一旦出现问题,可以从头读取数据),不改变原有的血缘关系。

//数据缓存wordToOneRdd.cache()//可以更改存储级别mapRdd.persist(StorageLevel.MEMORY_AND_DISK_2)

RDD的持久化引入

valconf:SparkConf=newSparkConf().setAppName("Spark01_WordCount_02").setMaster("local[2]")valsc:SparkContext=newSparkContext(conf)valrdd:RDD[String]=sc.makeRDD(List("helloscala","hellospark"))valwords:RDD[String]=rdd.flatMap(_.split(""))valmapRdd:RDD[(String,Int)]=words.map(word={println("******************")(word,1)})valres:RDD[(String,Int)]=mapRdd.reduceByKey(_+_)res.collect().foreach(println)println("=============================")//RDD不存储数据,如果一个RDD需要重复使用,那么需要从头再次执行来获取数据//RDD对象可以重用,但是数据无法重用valres1:RDD[(String,Iterable[Int])]=mapRdd.groupByKey()res1.collect().foreach(println)

这样改写只是对象重复使用,RDD不存储数据,数据还需再次执行才可获取。

持久化保存在内存或磁盘是临时文件,当作业执行完毕会被删除

valconf:SparkConf=newSparkConf().setAppName("Spark01_WordCount_02").setMaster("local[2]")valsc:SparkContext=newSparkContext(conf)valrdd:RDD[String]=sc.makeRDD(List("helloscala","hellospark"))valwords:RDD[String]=rdd.flatMap(_.split(""))valmapRdd:RDD[(String,Int)]=words.map(word={println("******************")(word,1)})//cache默认存储在内存中,要想要保存到磁盘文件,需要更改存储级别//mapRdd.cache()//持久化数据可以重复使用mapRdd.persist(StorageLevel.DISK_ONLY)valres:RDD[(String,Int)]=mapRdd.reduceByKey(_+_)res.collect().foreach(println)println("=============================")//RDD不存储数据,如果一个RDD需要重复使用,那么需要从头再次执行来获取数据//RDD对象可以重用,但是数据无法重用valres1:RDD[(String,Iterable[Int])]=mapRdd.groupByKey()res1.collect().foreach(println)

2)RDDCheckPoint检查点

检查点就是通过将RDD中间结果写入磁盘。

由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少开销。

对RDD进行CheckPoint操作并不会马上被执行,必须执行action操作才能触发。

CheckPoint需要落盘,需要指定检查点保存路径。

检查点路径保存的文件,当作业执行完毕后不会被删除。

一般保存路径都是在分布式存储系统:HDFS

sc.setCheckpointDir("datas/cp")mapRdd.checkpoint()

区别:

cache:将数据临时存储在内存中进行数据重用

cache操作会增加血缘关系(一旦出现问题,可以从头读取数据),不改变原有的血缘关系

persist:将数据临时存储在磁盘文件中进行数据重用

涉及到磁盘IO,性能较低,但是数据安全

如果作业执行完毕,临时保存的数据文件就会丢失

checkoutPoint:将数据长久地存储在磁盘文件中进行数据重用

涉及到磁盘IO,性能较低,但是数据安全

为了保证数据安全,所以一般情况下,会独立执行作业(可通过代码打印看出)

为了能够提高效率,一般情况下,需要和cache联合使用

执行过程中会切断血缘关系,重新建立新的血缘关系(解释:因为数据保存起来,相当于数据源发生了改变)

源码:

runJob---rdd.doCheckpoint()---

4.RDD分区器

Spark目前支持Hash分区和Range分区,和用户自定义分区。Hsah分区为当前的默认分区。分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle后进入哪个分区,进而决定了Reduce的个数。

只有key-value类型的RDD才有分区器,非key-value类型的RDD分区的值是None。

每个RDD的分区ID范围:0~(numPartitions-1),决定这个值是属于哪个分区的。

objectRDD_partition_01{defmain(args:Array[String]):Unit={valconf:SparkConf=newSparkConf().setAppName("Spark01_WordCount_02").setMaster("local[2]")valsc:SparkContext=newSparkContext(conf)valrdd:RDD[(String,String)]=sc.makeRDD(List(("nba","xxxxxxxxxxx"),("wnba","xxxxxxxxxxx"),("cba","xxxxxxxxxxx"),("nba","xxxxxxxxxxx")))valpartRdd:RDD[(String,String)]=rdd.partitionBy(newMyPartitioner)partRdd.saveAsTextFile("output")sc.stop()}/***自定义分区*/classMyPartitionerextendsPartitioner{//分区数overridedefnumPartitions:Int=3//根据数据的key值返回数据的分区索引(从0开始)overridedefgetPartition(key:Any):Int={keymatch{case"nba"=0case"wnba"=1case_=2}}}}

5.RDD的文件读取与保存

Spark的数据读取和数据保存可以从两个维度来区分:文件格式以及文件系统

文件格式分为:text文件、csv文件、sequence文件、Object文件

文件系统分为:本地文件系统、HDFS、HBASE以及数据库

Text文件:

//读取数据sc.textFile("input/data.txt")//保存文件inputRdd.saveAsTextFile("output")

SequenceFile文件是Hadoop用来存储二进制形式的Key-Value对而设计的一种平面文件(FlatFile)。在SparkContext中,可以调用sequenceFile[KeyClass,ValueClass](path)。

//保存数据dataRdd.saveAsSequenceFile("output")//读取文件sc.sequenceFile[Int,Int]("output").collect().foreach(println)

Object对象文件

对象文件是将对象序列化后保存的文件,采用Java的序列化机制,可以通过objectFile[ClassTag](path)函数接收一个路径,读取对象文件。因为是序列化所以要指定类型。

//保存数据dataRdd.saveAsObjectFile("output")//读取数据sc.objectFile[(String,Int)]("output").collect().foreach(println)

RDD累加器

累加器:分布式共享只写变量

累加器用来把Executor端变量信息聚合到Driver端。

在Driver程序中定义的变量,在Executor端的每一个Task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge。

累加器引入:

valrdd:RDD[Int]=sc.makeRDD(List(1,2,3,4))varsum:Int=0rdd.foreach(num={sum+=num})println("sum="+sum)//结果为0,因为Driver和Executor分布式

累加器使用:

Spark默认就提供了简单数据聚合的累加器

valrdd:RDD[Int]=sc.makeRDD(List(1,2,3,4))//获取系统累加器//Spark默认就提供了简单数据聚合的累加器valsumAcc:LongAccumulator=sc.longAccumulator("sum")//取名可以在UI界面监控rdd.foreach(num={//使用累加器sumAcc.add(num)})//获取累加器的值println(sumAcc.value)

累加器使用问题:

会出现少加/多加的问题

少加:转换算子中调用累加器,如果没有行动算子,则不会执行

多加:多个行动算子,则会执行多次

一般情况下,累加器会放在行动算子中进行操作

//少加valmapRdd:RDD[Int]=rdd.map(num={//使用累加器sumAcc.add(num)num})

//多加valmapRdd:RDD[Int]=rdd.map(num={//使用累加器sumAcc.add(num)num})mapRdd.collect()mapRdd.collect()println(sumAcc.value)//结果为20累加两次行动算子会触发累加器计算

自定义累加器:

objectSpark_acc_04_myAcc{defmain(args:Array[String]):Unit={valconf:SparkConf=newSparkConf().setAppName("Operator").setMaster("local[2]")valsc:SparkContext=newSparkContext(conf)valrdd:RDD[String]=sc.makeRDD(List("hello","spark","hello","scala"))//累加器:WordCount//创建累加器valmyAcc:MyAccumulator=newMyAccumulator()//向Spark注册sc.register(myAcc,"WordCount")rdd.foreach(word={myAcc.add(word)})println(myAcc.value)sc.stop()}/***自定义累加器:WordCount*1.继承AccumulatorV2指定参数类型*IN:累加器输入的数据类型String*OUT:累加器返回的数据类型mutable.Map[String,Long]*2.重新方法*/classMyAccumulatorextendsAccumulatorV2[String,mutable.Map[String,Long]]{privatevarwcMap=mutable.Map[String,Long]()//判断是否为初始状态overridedefisZero:Boolean={wcMap.isEmpty}overridedefcopy():AccumulatorV2[String,mutable.Map[String,Long]]={newMyAccumulator()}//重置overridedefreset():Unit={wcMap.clear()}//获取累加器需要计算的值overridedefadd(word:String):Unit={valnewCnt=wcMap.getOrElse(word,0L)+1wcMap.update(word,newCnt)}//合并累加器overridedefmerge(other:AccumulatorV2[String,mutable.Map[String,Long]]):Unit={valmap1=this.wcMapvalmap2=other.valuemap2.foreach{case(word,count)={valnewCnt=map1.getOrElse(word,0L)+countmap1.update(word,newCnt)}}}//累加器结果overridedefvalue:mutable.Map[String,Long]={wcMap}}}

广播变量

闭包数据都是以Task为单位发送的,每个任务中包含闭包数据,这样可能会导致,一个Executor中(有多个Task)含有大量重复的数据,并且占用大量的内存。

Executor其实就是一个JVM,所以在启动时会自动分配内存。完全可以将任务中的闭包数据放在Executor的内存中,达到Executor中的Task共享闭包数据的目的。

Spark中的广播变量就可以将闭包的数据保存到Executor的内存中。

广播变量不能更改。-------分布式共享只读变量

valrdd1:RDD[(String,Int)]=sc.makeRDD(List(("a",1),("b",2),("c",3)))valmap:mutable.Map[String,Int]=mutable.Map(("a",4),("b",5),("c",6))//封装广播变量valbc:Broadcast[mutable.Map[String,Int]]=sc.broadcast(map)rdd1.map{case(w,c)={//获取广播变量valvalue=bc.value.getOrElse(w,0)(w,(c,value))}}

实例实操

数据格式:

字段:日期(-05-05),用户ID(85),SessionID(b23eroit-aasd-4df3-acfd),页面ID,动作时间(-05-:54:16),搜索关键词(苹果),点击品类ID和产品ID(15,23),下单品类ID和产品ID(1,2,3),支付品类ID和产品ID(1,2,3),城市ID(20)

用户的4种行为数据:搜索、点击、下单、支付(四种行为不能同一时间发生,有无效值)

数据规则如下:

1)数据各个字段间用下划线分隔

2)每一行数据表示用户的一次行为,这个行为只能是4种行为中的一种

3)若搜索关键字为null,表示数据不是搜索数据

4)若点击的品类ID和产品ID为-1,表示数据不是点击数据

5)针对下单行为,一次可以下单多个商品,所以品类ID和产品ID可以是多个,ID之间采用逗号分隔。若本次不是下单行为,则数据采用null表示

6)支付行为和下单行为类似

详细字段说明:

编号字段名称字段类型字段含义1dateString用户点击行为的日期2user_idLong用户的ID3session_idStringSession的ID4page_idLong某个页面的ID5action_timeString动作的时间点6search_keywordString用户搜索的关键词7click_category_idLong某一商品品类ID8click_product_idLong某一商品的ID9order_category_idsString一次订单中所有品类的ID集合10order_product_idsString一次订单中所有商品的ID集合????11pay_category_idsString一次支付中所有品类的ID集合12pay_product_idsString一次支付中所有商品的ID集合????13city_idLong城市ID

objectSpark_req_HotCategoryTop10_Analysis{defmain(args:Array[String]):Unit={valconf=newSparkConf().setAppName("HotCategoryTop10").setMaster("local

  • ")valsc:SparkContext=newSparkContext(conf)valactionRdd:RDD[String]=sc.textFile("datas/uesr_action_info.txt")//2.统计品类的点击数量:(品类ID,点击数量)valclickRdd:RDD[String]=actionRdd.filter(action={valdatas:Array[String]=action.split("_")datas(6)!="-1"//若点击的品类ID和产品ID为-1,表示数据不是点击数据})valclickCntRdd:RDD[(String,Int)]=clickRdd.map(action={valdatas:Array[String]=action.split("_")(datas(6),1)}).reduceByKey(_+_)//3.统计品类的下单数量:(品类ID,下单数量)valorderRdd:RDD[String]=actionRdd.filter(action={valdatas:Array[String]=action.split("_")datas(8)!="null"//若本次不是下单行为,则数据采用null表示//datas(8)数据格式为"1,2,3"------flatMap操作})valorderCntRdd:RDD[(String,Int)]=orderRdd.flatMap(action={valdatas:Array[String]=action.split("_")valcid:String=datas(8)valcids:Array[String]=cid.split(",")cids.map(id=(id,1))}).reduceByKey(_+_)//4.统计品类的支付数量:(品类ID,支付数量)valpayRdd:RDD[String]=actionRdd.filter(action={valdatas:Array[String]=action.split("_")datas(8)!="null"//若本次不是下单行为,则数据采用null表示//datas(8)数据格式为"1,2,3"------flatMap操作})valpayCntRdd:RDD[(String,Int)]=orderRdd.flatMap(action={valdatas:Array[String]=action.split("_")valcid:String=datas(10)valcids:Array[String]=cid.split(",")cids.map(id=(id,1))}).reduceByKey(_+_)//5.将品类进行排序,并且取前10名//点击数量排序、下单数量排序、支付数量排序//(品类ID,(点击数量,下单数量,支付数量))//算子选择:join,leftOuterJoin,cogroup=connect+group//join:排除,必须相同的key才能连接,可能出现品类A三种操作里没有的情况valcogroupRdd:RDD[(String,(Iterable[Int],Iterable[Int],Iterable[Int]))]=clickCntRdd.cogroup(orderCntRdd,payCntRdd)valanalysisRdd:RDD[(String,(Int,Int,Int))]=cogroupRdd.mapValues{case(clickIter,orderIter,payIter)={varclickCnt=0valit1:Iterator[Int]=clickIter.iteratorif(it1.hasNext){clickCnt=it1.next()}varorderCnt=0valit2:Iterator[Int]=orderIter.iteratorif(it2.hasNext){orderCnt=it2.next()}varpayCnt=0valit3:Iterator[Int]=payIter.iteratorif(it3.hasNext){payCnt=it3.next()}(clickCnt,orderCnt,payCnt)}}valresultRdd:Array[(String,(Int,Int,Int))]=analysisRdd.sortBy(_._2,false).take(10)//6.将数据打印在控制台上resultRdd.foreach(println)sc.stop()}}

    上述写法需要改进:

    问题1:actionRdd重复使用,RDD不存数据,效率低

    改进:使用持久化

    问题2:cogroup查看源码有可能存在shuffle(当分区器不同时),性能可能低

    改进:不使用cogroup,直接转化数据格式

    (品类ID,点击数量)=(品类ID,(点击数量,0,0))

    (品类ID,下单数量)=(品类ID,(0,下单数量,0))

    两两聚合

    union+reduceByKey

    defmain(args:Array[String]):Unit={valconf=newSparkConf().setAppName("HotCategoryTop10").setMaster("local

  • ")valsc:SparkContext=newSparkContext(conf)valactionRdd:RDD[String]=sc.textFile("datas/uesr_action_info.txt")actionRdd.cache()//2.统计品类的点击数量:(品类ID,点击数量)valclickRdd:RDD[String]=actionRdd.filter(action={valdatas:Array[String]=action.split("_")datas(6)!="-1"//若点击的品类ID和产品ID为-1,表示数据不是点击数据})valclickCntRdd:RDD[(String,Int)]=clickRdd.map(action={valdatas:Array[String]=action.split("_")(datas(6),1)}).reduceByKey(_+_)//3.统计品类的下单数量:(品类ID,下单数量)valorderRdd:RDD[String]=actionRdd.filter(action={valdatas:Array[String]=action.split("_")datas(8)!="null"//若本次不是下单行为,则数据采用null表示//datas(8)数据格式为"1,2,3"------flatMap操作})valorderCntRdd:RDD[(String,Int)]=orderRdd.flatMap(action={valdatas:Array[String]=action.split("_")valcid:String=datas(8)valcids:Array[String]=cid.split(",")cids.map(id=(id,1))}).reduceByKey(_+_)//4.统计品类的支付数量:(品类ID,支付数量)valpayRdd:RDD[String]=actionRdd.filter(action={valdatas:Array[String]=action.split("_")datas(8)!="null"//若本次不是下单行为,则数据采用null表示//datas(8)数据格式为"1,2,3"------flatMap操作})valpayCntRdd:RDD[(String,Int)]=orderRdd.flatMap(action={valdatas:Array[String]=action.split("_")valcid:String=datas(10)valcids:Array[String]=cid.split(",")cids.map(id=(id,1))}).reduceByKey(_+_)//5.将品类进行排序,并且取前10名//点击数量排序、下单数量排序、支付数量排序//(品类ID,(点击数量,下单数量,支付数量))valrdd1:RDD[(String,(Int,Int,Int))]=clickCntRdd.map{case(clik,cnt)={(clik,(cnt,0,0))}}valrdd2:RDD[(String,(Int,Int,Int))]=orderCntRdd.map{case(order,cnt)={(order,(0,cnt,0))}}valrdd3:RDD[(String,(Int,Int,Int))]=payCntRdd.map{case(pay,cnt)={(pay,(0,0,cnt))}}//将三个数据源合并在一起,统一进行聚合计算union算子:并集valsourceRdd:RDD[(String,(Int,Int,Int))]=rdd1.union(rdd2).union(rdd3)valanalysisRdd:RDD[(String,(Int,Int,Int))]=sourceRdd.reduceByKey(//reduceByKey也有shuffle(t1,t2)={(t1._1+t2._1,t1._2+t2._2,t1._3+t2._3)})valresultRdd:Array[(String,(Int,Int,Int))]=analysisRdd.sortBy(_._2,false).take(10)//6.将数据打印在控制台上resultRdd.foreach(println)sc.stop()}

    改进3:

    问题1:clickCntRdd过程和后面三个数据源聚合时大量使用reduceByKey,性能低

    改进:在clickCntRdd过程就一步到位转化数据格式=(品类ID,(点击数量,0,0))

    reduceByKey聚合算子Spark会提供优化,预聚合缓存

    defmain(args:Array[String]):Unit={valconf=newSparkConf().setAppName("HotCategoryTop10").setMaster("local

  • ")valsc:SparkContext=newSparkContext(conf)valactionRdd:RDD[String]=sc.textFile("datas/uesr_action_info.txt")//2.转化数据格式点击场合:(品类ID,(1,0,0))//下单场合:(品类ID,(0,1,0))//支付场合:(品类ID,(0,0,1))valflatRdd:RDD[(String,(Int,Int,Int))]=actionRdd.flatMap(action={valdatas:Array[String]=action.split("_")if(datas(6)!="-1")//点击场合{List((datas(6),(1,0,0)))}elseif(datas(8)!="null")//下单场合{valids:Array[String]=datas(8).split(",")ids.map(id=(id,(0,1,0)))}elseif(datas(10)!="null")//支付场合{valids:Array[String]=datas(10).split(",")ids.map(id=(id,(0,0,1)))}else{Nil}})//3.将相同的品类ID的数据进行分组聚合(品类ID,(点击数量,下单数量,支付数量))valanalysisRdd:RDD[(String,(Int,Int,Int))]=flatRdd.reduceByKey((t1,t2)={(t1._1+t2._1,t1._2+t2._2,t1._3+t2._3)})//4.将统计结果根据数量进行降序处理,取前10valresultRdd:Array[(String,(Int,Int,Int))]=analysisRdd.sortBy(_._2,false).take(10)//5.将数据打印在控制台上resultRdd.foreach(println)sc.stop()}

    改进4:

    问题1:还是使用了reduceByKey有shuffle性能低

    改进:累加器

    defmain(args:Array[String]):Unit={valconf=newSparkConf().setAppName("HotCategoryTop10").setMaster("local

  • ")valsc:SparkContext=newSparkContext(conf)valactionRdd:RDD[String]=sc.textFile("datas/uesr_action_info.txt")//2.注册累加器valacc:HotCategoryAccumulator=newHotCategoryAccumulatorsc.register(acc)//统计数据actionRdd.foreach(action={valdatas:Array[String]=action.split("_")if(datas(6)!="-1")//点击场合{acc.add((datas(6),"click"))}elseif(datas(8)!="null")//下单场合{valids:Array[String]=datas(8).split(",")ids.foreach{id=acc.add((id,"order"))}}elseif(datas(10)!="null")//支付场合{valids:Array[String]=datas(10).split(",")ids.foreach{id=acc.add((id,"pay"))}}})valaccVal:mutable.Map[String,HotCategory]=acc.valuevalcategories:mutable.Iterable[HotCategory]=accVal.map(_._2)//可迭代的不能排序转化为List//sortWith自定义排序规则valsort:List[HotCategory]=categories.toList.sortWith((left,right)={if(left.clickCntright.clickCnt){true}elseif(left.clickCnt==right.clickCnt){if(left.orderCntright.orderCnt){true}elseif(left.orderCnt==right.orderCnt){left.payCntright.payCnt}else{false}}else{false}})//4.将统计结果根据数量进行降序处理,取前10,将数据打印在控制台上sort.take(10).foreach(println)sc.stop()}caseclassHotCategory(cid:String,varclickCnt:Int,varorderCnt:Int,varpayCnt:Int)/***自定义累加器*1.继承AccumulatorV2,定义泛型*IN:累加器输入的数据类型(品类ID,行为类型)*OUT:累加器返回的数据类型mutable.Map[String,HotCategory]*2.重写方法*/classHotCategoryAccumulatorextendsAccumulatorV2[(String,String),mutable.Map[String,HotCategory]]{privatevalhcMap=mutable.Map[String,HotCategory]()overridedefisZero:Boolean={hcMap.isEmpty}overridedefcopy():AccumulatorV2[(String,String),mutable.Map[String,HotCategory]]={newHotCategoryAccumulator()}overridedefreset():Unit={hcMap.clear()}overridedefadd(v:(String,String)):Unit={valcid=v._1valactionType=v._2valcategory:HotCategory=hcMap.getOrElse(cid,HotCategory(cid,0,0,0))if(actionType=="click"){category.clickCnt+=1}elseif(actionType=="order"){category.orderCnt+=1}elseif(actionType=="pay"){category.payCnt+=1}hcMap.update(cid,category)}overridedefmerge(other:AccumulatorV2[(String,String),mutable.Map[String,HotCategory]]):Unit={valmap1=this.hcMapvalmap2=other.valuemap2.foreach{case(cid,hotCategory)={valcategory:HotCategory=map1.getOrElse(cid,HotCategory(cid,0,0,0))category.clickCnt+=hotCategory.clickCntcategory.orderCnt+=hotCategory.orderCntcategory.payCnt+=hotCategory.payCntmap1.update(cid,category)}}}overridedefvalue:mutable.Map[String,HotCategory]={hcMap}}

    需求二:

    Top10热门品类中每个品类的Top10活跃Session统计

    在需求一的基础上,增加每个品类用户Session的点击统计

    defmain(args:Array[String]):Unit={valconf=newSparkConf().setAppName("HotCategoryTop10").setMaster("local

  • ")valsc:SparkContext=newSparkContext(conf)valactionRdd:RDD[String]=sc.textFile("datas/uesr_action_info.txt")actionRdd.cache()valtop10Ids:Array[String]=top10Category(actionRdd)//1.过滤原始数据,保留点击和前10品类IDvalfilterActionRdd:RDD[String]=actionRdd.filter(action={valdatas:Array[String]=action.split("_")if(datas(6)!="-1"){top10Ids.contains(datas(6))}else{false}})//2.根据品类ID和sessionID进行点击量的统计valreduceRdd:RDD[((String,String),Int)]=filterActionRdd.map(action={valdatas:Array[String]=action.split("_")((datas(6),datas(2)),1)//((品类ID,Session),1)}).reduceByKey(_+_)//3.将统计的结果进行结构的转换//((品类ID,SessionID),sum)=(品类ID,(SessionID,sum))valmapRdd:RDD[(String,(String,Int))]=reduceRdd.map{case((cid,sessionId),sum)={(cid,(sessionId,sum))}}//4.相同的品类进行分组valgroupRdd:RDD[(String,Iterable[(String,Int)])]=mapRdd.groupByKey()//5.相同的key,value进行排序,取前10valsortRdd:RDD[(String,List[(String,Int)])]=groupRdd.mapValues(iter={iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(10)})//6.打印在控制台上Top10品类中的每个品类的Top10Session的统计结果sortRdd.collect().foreach(println)sc.stop()}deftop10Category(actionRdd:RDD[String]):Array[String]={valflatRdd:RDD[(String,(Int,Int,Int))]=actionRdd.flatMap(action={valdatas:Array[String]=action.split("_")if(datas(6)!="-1")//点击场合{List((datas(6),(1,0,0)))}elseif(datas(8)!="null")//下单场合{valids:Array[String]=datas(8).split(",")ids.map(id=(id,(0,1,0)))}elseif(datas(10)!="null")//支付场合{valids:Array[String]=datas(10).split(",")ids.map(id=(id,(0,0,1)))}else{Nil}})//3.将相同的品类ID的数据进行分组聚合(品类ID,(点击数量,下单数量,支付数量))valanalysisRdd:RDD[(String,(Int,Int,Int))]=flatRdd.reduceByKey((t1,t2)={(t1._1+t2._1,t1._2+t2._2,t1._3+t2._3)})//4.将统计结果根据数量进行降序处理,取前10analysisRdd.sortBy(_._2,false).take(10).map(_._1)}

    需求3:页面单跳转化率

    页面单跳转化率:计算页面单跳转化率,什么是页面单跳转化率,比如一个用户在一次Session过程中访问的页面路径3,5,7,9,10,21,那么页面3跳到页面5叫一次单跳,7-9也叫一次单跳,那么单跳转化率就是要统计页面点击的概率。

    比如:计算3-5的单跳转化率,先获取符合条件的Session对于页面3的访问次数(PV)为A,然后获取符合条件的Session中访问了页面3又紧接着访问了页面5的次数为B,那么B/A就是3-5的页面单跳转化率。

    defmain(args:Array[String]):Unit={valconf=newSparkConf().setAppName("HotCategoryTop10").setMaster("local

  • ")valsc:SparkContext=newSparkContext(conf)valactionRdd:RDD[String]=sc.textFile("datas/uesr_action_info.txt")valactionDataRdd:RDD[UserVisitAction]=actionRdd.map(action={valdatas=action.split("_")UserVisitAction(datas(0),datas(1).toLong,datas(2),datas(3).toLong,datas(4),datas(5),datas(6).toLong,datas(7).toLong,datas(8),datas(9),datas(10),datas(11),datas(12).toLong)})actionDataRdd.cache()//TODO-计算分母每个页面的点击次数//对指定页面连续跳转进行统计//1-2,2-3,3-4,4-5,5-6,6-7valids=List[Long](1,2,3,4,5,6,7)valOkFlowIds=ids.zip(ids.tail)valpageIdCntMap:Map[Long,Long]=actionDataRdd.filter(action={//过滤数据ids.init.contains(action.page_id)//不包含最后一个(元素7,它不会作为分母)}).map(action={(action.page_id,1L)}).reduceByKey(_+_).collect().toMap//TODO-计算分子每个页面跳转到下一页面的次数//根据session进行分组----标识每个用户的valsessionRdd:RDD[(String,Iterable[UserVisitAction])]=actionDataRdd.groupBy(_.session_id)//分组后,根据访问时间进行排序(升序)后,数据简化//数据(sessionId,((页面1,页面2),1))valmvRdd:RDD[(String,List[((Long,Long),Int)])]=sessionRdd.mapValues(iter={valsortList:List[UserVisitAction]=iter.toList.sortBy(_.action_time)//将页面按时间线排序后组装成跳转的格式即=//=,,=//scala中sliding:滑窗//拉链zip//List的尾部信息valflowIds:List[Long]=sortList.map(_.page_id)valpageJumpIds:List[(Long,Long)]=flowIds.zip(flowIds.tail)//分子页面数据过滤将不合法的页面过滤掉pageJumpIds.filter(tuple={OkFlowIds.contains(tuple._1)}).map(t={(t,1)})})//((1,2),1)valflatRdd:RDD[((Long,Long),Int)]=mvRdd.map(_._2).flatMap(list=list)//((1,2),1)=((1,2),sum)valdataRdd:RDD[((Long,Long),Int)]=flatRdd.reduceByKey(_+_)//TODO-计算单跳转换率除以对应页面的即pageId1的所以分母的结果要保存成Map的形式,方便查找dataRdd.foreach{case((pageId1,pageId2),sum)={vallon:Long=pageIdCntMap.getOrElse(pageId1,0L)println(s"页面${pageId1}跳转到页面${pageId2}的单跳转化率为:"+(sum.toDouble/lon))}}sc.stop()}//用户访问动作表caseclassUserVisitAction(data:String,user_id:Long,session_id:String,page_id:Long,action_time:String,search_keyword:String,click_category_id:Long,click_product_id:Long,order_category_ids:String,order_product_ids:String,pay_category_ids:String,pay_product_ids:String,city_id:Long)

    注意:

    1.分母的过滤是在数据开始处理的时候

    2.而分子的数据过滤不能放在数据刚开始处理的时候,因为先做过滤页面跳转数据就不正确了(有些页面被过滤掉)

    分子的数据过滤应该放在

    工程化代码--架构模式

    MVC:ModelViewController

    JavaWeb:Servlet(Java+HTML+CSS+JS)

    JavaWeb:JSP

    三层架构:controller控制层、service服务层、dao持久层

    ThreadLocal可以对线程的内存进行控制,存储数据,共享数据

    不能解决线程安全问题

    SparkCore----总结

    Master和Worker是跟资源相关的概念

    Driver和Executor是跟计算相关的概念

    在提交任务的过程中会创建Driver和Executor

    预览时标签不可点收录于话题#个上一篇下一篇
  • 1
    查看完整版本: spark学习04核心编程