当前位置: 首页 > news >正文

珠市口网站建设微商城分销

珠市口网站建设,微商城分销,华为网络推广方案,网站建设的原因State state 可以理解为-- 历史计算结果 有状态计算和无状态计算 无状态计算: 不需要考虑历史数据, 相同的输入,得到相同的输出!如:map, 将每个单词记为1, 进来一个hello, 得到(hello,1),再进来一个hello,得到的还是(hello,1) 有状态计算: 需要考虑历史数据, 相同的输入,可…

State

state 可以理解为-- 历史计算结果

有状态计算和无状态计算

  • 无状态计算:
    • 不需要考虑历史数据, 相同的输入,得到相同的输出!
    • 如:map, 将每个单词记为1, 进来一个hello, 得到(hello,1),再进来一个hello,得到的还是(hello,1)
  • 有状态计算:
    • 需要考虑历史数据, 相同的输入,可能会得到不同的输出!
    • 如:sum/reduce/maxBy, 对单词按照key分组聚合,进来一个(hello,1),得到(hello,1), 再进来一个(hello,1), 得到的结果为(hello,2)

注意: Flink默认已经支持了无状态和有状态计算!

例如WordCount代码:已经做好了状态维护, 输入hello,输出(hello,1),再输入hello,输出(hello,2)

有状态计算和无状态计算的应用场景

  • 无状态计算:数据转换,过滤等操作直接使用无状态的map/filter即可
  • 有状态计算:需要做聚合/比较的操作得使用有状态的sum/reduce/maxBy/minBy....

有状态中的状态的分类

有状态的计算是流处理框架要实现的重要功能,因为稍复杂的流处理场景都需要记录状态(State),然后在新流入数据的基础上不断更新状态。下面的几个场景都需要使用流处理的状态功能:

  • 数据流中的数据有重复,想对重复数据去重,需要记录哪些数据已经流入过应用,当新数据流入时,根据已流入过的数据来判断去重。
  • 检查输入流是否符合某个特定的模式,需要将之前流入的元素以状态的形式缓存下来。比如,判断一个温度传感器数据流中的温度是否在持续上升。
  • 对一个时间窗口内的数据进行聚合分析,分析一个小时内某项指标的75分位或99分位的数值。

其实窗口本身就是状态,他不是立即出结果,而是将数据都保存起来,达到触发条件才计算。

一个状态更新和获取的流程如下图所示,一个算子子任务接收输入流,获取对应的状态,根据新的计算结果更新状态。一个简单的例子是对一个时间窗口内输入流的某个整数字段求和,那么当算子子任务接收到新元素时,会获取已经存储在状态中的数值,然后将当前输入加到状态上,并将状态数据更新。

以wordcout为例,说明上图的流程

 

状态类型

Flink有两种基本类型的状态:托管状态(Managed State)和原生状态(Raw State)。

两者的区别:Managed State是由Flink管理的,Flink帮忙存储、恢复和优化,Raw State是开发者自己管理的,需要自己序列化。

具体区别有:

从状态管理的方式上来说,Managed State由Flink Runtime托管,状态是自动存储、自动恢复的,Flink在存储管理和持久化上做了一些优化。当横向伸缩,或者说修改Flink应用的并行度时,状态也能自动重新分布到多个并行实例上。Raw State是用户自定义的状态。

从状态的数据结构上来说,Managed State支持了一系列常见的数据结构,如ValueState、ListState、MapState等。Raw State只支持字节,任何上层数据结构需要序列化为字节数组。使用时,需要用户自己序列化,以非常底层的字节数组形式存储,Flink并不知道存储的是什么样的数据结构。

从具体使用场景来说,绝大多数的算子都可以通过继承Rich函数类或其他提供好的接口类,在里面使用Managed State。Raw State是在已有算子和Managed State不够用时,用户自定义算子时使用。

对Managed State继续细分,它又有两种类型:Keyed State和Operator State。

Flink状态 - 托管状态- KeyedState ( 在keyBy之后可以使用状态 )- ValueState  (存储一个值)- ListState   (存储多个值)- MapState    (存储key-value) - OperatorState ( 没有keyBy的情况下也可以使用 ) [不用]- 原生状态 (不用)

Keyed State (键控状态)

Flink 为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。因此,具有相同key的所有数据都会访问相同的状态。

需要注意的是键控状态只能在 KeyedStream 上进行使用,可以通过 stream.keyBy(...) 来得到 KeyedStream 。

Flink 提供了以下数据格式来管理和存储键控状态 (Keyed State):

· ValueState:存储单值类型的状态。可以使用 update(T) 进行更新,并通过 T value() 进行检索。

· ListState:存储列表类型的状态。可以使用 add(T) 或 addAll(List) 添加元素;并通过 get() 获得整个列表。

· ReducingState:用于存储经过 ReduceFunction 计算后的结果,使用 add(T) 增加元素。

· AggregatingState:用于存储经过 AggregatingState 计算后的结果,使用 add(IN) 添加元素。

· FoldingState:已被标识为废弃,会在未来版本中移除,官方推荐使用 AggregatingState 代替。

· MapState:维护 Map 类型的状态。

代码演示-Managed State-Keyed State

//nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/fault-tolerance/state/

案例1:

