您现在的位置是:主页 > news > 嘉兴企业网络营销推广平台/网络推广优化服务

嘉兴企业网络营销推广平台/网络推广优化服务

admin2025/4/26 16:07:12news

简介嘉兴企业网络营销推广平台,网络推广优化服务,怎么做网站的点击率,it十大诈骗培训机构1. map和mapPartition 将DataSet中的每一个元素转换为另外一个元素 示例 使用map操作,将以下数据转换为一个scala的样例类。 “1,张三”, “2,李四”, “3,王五”, “4,赵六” 注意 map和mapPartition的效果是一样的, 但如果在map的函数中,需要访问一…

嘉兴企业网络营销推广平台,网络推广优化服务,怎么做网站的点击率,it十大诈骗培训机构1. map和mapPartition 将DataSet中的每一个元素转换为另外一个元素 示例 使用map操作,将以下数据转换为一个scala的样例类。 “1,张三”, “2,李四”, “3,王五”, “4,赵六” 注意 map和mapPartition的效果是一样的, 但如果在map的函数中,需要访问一…

1. map和mapPartition

将DataSet中的每一个元素转换为另外一个元素

示例

使用map操作,将以下数据转换为一个scala的样例类。

“1,张三”, “2,李四”, “3,王五”, “4,赵六”

注意

map和mapPartition的效果是一样的,

但如果在map的函数中,需要访问一些外部存储,如:访问mysql数据库,需要打开连接, 此时效率较低。

而使用mapPartition可以有效减少连接数,提高效率

参考代码

