当前位置: 首页 > news >正文

建设网站怎样分配给用户空间佛山优化公司推广

建设网站怎样分配给用户空间,佛山优化公司推广,黄岛建设厅官方网站,建外贸网站有效果吗前言 在 Flink 窗口计算模型中,数据先经过 WindowAssigner 分配窗口,然后再经过触发器 Trigger,Trigger 决定了一个窗口何时被 ProcessFunction 处理。每个 WindowAssigner 都有一个默认的 Trigger,如果默认的不满足需求&#xf…

前言

在 Flink 窗口计算模型中,数据先经过 WindowAssigner 分配窗口,然后再经过触发器 Trigger,Trigger 决定了一个窗口何时被 ProcessFunction 处理。每个 WindowAssigner 都有一个默认的 Trigger,如果默认的不满足需求,可以通过 WindowedStream.trigger() 指定自定义的 Trigger。

认识Trigger

所有触发器都是org.apache.flink.streaming.api.windowing.triggers.Trigger的子类,父类定义了一个触发器应该具备的能力:

@PublicEvolving
public abstract class Trigger<T, W extends Window> implements Serializable {private static final long serialVersionUID = -4104633972991191369L;public Trigger() {}public abstract TriggerResult onElement(T var1, long var2, W var4, TriggerContext var5) throws Exception;public abstract TriggerResult onProcessingTime(long var1, W var3, TriggerContext var4) throws Exception;public abstract TriggerResult onEventTime(long var1, W var3, TriggerContext var4) throws Exception;public boolean canMerge() {return false;}public void onMerge(W window, OnMergeContext ctx) throws Exception {throw new UnsupportedOperationException("This trigger does not support merging.");}public abstract void clear(W var1, TriggerContext var2) throws Exception;
}

Trigger 抽象类提供了六个方法:

  • onElement 元素被加入到窗口时触发,返回值决定窗口是否计算
  • onProcessingTime 注册的ProcessingTime任务时间到达时触发
  • onEventTime 注册的EventTime任务时间到达时触发
  • canMerge 是否可以合并
  • onMerge Trigger合并
  • clear 窗口被移除时触发

如果数据本身携带窗口是否触发计算的标记,那么重写 onElement() 方法即可;但是在时间窗口计算模型下,并不是通过元素来判断窗口是否需要计算的,而是窗口的结束时间到达时才出发计算,这个时候就需要用到定时器 Timer。Timer 就像一个闹钟,我们可以在它上面注册一个未来的时间戳,当这个时间到达时,对应的事件就会被触发,就像闹钟喊醒沉睡的你一样。

Trigger 会有自己的 Timer,TriggerContext 提供了注册时间事件的方法,你可以根据自己采用的时间语义,调用对应的注册方法来注册事件。

triggerContext.registerProcessingTimeTimer();
triggerContext.registerEventTimeTimer();

在重写这三个方法时,要重点关注方法的返回值。方法的返回值有两个作用:1、窗口内的数据是否可以计算,2、窗口内的数据是否需要清理。

public enum TriggerResult {CONTINUE(false, false),FIRE_AND_PURGE(true, true),FIRE(true, false),PURGE(false, true);private final boolean fire;private final boolean purge;
}

TriggerResult 枚举有四个值,含义分别是:

  • CONTINUE 不做任何操作
  • FIRE 触发窗口计算,但是数据仍然保留
  • PURGE 不触发计算,只是清理窗口内数据
  • FIRE_AND_PURGE 触发窗口计算,同时清理数据

内置的Trigger

Flink 内置了许多常用的 Trigger,大多数情况下它们足以支撑我们的业务场景,只有当内置的Trigger不符合要求时,才需要开发自定义的 Trigger。

1、EventTimeTrigger

和事件时间窗口搭配使用的 Trigger,直到 Watermark 时间戳等于窗口的结束时间才会触发计算。

onElement 的逻辑是:当有元素加入到窗口时,先判断当前 Watermark 时间戳是否到达窗口的结束时间,如果到了就直接触发计算,否则注册一个时间事件,等待 Timer 触发窗口计算。

public class EventTimeTrigger extends Trigger<Object, TimeWindow> {private static final long serialVersionUID = 1L;private EventTimeTrigger() {}public TriggerResult onElement(Object element, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {return TriggerResult.FIRE;} else {ctx.registerEventTimeTimer(window.maxTimestamp());return TriggerResult.CONTINUE;}}public TriggerResult onEventTime(long time, TimeWindow window, Trigger.TriggerContext ctx) {return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;}。。。。。。
}

2、ProcessingTimeTrigger

和 EventTimeTrigger 差不多,区别是采用的处理时间语义,没有 Watermark 相关的判断,直接注册ProcessingTime 事件等待窗口触发计算。

public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {private static final long serialVersionUID = 1L;private ProcessingTimeTrigger() {}public TriggerResult onElement(Object element, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) {ctx.registerProcessingTimeTimer(window.maxTimestamp());return TriggerResult.CONTINUE;}public TriggerResult onProcessingTime(long time, TimeWindow window, Trigger.TriggerContext ctx) {return TriggerResult.FIRE;}
}

3、DeltaTrigger

DeltaTrigger 会计算新加入窗口的元素和上一个元素的差值,当这个差值超过给定的阈值时,窗口就会触发计算,元素之间的差值是通过指定的 DeltaFunction 计算出来的。

