当前位置: 首页 > news >正文

陕西网站开发会展中心网站建设

陕西网站开发,会展中心网站建设,苏州首页关键词优化,枣阳建设局网站一、介绍 Join大体分类只有两种:Window Join和Interval Join Window Join有可以根据Window的类型细分出3种:Tumbling(滚动) Window Join、Sliding(滑动) Window Join、Session(会话) Widnow Join。 🌸Window 类型的join都是利用window的机制…

 一、介绍

Join大体分类只有两种:Window Join和Interval Join

Window Join有可以根据Window的类型细分出3种:Tumbling(滚动) Window Join、Sliding(滑动) Window Join、Session(会话) Widnow Join。

        🌸Window 类型的join都是利用window的机制,先将数据缓存在Window State中,当窗口触发计算时,执行join操作。

        🌸Interval join也是利用state存储数据再处理,区别在于state中的数据有失效机制,依靠数据触发数据清理,目前Stream join的结果是数据的卡尔积。

二、Window Join

✨Tumbling Window Join

        执行翻滚窗口联接时,具有公共键和公告翻滚窗口的所有元素将成对组合联接,并传递JoinFunction或FlatJoinFunction。因为它的行为类似于内部连接,所以一个流中的元素在其滚动窗口中没有来自另一个流的元素,因此不会被发射。

        如图所示,我们定义了一个为2毫秒的翻滚窗口,结果窗口的形式为[0,1]、[2,3]..............该图显示了每个窗口中所以元素的成对组合,这些元素将传递给JoinFunction。注意在翻滚窗口[6,7]中没有发射任何东西,因为绿色流中不存在与橙色元素⑥和⑦结合的元素。

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(TumblingEventTimeWindows.of(Time.milliseconds(2))).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});

✨Sliding Window Join

        在执行滑动窗口联接时,具有公共键和公共滑动窗口的所以元素将作为成对组合联接,并传递JoinFunction或FlatJoinFunction。在当前滑动窗口中,一个流的元素没有来自另一个流的元素,则不会发射!请注意,某些元素可能会联接到一个滑动窗口中,但不会联接到另一个滑动窗口中!

        在本例中,我们使用大小为2毫秒的滑动窗口,并将其滑动1毫秒,从而产生滑动窗口[-1,0],[1,2],[2,3]...........x轴下方的连续元素时传递给每个滑动窗口的Join Function的元素。在这里,你还可以看到,例如在窗口[2,3]中,橙色②和绿色③连接,但在窗口[1,2]中没有与任何对象连接。

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */)).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});

✨Session Window Join

        在执行会话窗口联接时,具有相同键(当“组合”满足会话条件)的所有元素以成对组合方式联接,并传递给JoinFunction或FlatJoinFunction。同样,这执行一个内部连接,所以如果有一个会话窗口只包含来自一个流的元素,则不会发出任何输出

        这里,我们定义一个会话窗口连接,其中每个会话被至少1毫秒的时间分割。有三个会话,在前两个会话中,来自两个流的连接元素被传递给JoinFunction。在第三个会话中,绿色流中没有元素,所以⑧和⑨没有连接!

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(EventTimeSessionWindows.withGap(Time.milliseconds(1))).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});

三、Interval Join

        前面学习的Window Join必须要在一个Window中进行Join,那如果没有Window如何处理呢?interval join也是使用相同的key来join两个流(流A、流B),并且流B中的元素中的时间戳,和流A元素的时间戳,有一个时间间隔。

b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound] or

a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

也就是:流B的元素的时间戳 ≥ 流A的元素时间戳 + 下界,且,流B的元素的时间戳 ≤ 流A的元素时间戳

 

在上面的示例中,我们将两个流“orange”和“green”连接起来,其下限为-2毫秒,上限为+1毫秒。默认情况下,这些边界是包含的,但是可以应用.lowerBoundExclusive()和.upperBoundExclusive来更改行为orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound 

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.keyBy(<KeySelector>).intervalJoin(greenStream.keyBy(<KeySelector>)).between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunction<Integer, Integer, String(){@Overridepublic void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {out.collect(first + "," + second);}});

 

http://www.yayakq.cn/news/118541/

相关文章:

  • 美容北京公司网站建设wordpress 火箭加速
  • 南安市城乡住房建设局网站网站维护与建设ppt
  • 青岛网站建设邓巴迪国家建设执业注册中心网站
  • 网站开发基础与提高郑州网站制作公司
  • 医药企业网站建设做一个网页一般多少钱
  • 东莞网站建设网站制作公司辽宁专业网页设计免费建站
  • 珠海网站建设招聘html页面 wordpress
  • 一条专访是哪个网站做的做外贸需要做国外的网站吗
  • 手机网站域名开头跨境电商网站建设流程
  • 网站搭建公司加盟网上如何推广平台
  • 试卷a《网站建设与管理》360建网站好不好?
  • 企业微信官方网站怎样查看wordpress用的什么主题
  • 福州网站建设yfznkjwordpress手机版登录
  • 英文网站 正文字体大小开发一个app多少钱
  • 网站建设外文版要求备案域名回收
  • 怎么做wood网站教程微信商城开发商华网天下优秀
  • h5手机网站开发wordpress p=29
  • 石家庄住房和城乡建设厅网站赚钱软件app
  • 网站建设哪家专业公司好做自媒体都有什么网站
  • 网站开发参考资料秦皇岛seo
  • 网站流量显示中国定制网
  • 禁止浏览器访问一个网站怎么做龙华和龙岗哪个繁华
  • 7块钱建购物网站公司网站制作哪个公司好
  • 代刷网自助建站系统网站分析 工具
  • 深圳做营销网站公司网站域名管理权限
  • 做的网站网站建设可以在家做吗
  • 手机网站底部悬浮菜单辽宁建设工程网
  • 沧州商贸行业网站建设店铺设计叫什么
  • 甘南网站建设许昌购物网站开发设计
  • 我想自己建个网站买货 怎么做百度推广效果怎样一天费用