使用KeyedState中的ValueState获取数据中的最大值(获取每个key的最大值)(实际中直接使用maxBy即可)

也就是我们自己使用KeyState中的ValueState来模拟实现maxBy

代码实现:

package com.bigdata.state;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class _01_KeyedStateDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStream<Tuple2<String, Long>> tupleDS = env.fromElements(Tuple2.of("北京", 1L),Tuple2.of("上海", 2L),Tuple2.of("北京", 6L),Tuple2.of("上海", 8L),Tuple2.of("北京", 3L),Tuple2.of("上海", 4L),Tuple2.of("北京", 7L));//2. source-加载数据tupleDS.keyBy(new KeySelector<Tuple2<String, Long>, String>() {@Overridepublic String getKey(Tuple2<String, Long> value) throws Exception {return value.f0;}}).map(new RichMapFunction<Tuple2<String, Long>, Tuple2<String,Long>>() {// 借助状态这个API实现ValueState<Long> maxValueState= null;@Overridepublic void open(Configuration parameters) throws Exception {// 就是对ValueState初始化ValueStateDescriptor<Long> stateDescriptor = new ValueStateDescriptor<Long>("valueState",Long.class);maxValueState = getRuntimeContext().getState(stateDescriptor);}@Overridepublic Tuple2<String, Long> map(Tuple2<String, Long> value) throws Exception {Long val = value.f1;if(maxValueState.value() == null){maxValueState.update(val);}else{if(maxValueState.value() < val){maxValueState.update(val);}}return Tuple2.of(value.f0,maxValueState.value());}}).print();//.maxBy(1).print();//3. transformation-数据处理转换//4. sink-数据输出//5. execute-执行env.execute();}
}

案例2:

如果一个人的体温超过阈值38度,超过3次及以上,则输出: 姓名 [温度1,温度2,温度3]

 

姓名,温度输入                      输出张三,37张三,38张三,39张三,35张三,40张三,41               张三,[39,40,41]张三,40               张三,[39,40,41,40]
package com.bigdata.state;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.planner.expressions.In;
import org.apache.flink.util.Collector;import java.util.ArrayList;public class _02_KeyedStateDemo2 {// 如果一个人的体温超过阈值38度,超过3次及以上,则输出: 姓名 [温度1,温度2,温度3]public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 8889);//3. transformation-数据处理转换   zs,37dataStreamSource.map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {String[] arr = value.split(",");return Tuple2.of(arr[0],Integer.valueOf(arr[1]));}}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> value) throws Exception {return value.f0;}}).flatMap(new RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, ArrayList<Integer>>>() {ValueState<Integer> valueState = null;ListState<Integer> listState = null;@Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<Integer>("numState",Integer.class);valueState = getRuntimeContext().getState(stateDescriptor);ListStateDescriptor<Integer> listStateDescriptor = new ListStateDescriptor<>("listState", Integer.class);listState = getRuntimeContext().getListState(listStateDescriptor);}@Overridepublic void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, ArrayList<Integer>>> out) throws Exception {Integer tiwen = value.f1;if(tiwen >= 38){valueState.update(valueState.value()==null?1:(valueState.value()+1));listState.add(tiwen);}if(valueState.value()!=null && valueState.value() >= 3){ArrayList<Integer> list = new ArrayList<>();Iterable<Integer> iterable = listState.get();for (Integer tiwenwen : iterable) {list.add(tiwenwen);}out.collect(Tuple2.of(value.f0,list));}}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

 

 

 

http://www.yayakq.cn/news/277706/

相关文章:

  • 做网站哪家公司好百度网盟推广价格
  • 贵阳做网站找哪家好如何制作自己的个人网站
  • 启迪网站开发网站后台进入突然不显示
  • 华为网站建设安徽网新科技集团
  • 不要营业执照的做网站wordpress 代码 换行
  • 做网站的每天打电话咋办08影院 WordPress模板
  • 买程序的网站乐歌股份摄像头
  • 跑胡子网站开发国家认可的赚钱软件
  • 做移动网站快速排名软件app开发制作网站平台
  • 建设银行网站怎么看交易记录做网站备案都需要什么东西
  • 网站开发使用的框架wordpress 建站教程
  • 设计兼职网站wordpress php教程 pdf
  • 怎么做网站海报网页传奇私
  • 商城网站后台模板专业做电脑系统下载网站
  • 建设银行积分兑换商城官方网站公司网站建设的步骤
  • 万维建设网站郑州市重点项目建设办公室网站
  • 鄂尔多斯网站制作专业seo服务商
  • 新建网站的步骤ui设计的流程有哪些步骤
  • 自适应网站教程个人建设网站盈利需要什么材料
  • 阿里个人网站网页建站点
  • 网络平台开展职业培训网站建设wordpress批量扫描弱口令工具
  • 重庆招标建设信息网站如何做一个好网站
  • 网站开发合同适用印花税免费域名注册优惠
  • 公司网站建设说明书表白视频制作软件app
  • 制作好网站怎么导入贵阳哪里做网站
  • 网站建设使用什么软件有哪些怎么开一个微信公众号
  • 网站建设与 维护实训报告范文打开网站要密码
  • wordpress网站怎么建南皮网站建设
  • 行业网站建设公司17173游戏排行榜
  • 佛山正规企业网站排名优化网站优化顺义案例