网站模板哪个好,网站建设费能入长期待摊吗,Wordpress支持分布发布吗,免费咨询律师电话12345DataStream编程模型之状态编程
参考#xff1a; 1.【Flink-Scala】DataStream编程模型之数据源、数据转换、数据输出 2.【Flink-scala】DataStream编程模型之 窗口的划分-时间概念-窗口计算程序 3.【Flink-scala】DataStream编程模型之窗口计算-触发器-驱逐器 4.【Flink-scal…DataStream编程模型之状态编程
参考 1.【Flink-Scala】DataStream编程模型之数据源、数据转换、数据输出 2.【Flink-scala】DataStream编程模型之 窗口的划分-时间概念-窗口计算程序 3.【Flink-scala】DataStream编程模型之窗口计算-触发器-驱逐器 4.【Flink-scala】DataStream编程模型之水位线 5.【Flink-scala】DataStream编程模型之延迟数据处理 文章目录 DataStream编程模型之状态编程前言一、状态编程相关概念1.1Flink中状态始终与特定算子相关联1.2 演示代码1.3 状态编程程序输入输出 前言
流计算分为无状态和有状态两种无状态是观察每个独立事件根据最后一个事件输出结果。比如传感器只关注当前的水位量超出水位量就发生报警事件。 有状态计算则会基于多个事件输出结果。比如计算过去1小时的水位平均值那就是状态的计算。
一、状态编程相关概念
流与流之间的所有关联操作以及流与静态表或动态表之间的关联操作都是有状态计算。
在传统的批处理中数据的划分为块分片去完成的每个task处理一个分片执行完成后把结果聚合起来就是最终的结果这个过程中对状态的需求还是较少的。
但对于流计算而言它对状态有着非常高的要求因为在流系统中输入是一个无限制的流会运行很长一段时间甚至运行几天或者几个月都不会停机。在这个过程当中就需要把状态数据很好地管理起来
1.1Flink中状态始终与特定算子相关联
分为算子状态和键控状态 算子状态的作用范围限定为算子任务,这意味着由同一并行任务所处理的所有数据都可以访问到相同的状态状态对于同一任务而言是共享的。
算子状态不能由相同或不同算子的另一个任务访问
键控状态是根据输入数据流中定义的键来维护和访问的。Flink为每个键值维护一个状态实例并将具有相同键的所有数据分区到同一个算子任务中这个任务会维护和处理这个键对应的状态。当任务处理一条数据时它会自动将状态的访问范围限定为当前数据的键。因此具有相同键的所有数据都会访问相同状态 1.2 演示代码
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collectorcase class StockPrice(stockId:String,timeStamp:Long,price:Double)object StateTest {def main(args: Array[String]): Unit {//设定执行环境val env StreamExecutionEnvironment.getExecutionEnvironment
//设定程序并行度env.setParallelism(1) //创建数据源val source env.socketTextStream(localhost, 9999) //指定针对数据流的转换操作逻辑val stockDataStream source.map(s s.split(,)).map(s StockPrice(s(0).toString, s(1).toLong, s(2).toDouble))val alertStream stockDataStream.keyBy(_.stockId).flatMap(new PriceChangeAlert(10))//新建了一个PriceChangeAlert类 这里重新了flatmap方法// 打印输出alertStream.print() //触发程序执行env.execute(state test)}class PriceChangeAlert(threshold: Double) extends RichFlatMapFunction[StockPrice,(String, Double, Double)]{//定义状态保存上一次的价格lazy val lastPriceState: ValueState[Double] getRuntimeContext.getState(new ValueStateDescriptor[Double](last-price,classOf[Double]))override def flatMap(value: StockPrice, out: Collector[(String, Double, Double)]): Unit {// 获取上次的价格
val lastPrice lastPriceState.value()
//跟最新的价格求差值做比较val diff (value.price-lastPrice).absif( diff threshold)out.collect((value.stockId,lastPrice,value.price))//更新状态lastPriceState.update(value.price)}}
}
代码分析 1.传入参数阈值 2.继承里接受一个stockPrice类型的输入一个String,Double,Double三元组的输出。
String,Double,Doublecase class StockPrice(stockId:String,timeStamp:Long,price:Double)有什么不同呢两个double代表了两个价格分别代表股票ID、上次价格、当前价格。
3.ValueState是Flink中用于保存单个值的状态。这里它被用来保存上一次处理的股票价格。lazy关键字意味着这个状态变量只有在第一次被使用时才会被初始化 4…getState(new ValueStateDescriptor[Double](“last-price”, classOf[Double])): 这个方法尝试从运行时上下文中检索一个名为 “last-price” 的 ValueState如果状态不存在它将根据提供的 ValueStateDescriptor 创建一个新的状态。
ValueStateDescriptor 包含了状态的名称代码中是 “last-price”和状态的值的类型这个代码中是 Double。 5. classOf[Double] 提供了状态的值的类型信息。 6. 重写的flatmap应该能看懂主要是当当前价格超出阈值代码中是10就打印。
1.3 状态编程程序输入输出
输入
stock_4,1602031562148,43.4
stock_1,1602036130952,39.7
stock_4,1602036131741,59.9
stock_2,1602036132184,30.1
stock_3,1602036133154,79.8
stock_0,1602036133919,9.9
stock_1,1602036134385,21.7输出
(stock_4,0.0,43.4)
(stock_1,0.0,39.7)
(stock_4,43.4,59.9)
(stock_2,0.0,30.1)
(stock_3,0.0,79.8)
(stock_1,39.7,21.7)其中根据stock_id分类。
初始状态所有stockId的最近价格都是未定义的即null或None在代码中表现为Double的默认值0.0因为ValueState在初始化时未设置值。
处理第一条记录stock_4,1602031562148,43.4。由于没有先前的价格不会触发输出。最近价格更新为43.4。 处理第二条记录stock_1,1602036130952,39.7。同样没有先前的价格不会触发输出。最近价格更新为39.7。 处理第三条记录stock_4,1602036131741,59.9。价格从43.4变为59.9差异为16.5超过阈值10因此输出(stock_4, 43.4, 59.9)。最近价格更新为59.9。 后续记录对于stock_2、stock_3、stock_0由于没有先前的价格30.1 和79.8直接列出 但是9.9这个价格要注意 stock_0默认值为0这里变为9.9,没有超出阈值10,那么输出就没有。