https://stackoverflow.com/questions/58766638/how-to-use-foreach-or-foreachbatch-in-pyspark-to-write-to-database
Not tested yet; from a read-through of the code it looks like it should work.
Note: foreach and foreachBatch are sinks for processing streaming data (Structured Streaming). One fix is needed on top of the question's code: the function passed to foreachBatch receives (batch_df, epoch_id), not a single row.
from pyspark.sql import SparkSession
from pyspark.sql import DataFrameWriter
import pyspark.sql.functions as F
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, TimestampType

# Requires the spark-sql-kafka connector and a MySQL JDBC driver on the classpath.

# configuration of target db
db_target_url = "jdbc:mysql://localhost/database"
db_target_properties = {"user": "writer", "password": "1234"}

# schema of the JSON payload carried in the Kafka value
schema_simple = StructType([
    StructField("Signal", StringType()),
    StructField("Value", DoubleType())
])

# create spark session
spark = SparkSession.builder.appName("streamer").getOrCreate()

# create DataFrame representing the stream
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "mytopic") \
    .load() \
    .selectExpr("Timestamp", "cast(value as string) as json") \
    .select("Timestamp", F.from_json("json", schema_simple).alias("json_wrapper")) \
    .selectExpr("Timestamp", "json_wrapper.Signal", "json_wrapper.Value")
df.printSchema()

# Do some dummy processing
df2 = df.filter("Value < 11111111111")
print("df2: ", df2.isStreaming)

# foreachBatch hands each micro-batch to this function as (DataFrame, epoch id),
# so the whole batch can be written with the normal JDBC batch writer
def process_batch(batch_df, epoch_id):
    batch_df.write.jdbc(url=db_target_url, table="mytopic",
                        mode="append", properties=db_target_properties)

query = df2.writeStream.foreachBatch(process_batch).start()
query.awaitTermination()
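The code above only uses foreachBatch. For the per-row foreach sink also mentioned in the note, a minimal sketch along the following lines should work, assuming mysql-connector-python is installed on the executors; the class name MySQLRowWriter and the INSERT statement are illustrative, not from the original post. Untested.

import mysql.connector  # assumed dependency: mysql-connector-python

class MySQLRowWriter:
    # Structured Streaming calls open/process/close for each partition of each epoch
    def open(self, partition_id, epoch_id):
        self.conn = mysql.connector.connect(
            host="localhost", database="database", user="writer", password="1234")
        self.cursor = self.conn.cursor()
        return True  # True means process() will be called for this partition

    def process(self, row):
        # one INSERT per streaming row; column names follow the schema above
        self.cursor.execute(
            "INSERT INTO mytopic (`Timestamp`, `Signal`, `Value`) VALUES (%s, %s, %s)",
            (row["Timestamp"], row["Signal"], row["Value"]))

    def close(self, error):
        self.conn.commit()
        self.cursor.close()
        self.conn.close()

query = df2.writeStream.foreach(MySQLRowWriter()).start()
query.awaitTermination()

In practice foreachBatch is usually the cheaper option here, since it reuses the built-in JDBC writer for a whole micro-batch instead of issuing one INSERT per row.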
