当前位置: 首页 > news >正文

项城网站建设深圳网警

项城网站建设,深圳网警,wordpress页面在哪里,线上商城怎么开Flink主要有两种基础类型的状态:keyed state 和operator state。 Keyed State Keyed State总是和keys相关,并且只能用于KeyedStream上的函数和操作。 你可以将Keyed State视为是已经被分片或分区的Operator State,每个key都有且仅有一个状态分…

Flink主要有两种基础类型的状态:keyed state 和operator state。
Keyed State
Keyed State总是和keys相关,并且只能用于KeyedStream上的函数和操作。
你可以将Keyed State视为是已经被分片或分区的Operator State,每个key都有且仅有一个状态分区(state-partition)。每个keyed-state逻辑上绑定到一个唯一的<parallel-operator-instance, key>组合上,由于每个key“属于”keyed operator的一个并行实例,所以我们可以简单的认为是<operator,key>。
Keyed State进一步被组织到所谓的Key Groups中。Key Groups是Flink能够重新分配keyed State的原子单元。Key Groups的数量等于定义的最大并行度。在一个keyed operator的并行实例执行期间,它与一个或多个Key Groups配合工作。
Raw and Managed State
Keyed State 和 Operator State 有两种形式: managed和raw。
Managed State表示数据结构由Flink runtime控制,例如内部哈希表或者RocksDB。例如,“ValueState”,“ListState”等等。Flink的runtime层会编码State并将其写入checkpoint中。
Raw State是操作算子保存在它的数据结构中的state。当进行checkpoint时,它只写入字节序列到checkpoint中。Flink并不知道状态的数据结构,并且只能看到raw字节。
所有的数据流函数都可以使用managed state,但是raw state接口只可以在操作算子的实现类中使用。推荐使用managed state(而不是raw state),因为使用managed state,当并行度变化时,Flink可以自动的重新分布状态,也可以做更好的内存管理。
注意 如果你的managed state需要自定义序列化逻辑,请参见managed state的自定义序列化以确保未来的兼容性。Flink默认的序列化不需要特殊处理。
Managed Keyed State
managed keyed state接口提供了对当前输入元素的key的不同类型的状态的访问。这意味着这种类型的状态只能在KeyedStream中使用,它可以通过stream.keyBy(…)创建。
现在,我们首先看下不同类型的状态,然后展示如何在程序中使用它们。可用的状态原语是:
ValueState:它会保存一个可以被更新和查询的值(受限于上面提到的输入元素的key,算子看到的每个key可能仅一个值)。可使用update(T) 和 T value() 更新和查询值。
ListState: 它保存了一个元素列表。你可以添加元素和检索Iterable来获取所有当前存储的元素。添加元素使用add(T)或者addAll(List)方法,获取Iterable使用Iterable get()方法。也可以使用update(List)覆盖已有的list。

ReducingState: 它保存了一个聚合了所有添加到这个状态的值的结果。接口和ListState相同,但是使用add(T)方法本质是使用指定ReduceFunction的聚合行为。

AggregatingState<IN, OUT>: 它保存了一个聚合了所有添加到这个状态的值的结果。与ReducingState有些不同,聚合类型可能不同于添加到状态的元素的类型。接口和ListState相同,但是使用add(IN)添加的元素本质是通过使用指定的AggregateFunction进行聚合。

FoldingState<T, ACC>:它保存了一个聚合了所有添加到这个状态的值的结果。与ReducingState有些不同,聚合类型可能不同于添加到状态的元素的类型。接口和ListState相同,但是使用add(IN)添加的元素本质是通过使用指定的FoldFunction折叠进行聚合。

MapState<UK, UV>:它保存了一个映射列表。你可以将key-value对放入状态中,并通过Iterable检索所有当前存储的映射关系。使用put(UK, UV) 或 putAll(Map<UK, UV>)添加映射关系。使用get(UK)获取key相关的value。分别使用entries(), keys() 和 values() 获取映射关系,key和value的视图。

所有类型的状态都有一个clear()方法,用以清除当前活跃key(即输入元素的key)的状态。

注意 FoldingState 和 FoldingStateDescriptor在Flink1.4中已经被废弃,并且可能在将来完全删除。请使用AggregatingState和 AggregatingStateDescriptor替代。

首先需要记住的是这些状态对象只能用来与状态进行交互。状态不一定存储在内存中,但是可能存储在磁盘或者其他地方。第二个需要记住的是,从状态获取的值依赖于输入元素的key。因此如果包含不同的key,那么在你的用户函数中的一个调用获得的值和另一个调用获得值可能不同。

为了获得状态句柄,必须创建一个StateDescriptor。它维护了状态的名称(稍后将看到,你可以创建多个状态,因此他们必须有唯一的名称,以便你可以引用它们),状态维护的值的类型,和可用户定义function,例如ReduceFunction。根据你想要查询的状态的类型,你可以创建ValueStateDescriptor,ListStateDescriptor,ReducingStateDescriptor,FoldingStateDescriptor或MapStateDescriptor。

使用RuntimeContext访问状态,因此它只有在richfunction中才可以使用。rich function的相关信息请看这里,但是我们也很快会看到一个示例。RichFunction中,RuntimeContext有这些访问状态的方法:
ValueState getState(ValueStateDescriptor)
ReducingState getReducingState(ReducingStateDescriptor)
ListState getListState(ListStateDescriptor)
AggregatingState<IN, OUT> getAggregatingState(AggregatingState<IN, OUT>)
FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

