当前位置: 首页 > news >正文

天津平台网站建设哪里好打广告网站

天津平台网站建设哪里好,打广告网站,沈阳网站建设方案外包,时代强个人网站大纲 sourceMapSplittingMapping ReduceKeyingReducing 完整代码结构参考资料 在《0基础学习PyFlink——模拟Hadoop流程》一文中,我们看到Hadoop在处理大数据时的MapReduce过程。 本节介绍的DataStream API,则使用了类似的结构。 source 为了方便&…

大纲

  • source
  • Map
    • Splitting
    • Mapping
  • Reduce
    • Keying
    • Reducing
  • 完整代码
  • 结构
  • 参考资料

在《0基础学习PyFlink——模拟Hadoop流程》一文中,我们看到Hadoop在处理大数据时的MapReduce过程。
在这里插入图片描述
本节介绍的DataStream API,则使用了类似的结构。

source

为了方便,我们依然使用from_collection从内存中读取数据。
和使用Table API类似,我们给from_collection传递的第二参数是每行数据类型。本例中是String,即“A C B”的类型。

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionModeword_count_data = ["A C B","A E B","E C D"]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.BATCH)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.STRING()# define the sourcesource = env.from_collection(word_count_data, source_type_info)

可以使用下面指令输出source内容

    source.print()
A C B
A E B
E C D

Map

和上图一样,Map由Splitting和Mapping组成。它们分别将数据切割成做小运算单元,和生成map结构。

Splitting

    def split(line):for s in line.split():yield ssplitted = source.flat_map(split) 

上述splitted的结构输出是

A
C
B
A
E
B
E
C
D

Mapping

Mapping的操作就是将之前的数组结构转换成map结构

mapped=splitted.map(lambda i: (i, 1), Types.TUPLE([Types.STRING(), Types.INT()]))

mapped的输出值如下,可以看到它还是按我们输入数据的顺序排列的。

(A,1)
(C,1)
(B,1)
(A,1)
(E,1)
(B,1)
(E,1)
(C,1)
(D,1)

Reduce

Keying

这一步对应于上图中的Shuffling&Sorting,它会将相同key的数据进行分区,以供后面reducing操作使用。

    keyed=mapped.key_by(lambda i: i[0]) 

可以看到keyed数据已经经过排序和聚合了。

(A,1)
(A,1)
(B,1)
(B,1)
(C,1)
(C,1)
(D,1)

Reducing

 reduced=keyed.reduce(lambda i, j: (i[0], i[1] + j[1]))

reduce的方法有如下注释

Applies a reduce transformation on the grouped data stream grouped on by the given
key position. The ReduceFunction will receive input values based on the key value.
Only input values with the same key will go to the same reducer.

特别是最后一句非常有用“Only input values with the same key will go to the same reducer”(只有相同Key的输入数据才会进入相同的Reducer中)。这句话意味着上述Keyed的数据会被分组执行,于是就不会出现计算错乱。

(A,2)
(B,2)
(C,2)
(D,1)
(E,2)

完整代码

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionModeword_count_data = ["A C B","A E B","E C D"]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.BATCH)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.STRING()# define the sourcesource = env.from_collection(word_count_data, source_type_info)# source.print()def split(line):for s in line.split():yield ssplitted = source.flat_map(split) # splitted.print()mapped=splitted.map(lambda i: (i, 1), Types.TUPLE([Types.STRING(), Types.INT()]))# mapped.print()keyed=mapped.key_by(lambda i: i[0]) # keyed.print()reduced=keyed.reduce(lambda i, j: (i[0], i[1] + j[1]))# define the sinkreduced.print()# submit for executionenv.execute()if __name__ == '__main__':word_count()

结构

在这里插入图片描述

参考资料

  • https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/python/datastream_tutorial/
http://www.yayakq.cn/news/717635/

相关文章:

  • perl网站开发搬家公司需要多少钱
  • 北京网站主题制作烟台网站seo服务
  • sjz住房建设局网站wordpress 雪人主题
  • 优化网站首页怎么做直播网站
  • 北京南昌网站制作wordpress 插件_
  • 商业网站开发实训心得体会范文网站建设后应该干什么
  • 网站建设需要下载哪些软件中国蔬菜网网站建设电话
  • 企业网站的在线推广方法有哪些购物网站建设开发费用分析
  • 网页制作软件dream抖音seo关键词优化
  • 陈村网站设计一个页面的网站
  • 网站建设阶段的推广自己做的网页怎么连接到网站
  • 门户网站制作的公司商城网站建设运营合同书
  • 网站创建公司哪家好河南万安建设集团有限公司网站
  • 网站加急备案网页设计教程课本
  • 网站建设之数据信息的保密性站长工具爱站网
  • 青岛免费建站网络推广设计建筑的软件
  • e通网网站建设福州做网站的公司电话
  • 营销型网站如何建设方案大型旅游网站源码 织梦 2016
  • 网站备案和域名备案一样吗代驾软件开发需要多少钱
  • 网站和网页建设题目wordpress被自动发布文章
  • 网站设计弹窗app注册拉新平台
  • 社交网站开发外文高德地图导航放弃重庆
  • 国外网站做任务套利有没有做数学题挣钱的网站
  • 个人网站的搭建方法短网址在线生成短网址
  • 怎样做自己的的社交网站新品发布会朋友圈文案
  • 做网站数据库坏了最权威的做网站设计公司价格
  • 找网络公司做网站需要注意什么怎么建自己公司网站
  • 企智网站建设什么是网站托管
  • 网站建设备案流程图荣耀手机正品官网查询
  • 卖域名的公司 骗做网站自己人网站建设