// Stateful (有状态的) streaming word-count example
package com.yang.spark

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Stateful word-count over a Kafka topic with Spark Streaming.
 *
 * Consumes string records from topic "test" via the direct (kafka-0-10) API
 * and maintains a running per-value count across micro-batches using
 * `updateStateByKey`, which requires a checkpoint directory.
 *
 * @author :ywb
 * @date :Created in 2022/2/26 1:44 PM
 */
object SparkKafka01_Consumer {

  def main(args: Array[String]): Unit = {
    // Local-mode Spark with 3-second micro-batches.
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("kafka-saprk")
    val streamingContext: StreamingContext = new StreamingContext(sparkConf, Seconds(3))

    // updateStateByKey is stateful, so a checkpoint location is mandatory.
    streamingContext.checkpoint("check")

    // Kafka consumer configuration for the direct stream.
    val kafkaParams: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "42.192.229.208:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> "consumer01"
    )

    // Direct stream: executors talk to Kafka brokers without a receiver.
    val kafkaStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream(
        streamingContext,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Set("test"), kafkaParams)
      )

    // Pair every record value with an initial count of 1.
    val pairs: DStream[(String, Int)] = kafkaStream.map(record => (record.value(), 1))

    // TODO stateless alternative (kept from the original, disabled):
    // val res: DStream[(String, Int)] = pairs.reduceByKey(_ + _)
    // res.print()

    // TODO stateful aggregation — note the required streamingContext.checkpoint("check") above.
    // For each key, add the counts of the current batch to the previously stored total.
    val runningTotals: DStream[(String, Int)] = pairs.updateStateByKey(
      (batchCounts: Seq[Int], previous: Option[Int]) =>
        Option(previous.getOrElse(0) + batchCounts.sum)
    )
    runningTotals.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