/*** The ValueState handle. The first field is the count, the second field a running sum.*/
private transient ValueState<Tuple2<Long, Long>> sum;@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {// access the state valueTuple2<Long, Long> currentSum = sum.value();// update the countcurrentSum.f0 += 1;// add the second field of the input valuecurrentSum.f1 += input.f1;// update the statesum.update(currentSum);// if the count reaches 2, emit the average and clear the stateif (currentSum.f0 >= 2) {out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));sum.clear();}
}@Override
public void open(Configuration config) {ValueStateDescriptor<Tuple2<Long, Long>> descriptor =new ValueStateDescriptor<>("average", // the state nameTypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type informationTuple2.of(0L, 0L)); // default value of the state, if nothing was setsum = getRuntimeContext().getState(descriptor);
}

}

// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
.keyBy(0)
.flatMap(new CountWindowAverage())
.print();

// the printed output will be (1,4) and (1,5)

State Time-To-Live(TTL)
任何类型的keyed state都可以使用TTL。如果配置了TTL,一个状态值超时了,储存的值就会在恰当的时候被删除,后面会说到。

所有状态集合类型都支持 per-entry TTL。意味着list的元素和map的entry可以单独设置超时。

TTL的使用也很简单,可以参考如下代码:
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();

ValueStateDescriptor stateDescriptor = new ValueStateDescriptor<>(“text state”, String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

newBuilder方法是必须的。
Update类型的配置有以下两种:
StateTtlConfig.UpdateType.OnCreateAndWrite :创建和写入

StateTtlConfig.UpdateType.OnReadAndWrite: 也有读取功能

可视,也即是在超时之后删除之前,数据是否还能被读取,可以配置的:

StateTtlConfig.StateVisibility.NeverReturnExpired – 超时元素绝不返回
StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp – 如果数据没被删除可以返回。
NeverReturnExpired该参数一旦配置,超时的状态可以视为不存在了,即使还没有被删除。该选项是在一些TTL超时要求严格的场景还是很靠谱的,比如处理隐私敏感的数据。

小提示:
状态后端(statebackend)会给用户的每个value存储一个时间戳,这就意味着会增加存储成本。堆状态后端(heap state backend)会在内存里存储一个额外的java对象(该对象带有指向用户状态对象的引用)和一个原始long值。RocksDB状态后端会为每个存储的值(list entry或者map entry)增加8byte。
当前TTL仅仅支持处理时间。
假如想用没有用TTL的savepoint,去恢复当前指定了TTL的应用程序,会报异常。
带TTL的map状态只有在序列化器支持处理null值的时候支持用户的null值。如果序列化器不支持null值,可以使用nullableSerializer取包裹null值,当然会带来额外的存储开销。
超时状态清除
当前的情况下,超时值状态仅仅在读取的时候删除,例如调用ValueState.value().

注意:这意味着如果超时状态没有被读取的话,就不会被删除,然后状态会一直增大.期待将来会有改变吧.

另外,可以配置在完成全量状态快照(full state snapshot)的时候删除状态,这也可以减少状态大小。在当前的实现机制下本地状态不会被清除,但是从之前快照里恢复的过程中不会保护已经删除的超时快照。配置方法如下:
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupFullSnapshot()
.build();
该配置不适合增量的快照机制,也即是状态后端不能是RocksDB。

http://www.yayakq.cn/news/88116/

相关文章:

  • 最优惠的赣州网站建设上海网站营销公司
  • 为企业做网站电子商务网站开发流程包括
  • 网站群建设项目招标公告商企通三合一网站建设
  • 工厂型企业做网站河北省建设银行网站
  • 中铁建设门户网站平面设计教学视频
  • 电影网站怎么做seo做电影网站有什么流媒体好
  • 个人网站开发视频微信怎样开公众号
  • 网站备案与icp备案WordPress Grace8.2主题
  • 用wordpress建站案例网站开发团队需配备什么岗位
  • 嘉兴的信息公司网站苏州seo网站推广哪家好
  • 做鲜花的网站有哪些家政网站设计
  • 4.1网站建设的基本步骤网络热词大全
  • 常熟住房和城乡建设局网站首页简单的企业网页模板
  • 自学做网站要学什么深圳4a广告公司有哪些
  • 淘宝基地网站怎么做设计协作平台
  • 北京房产网最新楼盘常德seo优化
  • 岳阳市 网站建设商城网站 html模板
  • 为什么有的网站只有版权没有备案中国移动的网站模板
  • 昆山市网站建设宝安中心图片
  • 怎样做网站收广告费房地产市场信息系统网站
  • 北京网站建站公百度seo软件首选帝搜软件
  • 学校网站建设教程wordpress 权限设置
  • 网站建设维护是啥意思星座 网站 建设
  • 中国建设银行官方网站纪念钞预约重庆企业网站制作外包
  • 网站推广方案模板后缀是.cc的网站
  • 网页类网站长沙优化科技有限公司地址
  • 购物网站 缓存南京网站建设包括哪些
  • 晋城市 制作网站中国建设信用卡积分兑换网站
  • 钢格板保定网站建设创建网站容易吗
  • 做网站需要ftp学前心理学课程建设网站