import org.apache.flink.api.scala.ExecutionEnvironment/*** 演示转换操作*/
object BatchTransformation {def main(args: Array[String]): Unit = {//获取envval env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._//mapval data: DataSet[String] = env.fromCollection(List("1,张三", "2,李四", "3,王五", "4,赵六"))case class User(id: String, name: String)val userDataSet: DataSet[User] = data.map(text => {val files = text.split(",")User(files(0), files(1))})userDataSet.print()//mapPartitionval userDataSet2 = data.mapPartition(iter => {// TODO:打开连接iter.map(ele => {val files = ele.split(",")User(files(0), files(1))})// TODO:关闭连接})userDataSet2.print()}
}

2. flatMap

flatMap的用法和之前学习Spark中的一模一样,今天学习一个新的用法

将DataSet中的每一个元素转换为另一个集合并压平为多个元素

将DataSet中的每一个元素转换为0~n个元素

示例

分别将以下数据,转换成国家、省份、城市三个维度的数据。

将以下数据

​ 张三,中国,江西省,南昌市

​ 李四,中国,河北省,石家庄市

转换为

​ (张三,中国)

​ (张三,中国,江西省)

​ (张三,中国,江西省,南昌市)

​ (李四,中国)

​ (李四,中国,河北省)

​ (李四,中国,河北省,石家庄市)

思路

- 以上数据为一条转换为三条,显然,应当使用flatMap来实现

- 分别在flatMap函数中构建三个数据,并放入到一个列表中

List(

(姓名, 国家),

(姓名, 国家, 省份),

(姓名, 国家, 省份, 城市)

)

参考代码

//flatMap
val data2 = env.fromCollection(List("张三,中国,江西省,南昌市","李四,中国,河北省,石家庄市"
))//使用flatMap将一条数据转换为三条数据
val resultDataSet: DataSet[(String, String)] = data2.flatMap(text => {val fieldArr = text.split(",")List((fieldArr(0), fieldArr(1)),(fieldArr(0), fieldArr(1) , fieldArr(2)),(fieldArr(0), fieldArr(1) , fieldArr(2) , fieldArr(3)))}
)
resultDataSet.print()
//(张三,中国)
//(张三,中国,江西省)
//(张三,中国,江西省,南昌市)
//(李四,中国)
//(李四,中国,河北省)
//(李四,中国,河北省,石家庄市)

3. filter

Filter函数在实际生产中特别实用,数据处理阶段可以过滤掉大部分不符合业务的内容,可以极大减轻整体flink的运算压力

示例:

过滤出来以下以长度>4的单词。

“hadoop”, “hive”, “spark”, “flink”

参考代码

//filter
val wordDataSet = env.fromCollection(List("hadoop", "hive", "spark", "flink"))
val resultDataSet2 = wordDataSet.filter(_.length > 4)
resultDataSet2.print()

4. reduce

可以对一个dataset或者一个group来进行聚合计算,最终聚合成一个元素

示例1

请将以下元组数据,使用reduce操作聚合成一个最终结果

(“java” , 1) , (“java”, 1) ,(“java” , 1)

将上传元素数据转换为(“java”,3)

示例2

请将以下元组数据,下按照单词使用groupBy进行分组,再使用reduce操作聚合成一个最终结果

(“java” , 1) , (“java”, 1) ,(“scala” , 1)

转换为

(“java”, 2), (“scala”, 1)

参考代码

//reduce
val wordCountDataSet: DataSet[(String, Int)] = env.fromCollection(List(("java" , 1) , ("java", 1) ,("java" , 1)))
val resultDataSet3 = wordCountDataSet.reduce((wc1, wc2) => (wc2._1, wc1._2 + wc2._2))
resultDataSet3.print()//groupBy+reduce
val wordcountDataSet2: DataSet[(String, Int)] = env.fromCollection(List(("java" , 1) , ("java", 1) ,("scala" , 1)))
val groupedDataSet: GroupedDataSet[(String, Int)] = wordcountDataSet2.groupBy(_._1)
val resultDataSet4: DataSet[(String, Int)] = groupedDataSet.reduce((t1, t2) => (t1._1, t1._2 + t2._2))
resultDataSet4.print()//groupBy+sum
val wordcountDataSet3: DataSet[(String, Int)] = env.fromCollection(List(("java" , 1) , ("java", 1) ,("scala" , 1)))
val resultDataSet5: DataSet[(String, Int)]= wordcountDataSet3.groupBy(0).sum(1)
resultDataSet5.print()

5. reduceGroup

可以对一个dataset或者一个group来进行聚合计算,最终聚合成一个元素

reduce和reduceGroup的区别

- reduce是将数据一个个拉取到另外一个节点,然后再执行计算

- reduceGroup是先在每个group所在的节点上执行计算,然后再拉取

在这里插入图片描述

示例

请将以下元组数据,下按照单词使用groupBy进行分组,再使用reduceGroup操作进行单词计数

(“java” , 1) , (“java”, 1) ,(“scala” , 1)

参考代码

val wordcountDataSet2: DataSet[(String, Int)] = env.fromCollection(List(("java" , 1) , ("java", 1) ,("scala" , 1)))
//reduceGroup
val groupedDataSet2 = wordcountDataSet2.groupBy(0)
val resultDataSet6 = groupedDataSet2.reduceGroup(iter =>{
iter.reduce((wc1, wc2) => (wc1._1,wc1._2 + wc2._2))
}
)
resultDataSet6.print()

6. Aggregate

按照内置的方式来进行聚合, Aggregate只能作用于元组上。例如:SUM/MIN/MAX…

参考代码

val wordcountDataSet2: DataSet[(String, Int)] = 
env.fromCollection(List(("java" , 1) , ("java", 1) ,("scala" , 1)))
val groupedDataSet2: GroupedDataSet[(String, Int)] = wordcountDataSet2.groupBy(0)//_._1报错
//aggregate
val resultDataSet7 = groupedDataSet2.aggregate(Aggregations.SUM, 1)
resultDataSet7.print()

注意

Aggregate只能作用于元组上

要使用aggregate,只能使用字段索引或索引名称来进行分组groupBy(0),否则会报一下错误:

Exception in thread “main” java.lang.UnsupportedOperationException: Aggregate does not support grouping with KeySelector functions, yet.

7. distinct

去除重复的数据

示例

请将以下元组数据,使用distinct操作去除重复的单词

(“java” , 1) , (“java”, 2) ,(“scala” , 1)

去重得到

(“java”, 1), (“scala”, 1)

参考代码

val wordcountDataSet2: DataSet[(String, Int)] = 
env.fromCollection(List(("java" , 1) , ("java", 1) ,("scala" , 1)))
//distinct
val resultDataSet8 = wordcountDataSet2.distinct(0)
resultDataSet8.print()

8. join

使用join可以将两个DataSet连接起来,返回想要的关联结果,

示例

有两个csv文件,有一个为score.csv,一个为subject.csv,分别保存了成绩数据以及学科数据。

需要将这两个数据连接到一起,然后打印出来。

在这里插入图片描述
在这里插入图片描述

参考代码

// 成绩Score(唯一ID、学生姓名、学科ID、分数)
case class Score(id:Int, name:String, subjectId:Int, score:Double)
// 学科Subject(学科ID、学科名字)
case class Subject(id:Int, name:String)
// 加载csv数据源
val scoreDataSet = env.readCsvFile[Score]("D:\\data\\score.csv")
val subjectDataSet = env.readCsvFile[Subject]("D:\\data\\subject.csv")
// join连接两个DataSet,并使用`where`、`equalTo`方法设置关联条件
val joinedDataSet = scoreDataSet.join(subjectDataSet).where(2).equalTo(0)
joinedDataSet.print()

9. LeftOuterJoin

左外连接,左边的Dataset中的每一个元素,去连接右边的元素

示例

请将以下元组数据(用户id,用户姓名)

(1, “zhangsan”) , (2, “lisi”) ,(3 , “wangwu”)

元组数据(用户id,所在城市)

(1, “beijing”), (2, “shanghai”), (4, “guangzhou”)

返回如下数据:

(3,wangwu,null)

(1,zhangsan,beijing)

(2,lisi,shanghai)

参考代码

val text1: DataSet[(Int, String)] = env.fromCollection(List((1,"zhangsan"),(2,"lisi"),(3,"wangwu")))
val text2: DataSet[(Int, String)] = env.fromCollection(List((1,"beijing"),(2,"shanghai"),(4,"guangzhou")))
/*OPTIMIZER_CHOOSES:将选择权交予Flink优化器;BROADCAST_HASH_FIRST:广播第一个输入端,同时基于它构建一个哈希表,而第二个输入端作为探索端,选择这种策略的场景是第一个输入端规模很小;BROADCAST_HASH_SECOND:广播第二个输入端并基于它构建哈希表,第一个输入端作为探索端,选择这种策略的场景是第二个输入端的规模很小;REPARTITION_HASH_FIRST:该策略会导致两个输入端都会被重分区,但会基于第一个输入端构建哈希表。该策略适用于第一个输入端数据量小于第二个输入端的数据量,但这两个输入端的规模仍然很大,优化器也是当没有办法估算大小,没有已存在的分区以及排序顺序可被使用时系统默认采用的策略;REPARTITION_HASH_SECOND:该策略会导致两个输入端都会被重分区,但会基于第二个输入端构建哈希表。该策略适用于两个输入端的规模都很大,但第二个输入端的数据量小于第一个输入端的情况;REPARTITION_SORT_MERGE:输入端被以流的形式进行连接并合并成排过序的输入。该策略适用于一个或两个输入端都已排过序的情况;
*/
val leftOuterJoinAssigner: JoinFunctionAssigner[(Int, String), (Int, String)] = text1.leftOuterJoin(text2,JoinHint.OPTIMIZER_CHOOSES).where(0).equalTo(0)
val leftOuterJoinResult: DataSet[(Int, String, String)] = leftOuterJoinAssigner.apply((first, second) => {if (second == null) {(first._1, first._2, "null")} else {(first._1, first._2, second._2)}
})
leftOuterJoinResult.print()

10. RightOuterJoin

右外连接,右边的Dataset中的每一个元素,去连接左边的元素

示例

请将以下元组数据(用户id,用户姓名)

(1, “zhangsan”) , (2, “lisi”) ,(3 , “wangwu”)

元组数据(用户id,所在城市)

(1, “beijing”), (2, “shanghai”), (4, “guangzhou”)

返回如下数据:

(1,zhangsan,beijing)

(2,lisi,shanghai)

(4,null,guangzhou)

参考代码

text1.rightOuterJoin(text2).where(0).equalTo(0).apply((first,second)=>{if(first==null){(second._1,"null",second._2)}else{(first._1,first._2,second._2)}}).print()

11. fullOuterJoin

全外连接,左右两边的元素,全部连接

示例

请将以下元组数据(用户id,用户姓名)

(1, “zhangsan”) , (2, “lisi”) ,(3 , “wangwu”)

元组数据(用户id,所在城市)

(1, “beijing”), (2, “shanghai”), (4, “guangzhou”)

返回如下数据:

(3,wangwu,null)

(1,zhangsan,beijing)

(2,lisi,shanghai)

(4,null,guangzhou)

参考代码

text1.fullOuterJoin(text2,JoinHint.REPARTITION_SORT_MERGE).where(0).equalTo(0).apply((first,second)=>{if(first==null){(second._1,"null",second._2)}else if(second==null){(first._1,first._2,"null")}else{(first._1,first._2,second._2)}}).print()

12. union

将两个DataSet取并集,不会去重。

示例

将以下数据进行取并集操作

数据集1

“hadoop”, “hive”, “flume”

数据集2

“hadoop”, “hive”, “spark”

注意:

union合并的DataSet的类型必须是一致的

参考代码

val wordDataSet1: DataSet[String] = env.fromCollection(List("hadoop", "hive", "flume"))
val wordDataSet2: DataSet[String] = env.fromCollection(List("hadoop", "hive", "spark"))
val unionresult: DataSet[String] = wordDataSet1.union(wordDataSet2)
unionresult.print()

13. rebalance

类似于Spark中的repartition,但是功能更强大,可以直接解决数据倾斜

Flink也有数据倾斜的时候,比如当前有数据量大概10亿条数据需要处理,在处理过程中可能会发生如图所示的状况:

在这里插入图片描述

出现了数据倾斜,其他3台机器执行完毕也要等待机器1执行完毕后才算整体将任务完成;

所以在实际的工作中,出现这种情况比较好的解决方案就是rebalance(内部使用round robin方法将数据均匀打散)

在这里插入图片描述

//创建并行数据
val numDataSet: DataSet[Long] = env.generateSequence(0, 100)
val filterDataSet:DataSet[Long] = numDataSet.filter(_ > 8)//RichMapFunction将当前子任务的ID和数字构建成一个元组
val result: DataSet[(Long, Long)] = filterDataSet.map(new RichMapFunction[Long, (Long, Long)] {override def map(in: Long): (Long, Long) = {(getRuntimeContext.getIndexOfThisSubtask, in)}
})
result.print()//上述代码如果没有加rebalance,通过观察,数据随机的分发给各个子任务(分区),有可能会出现数据倾斜。
//在filter计算完后,调用rebalance,这样,就会均匀地将数据分布到每一个分区中。
println("============================================================================")
val numDataSet2: DataSet[Long] = env.generateSequence(0, 100)
val filterDataSet2: DataSet[Long] = numDataSet.filter(_ > 8)val rebalanced2: DataSet[Long] = filterDataSet.rebalance()val result2: DataSet[(Long, Long)] = rebalanced2.map(new RichMapFunction[Long, (Long, Long)] {override def map(in: Long): (Long, Long) = {(getRuntimeContext.getIndexOfThisSubtask, in)}
})
result2.print()

14. 分区

partitionByHash

partitionByRange

sortPartition

按照指定的key进行分区

env.setParallelism(2)
val datas = new mutable.MutableList[(Int, Long, String)]
datas.+=((1, 1L, "Hello"))
datas.+=((2, 2L, "Hello"))
datas.+=((3, 2L, "Hello"))
datas.+=((4, 3L, "Hello"))
datas.+=((5, 3L, "Hello"))
datas.+=((6, 3L, "hehe"))
datas.+=((7, 4L, "hehe"))
datas.+=((8, 4L, "hehe"))
datas.+=((9, 4L, "hehe"))
datas.+=((10, 4L, "hehe"))
datas.+=((11, 5L, "hehe"))
datas.+=((12, 5L, "hehe"))
datas.+=((13, 5L, "hehe"))
datas.+=((14, 5L, "hehe"))
datas.+=((15, 5L, "hehe"))
datas.+=((16, 6L, "hehe"))
datas.+=((17, 6L, "hehe"))
datas.+=((18, 6L, "hehe"))
datas.+=((19, 6L, "hehe"))
datas.+=((20, 6L, "hehe"))
datas.+=((21, 6L, "hehe"))
val collection: DataSet[(Int, Long, String)] = env.fromCollection(Random.shuffle(datas))val partitionByHash: DataSet[(Int, Long, String)] = collection.partitionByHash(_._3)
val partitionByRange: DataSet[(Int, Long, String)] = collection.partitionByRange(x => x._1)
val partitionBysort: DataSet[(Int, Long, String)] = collection.sortPartition(_._2, Order.DESCENDING)partitionByHash.writeAsText("D:\\data\\partitionByHash", WriteMode.OVERWRITE)
partitionByRange.writeAsText("D:\\data\\partitionByRange", WriteMode.OVERWRITE)
partitionBysort.writeAsText("D:\\data\\partitionBysort", WriteMode.OVERWRITE)env.execute()

15. minBy和maxBy

val scores = new mutable.MutableList[(Int, String, Double)]
scores.+=((1, "yuwen", 90.0))
scores.+=((2, "shuxue", 20.0))
scores.+=((3, "yingyu", 30.0))
scores.+=((4, "wuli", 40.0))
scores.+=((5, "yuwen", 50.0))
scores.+=((6, "wuli", 60.0))
scores.+=((7, "yuwen", 70.0))
val input: DataSet[(Int, String, Double)] = env.fromCollection(Random.shuffle(scores))
//求每个学科下的最小分数
val output: DataSet[(Int, String, Double)] = input.groupBy(1).minBy(2)
val output2: DataSet[(Int, String, Double)] = input.groupBy(1).min(2)
output.print()
output2.print()

16. cross

和join类似,但是这种交叉操作会产生笛卡尔积

env.setParallelism(1)
//Cross 交叉操作/笛卡尔积
val students = new mutable.MutableList[(Int, String)]
//学生
students.+=((1, "张三"))
students.+=((2, "李四"))
students.+=((3, "王五"))
students.+=((4, "赵六"))val subjects = new mutable.MutableList[(Int, String)]
//课程
subjects.+=((1,"Java"))
subjects.+=((2,"Python"))
subjects.+=((3,"前端"))
subjects.+=((4,"大数据"))val input1: DataSet[(Int, String)] = env.fromCollection(Random.shuffle(students))
val input2: DataSet[(Int, String)] = env.fromCollection(Random.shuffle(subjects))
val cross = input1.cross(input2){(input1 , input2) => (input1._1,input1._2,input2._1,input2._2)
}
cross.print()

a
env.setParallelism(1)
//Cross 交叉操作/笛卡尔积
val students = new mutable.MutableList[(Int, String)]
//学生
students.+=((1, “张三”))
students.+=((2, “李四”))
students.+=((3, “王五”))
students.+=((4, “赵六”))

val subjects = new mutable.MutableList[(Int, String)]
//课程
subjects.+=((1,“Java”))
subjects.+=((2,“Python”))
subjects.+=((3,“前端”))
subjects.+=((4,“大数据”))

val input1: DataSet[(Int, String)] = env.fromCollection(Random.shuffle(students))
val input2: DataSet[(Int, String)] = env.fromCollection(Random.shuffle(subjects))
val cross = input1.cross(input2){
(input1 , input2) => (input1._1,input1._2,input2._1,input2._2)
}
cross.print()