当前位置: 首页 > news >正文

网站建设甲方给乙方的需求方案北京公司提供注册地址

网站建设甲方给乙方的需求方案,北京公司提供注册地址,免费自学编程,wordpress多说头像场景 自定义Map或者别的算子的时候,有时候需要定义一些类变量,在flink内部高并发的情况下需要正确理解这些变量的行为 代码 package com.pg.function;import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common…

场景

自定义Map或者别的算子的时候,有时候需要定义一些类变量,在flink内部高并发的情况下需要正确理解这些变量的行为

代码

package com.pg.function;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.ArrayList;public class FlinkFunction {//对于自定义函数中的变量,只有内置的状态是完全按照flink内置的 keyBy行为来的//如果是自定义的缓存比如ArrayList 则可能不会按照预期的行为public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStream<String> dataStream = env.fromElements( "b","b","b","c","c","c","d","d","d");dataStream.keyBy(x->{return x;}).map(new MyMap()).print();env.execute();}}class MyMap extends RichMapFunction<String, String> {public ArrayList<String> list= new ArrayList<>();
//     public ValueState<Integer> counter;//存储数据条数
//     public ValueState<String> element;//存储临时数据
//     @Override
//     public void open(Configuration parameters) throws Exception {
//         counter = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("counter", Types.INT));
//         element = getRuntimeContext().getState(new ValueStateDescriptor<>("element", Types.STRING));
//     }@Overridepublic String map(String s) throws Exception {list.add(s);if(list.size()==2){String re = list.toString();list.clear();return re;}else {return "null";}
//        if (counter.value() == null) {
//            counter.update(1);//遇见第一条数据的时候,计数器为1
//        } else {
//            counter.update(counter.value() + 1);
//        }
//        if (element.value() == null) {
//            element.update(s);//element只存储上一次到来的数据
//        }else {
//            element.update(element.value()+s);
//        }
//        if (counter.value() == 2) {
//            String re = element.value();
//            //发出结果之后清楚状态
//            counter.clear();
//            element.clear();
//            return re;
//        }else {
//            return "null";
//        }}
}

分析

keyBy之后,理论上相同key的会在map中用同样的处理逻辑,我们的预期行为是输出:bb,cc,dd
但是用ArrayList实现的逻辑最终输出却是:bb,bc,cc,dd
用ValueState的输出是:bb,cc,dd
这说明了,keBy后的逻辑,ArrayList不会按照预期的行为执行。这是因为在flink中,当多个并发的时候,多个key如果落入同一个线程
则当前线程的valueState是和某一个key绑定的,符合flink预期行为,但是ArrayList以及其它你定义的变量则不做保证, 它是线程级别的局部变量, 这点要注意。

http://www.yayakq.cn/news/852620/

相关文章:

  • 购物网站开发总结报告网站开发 app
  • 建一家网站多少钱家私公司网站建设多少钱
  • 设计外包网站用什么来网站开发好
  • 找做课件的网站h5制作的软件
  • 商务网站建设流程步骤从事网站开发需要什么
  • 网站服务器端口如何做防护国外交互设计网站欣赏
  • 长沙网站搜索排名飞机多少钱一架
  • 常德网站建设网站优化芜湖网站备案咨询电话
  • 请别人做网站有风险吗镜像网站是怎么做的
  • 网站建设ahxkj游戏开发需要学什么编程
  • 如何做医美机构网站观察分析门户网站建设和管理情况
  • 网站开发手机验证码新浪如何上传wordpress
  • 基本型电商网站举例西安网站建设公司哪有
  • 顺德制作网站价格多少oj网站开发
  • 江苏省交通厅门户网站建设管理有做国际网站生意吗
  • 化妆品做的不好的网站免费静态网站托管
  • 工艺品网站源码做网站如何提需求
  • 做公司网站推广鞍山发布
  • 做地方特产的网站邢台网站建设包括哪些
  • 环保网站建设方案为什么有网络却打不开网页
  • 安徽 网站建设建设银行网站能买手机
  • 模型网站推广普通话主题手抄报
  • 手工艺品网站建设策划书网站建设与优化及覆盖率方案
  • 自助网站建设公司电话怎么做网上卖菜网站
  • 做网站下载类似pc蛋蛋的网站建设
  • 均安公司网站建设2022年企业所得税政策
  • 广州工程建设信息网站seo短视频网页入口引流下载
  • 有谁知道网站优化怎么做门户网站建设美丽
  • ps兼职做网站哪里做网站比较号
  • 怎么自己做卡盟网站本地好的app开发公司