Flink的Keyed State支持以下数据类型:
- ValueState[T]: 保存单个的值, 值的类型为 T。
- ListState[T]: 保存一个列表, 列表里的元素的数据类型为 T。
- MapState[K, V]: 保存 Key-Value 对。
- ReducingState[T]
- AggregatingState[I, O]
使用案例: 连续 5s 水位上涨,则告警
声明有状态类型的变量时, 有 2 种初始化方法:
- 在 open 方法中完成变量的初始化
- 在变量声明时使用 lazy val
package com.ylb.time

import com.ylb.myCluss.WaterSensor
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.{AssignerWithPunctuatedWatermarks, KeyedProcessFunction}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.util.Collector

/**
 * Keyed-state example: emit an alarm when a sensor's water level keeps rising
 * for 5 seconds of event time.
 *
 * Input lines are read from a socket in the form `id,ts,vc`, where `ts` is
 * multiplied by 1000 to obtain the event timestamp in milliseconds.
 *
 * @author yanglibin
 * @create 2020-03-07 8:56
 */
object watermark_5 {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Use event-time semantics.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // The "current level" and "alarm timer" values are kept in keyed state
    // (see the process function below) rather than in plain local vars.

    val markDS: DataStream[WaterSensor] = env
      .socketTextStream("hadoop-master", 9999)
      // Wrap each input line into a WaterSensor.
      .map(line => {
        val datas = line.split(",")
        WaterSensor(datas(0), datas(1).toLong, datas(2).toDouble)
      })
      // Emit a watermark equal to the timestamp of every element.
      .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[WaterSensor] {
        override def checkAndGetNextWatermark(lastElement: WaterSensor, extractedTimestamp: Long): Watermark = {
          new Watermark(extractedTimestamp)
        }

        override def extractTimestamp(element: WaterSensor, previousElementTimestamp: Long): Long = {
          element.ts * 1000
        }
      })

    val processDS: DataStream[String] = markDS
      .keyBy(_.id)
      .process(new KeyedProcessFunction[String, WaterSensor, String] {

        // Option 1: initialise the state handles in open().
        // private var currentHigh: ValueState[Double] = _
        // private var alarmTimer: ValueState[Long] = _
        //
        // override def open(parameters: Configuration): Unit = {
        //   currentHigh = getRuntimeContext.getState(
        //     new ValueStateDescriptor[Double]("currentHigh", classOf[Double])
        //   )
        //   alarmTimer = getRuntimeContext.getState(
        //     new ValueStateDescriptor[Long]("alarmTimer", classOf[Long])
        //   )
        // }

        // Option 2: declare the state handles as lazy vals, so that
        // getRuntimeContext is only accessed on first use.

        // Last water level seen for the current key. Stored as Double so that
        // fractional readings are compared exactly instead of being truncated.
        lazy val currentHigh: ValueState[Double] = getRuntimeContext.getState(
          new ValueStateDescriptor[Double]("currentHigh", classOf[Double])
        )

        // Timestamp of the pending alarm timer for the current key; reads as 0
        // when no timer has been registered.
        lazy val alarmTimer: ValueState[Long] = getRuntimeContext.getState(
          new ValueStateDescriptor[Long]("alarmTimer", classOf[Long])
        )

        /** Emits the alarm for the current key when its pending timer fires. */
        override def onTimer(
            timestamp: Long,
            ctx: KeyedProcessFunction[String, WaterSensor, String]#OnTimerContext,
            out: Collector[String]): Unit = {
          // Report the stored water level (the label says "current water level"),
          // not the current watermark.
          out.collect(s"id:${ctx.getCurrentKey}, 当前水位值: ${currentHigh.value()}, 连续5s水位上涨")
          // Clear the marker so that the next rise can register a new alarm;
          // otherwise `alarmTimer.value() == 0` would never hold again.
          alarmTimer.clear()
        }

        /**
         * Tracks the water level of the current key and maintains its alarm timer:
         * a rise registers a timer 5s ahead if none is pending; any other reading
         * deletes the pending timer and registers a fresh one.
         */
        override def processElement(
            value: WaterSensor,
            ctx: KeyedProcessFunction[String, WaterSensor, String]#Context,
            out: Collector[String]): Unit = {
          val deadline = value.ts * 1000 + 5000
          if (value.vc > currentHigh.value()) {
            // Level rose: start the 5s countdown unless one is already running.
            if (alarmTimer.value() == 0) {
              alarmTimer.update(deadline)
              ctx.timerService().registerEventTimeTimer(deadline)
            }
          } else {
            // Level did not rise: drop the pending timer and register a new one.
            ctx.timerService().deleteEventTimeTimer(alarmTimer.value())
            alarmTimer.update(deadline)
            ctx.timerService().registerEventTimeTimer(deadline)
          }
          currentHigh.update(value.vc)
        }
      })

    markDS.print("mark>>")
    processDS.print("process>>")

    // Run the job.
    env.execute()
  }
}
状态后端
- MemoryStateBackend
- FsStateBackend
- RocksDBStateBackend
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_2.11</artifactId><version>1.7.2</version></dependency>
// Configure RocksDB as the state backend.
// NOTE(review): "c://tmp/output/flink" may not be accepted as a checkpoint URI
// (a "file:///..." form may be required) - confirm against the Flink version in use.
val backend: StateBackend = new RocksDBStateBackend("c://tmp/output/flink")
env.setStateBackend(backend)

// Enable checkpointing: one checkpoint every 1000 ms, with exactly-once mode.
env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE)
