Simple Example
This simple example maintains a running word count over text data received from a server listening on a TCP socket.
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession.builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

# Create DataFrame with input lines from connection to localhost:9999
lines = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Split the lines into words
words = lines.select(
    explode(
        split(lines.value, " ")
    ).alias("word")
)

# Generate running word count
wordCounts = words.groupBy("word").count()
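Here lines.value refers to the single string column, named value, that the socket source produces; each row holds one line of received text. Note that wordCounts is a streaming DataFrame, so nothing is actually processed until a query is started.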
Then, start a streaming query that writes the running counts out:
# Start running the query that prints the running counts to the console
query = wordCounts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()
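The complete output mode rewrites the entire result table on every trigger. As a point of comparison, here is a minimal sketch of the same query in update mode, which emits only the rows that changed since the last trigger (the 5-second interval is an arbitrary illustration; wordCounts is the DataFrame defined above):

# Emit only the updated rows, recomputing every 5 seconds
query = wordCounts.writeStream \
    .outputMode("update") \
    .format("console") \
    .trigger(processingTime="5 seconds") \
    .start()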
Every data item arriving on the stream is treated like a new row being appended to an unbounded Input Table. For example, feeding the input

w1 w1 w2
w2 w1

to the above program yields the running counts w1: 3, w2: 2.
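To see where those counts come from, here is a minimal batch-mode sketch that treats the same two lines as rows of a static table and applies the identical transformation (the app name and the hard-coded rows are illustrative, not part of the streaming program):

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession.builder.appName("BatchWordCount").getOrCreate()

# The two input lines, materialized as a static one-column table
lines = spark.createDataFrame([("w1 w1 w2",), ("w2 w1",)], ["value"])

words = lines.select(explode(split(lines.value, " ")).alias("word"))
words.groupBy("word").count().show()
# +----+-----+
# |word|count|   (row order may vary)
# +----+-----+
# |  w1|    3|
# |  w2|    2|
# +----+-----+

Because the streaming query uses the same DataFrame operations, its result table after both lines have arrived matches this batch result.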
Finally, note that only the Result Table is actually materialized. The unbounded Input Table never exists in full: Structured Streaming reads the newly arrived data, uses it to incrementally update the Result Table, and then discards the source data.
DataFrame API
...