public class DeltaTrigger<T, W extends Window> extends Trigger<T, W> {private static final long serialVersionUID = 1L;private final DeltaFunction<T> deltaFunction;private final double threshold;private final ValueStateDescriptor<T> stateDesc;private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction, TypeSerializer<T> stateSerializer) {this.deltaFunction = deltaFunction;this.threshold = threshold;this.stateDesc = new ValueStateDescriptor("last-element", stateSerializer);}public TriggerResult onElement(T element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {ValueState<T> lastElementState = (ValueState)ctx.getPartitionedState(this.stateDesc);if (lastElementState.value() == null) {lastElementState.update(element);return TriggerResult.CONTINUE;} else if (this.deltaFunction.getDelta(lastElementState.value(), element) > this.threshold) {lastElementState.update(element);return TriggerResult.FIRE;} else {return TriggerResult.CONTINUE;}}
}

自定义Trigger

通过子类继承 Trigger 重写相应的方法,即可自定义我们自己的触发器。

举个例子,我们自定义一个和时间不相关的 Trigger,我们等窗口积攒到一定数量的元素再出发计算。如下示例程序:

public static class CounterTrigger extends Trigger<Long, GlobalWindow> {private final int count;public CounterTrigger(int count) {this.count = count;}@Overridepublic TriggerResult onElement(Long element, long timestamp, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {// 通过Flink state来保存窗口内积攒的元素数量ValueState<Integer> countState = triggerContext.getPartitionedState(new ValueStateDescriptor<Integer>("count", Integer.class));int elementCount = Optional.ofNullable(countState.value()).orElse(0) + 1;if (elementCount >= this.count) {countState.update(0);return TriggerResult.FIRE_AND_PURGE;}countState.update(elementCount);return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long timestamp, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {return null;}@Overridepublic TriggerResult onEventTime(long timestamp, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {return null;}@Overridepublic void clear(GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {}}

接下来验证一下我们的Trigger是否生效。

我们编写一个Flink作业,数据源每秒生成10个随机数,数据会被统一划分到 GlobalWindow 窗口,然后指定我们自定义的 CounterTrigger,等窗口内积攒了十条数据就出发求和计算。

public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.addSource(new SourceFunction<Long>() {@Overridepublic void run(SourceContext<Long> sourceContext) throws Exception {while (true) {Threads.sleep(100);sourceContext.collect(ThreadLocalRandom.current().nextLong(100));}}@Overridepublic void cancel() {}}).windowAll(GlobalWindows.create()).trigger(new CounterTrigger(10)).process(new ProcessAllWindowFunction<Long, Object, GlobalWindow>() {@Overridepublic void process(ProcessAllWindowFunction<Long, Object, GlobalWindow>.Context context, Iterable<Long> iterable, Collector<Object> collector) throws Exception {Iterator<Long> iterator = iterable.iterator();long sum = 0L;while (iterator.hasNext()) {sum += iterator.next();}System.err.println(sum);}});environment.execute();
}

运行Flink作业,控制台每隔一段时间就会输出随机数之和。

尾巴

在 Flink 中,Trigger 决定了何时触发窗口的计算和输出结果。通过灵活配置 Trigger 规则,能够精确控制数据处理的时机,适应不同的业务需求和数据特点。

Trigger 能够处理各种复杂的情况,例如在特定条件满足时触发,或者基于时间间隔、数据量等因素进行触发。它为开发者提供了精细的控制手段,确保数据处理的准确性和及时性。 合理运用 Trigger 可以优化 Flink 作业的性能,避免不必要的计算和资源消耗。

http://www.yayakq.cn/news/600017/

相关文章:

  • 网站登录密码怎么取消保存云相册网站怎么做
  • 国内做五金加工的订单网站阿里虚拟机建设网站
  • 专业定制网站公司兼职网站高中生在家可做
  • 个人备案网站内容cosy主题wordpress
  • wordpress++分页优化方案官方网站
  • 做网站开发的想接私活做网站的公司找客户
  • 重庆企业型网站建设优书网打不开了
  • 涡阳在北京做网站的名人长沙 php企业网站系统
  • 网页制作与网站建设宝典 pdf如何将自己做的网站放到网上去
  • 建设工程质量安全管理体系网站比较好的做网站的公司
  • 寻花问柳-专注做一家男人的网站猪网站开发的pc或移动端
  • 网站地图怎么做一键导航展会设计公司简介
  • 制作外贸网站公司网页设计心得体会报告怎么写
  • 浙江网站建设多少钱高度重视网站建设 利用网站 接受监督
  • app商城网站开发网站建设方案ppt
  • 宁波网站建设公司优选亿企邦科技股龙头
  • 上海企业网站建设制网站怎么排版
  • 如何优化百度seo排名可以做关键词优化的免费网站
  • 搜索引擎是软件还是网站WordPress更改角色插件
  • 下沙经济开发区建设局网站做名片赞机器人电脑网站是多少钱
  • 如何破解网站后台账号和密码电商主题wordpress
  • 企业网站管理系统多少钱一年找平台推广
  • 增城网站建设推广苏州电子商务网站开发公司
  • 宣传工作网站建设作用株洲做网站的公司
  • 做视频网站带宽要wordpress主题申请软著吗
  • 沈阳网站哪家做的好我国企业网站的建设情况
  • 快速建站wordpress迁移后插件消失
  • 云南网站优化建设网上做流量对网站有什么影响
  • 外贸网站定制开发设计师互联网
  • 企业网站内容策划网页翻译哪个好用