A library for reading data from MQTT Servers using Spark SQL Streaming ( or Structured streaming.).
Using SBT:
libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-mqtt" % "2.0.0"
Using Maven:
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>spark-sql-streaming-mqtt_2.11</artifactId>
<version>2.0.0</version>
</dependency>
This library can also be added to Spark jobs launched through spark-shell
or spark-submit
by using the --packages
command line option.
For example, to include it when starting the spark shell:
$ bin/spark-shell --packages org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.0.0
Unlike using --jars
, using --packages
ensures that this library and its dependencies will be added to the classpath.
The --packages
argument can also be used with bin/spark-submit
.
This library is compiled for Scala 2.11 only, and intends to support Spark 2.0 onwards.
A SQL Stream can be created with data streams received through MQTT Server using,
sqlContext.readStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("topic", "mytopic")
.load("tcp://localhost:1883")
Setting values for option localStorage
and clientId
helps in recovering in case of a restart, by restoring the state where it left off before the shutdown.
sqlContext.readStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("topic", "mytopic")
.option("localStorage", "/path/to/localdir")
.option("clientId", "some-client-id")
.load("tcp://localhost:1883")
An example, for scala API to count words from incoming message stream.
// Create DataFrame representing the stream of input lines from connection to mqtt server
val lines = spark.readStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("topic", topic)
.load(brokerUrl).as[(String, Timestamp)]
// Split the lines into words
val words = lines.map(_._1).flatMap(_.split(" "))
// Generate running word count
val wordCounts = words.groupBy("value").count()
// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
Please see MQTTStreamWordCount.scala
for full example.
An example, for Java API to count words from incoming message stream.
// Create DataFrame representing the stream of input lines from connection to mqtt server.
Dataset<String> lines = spark
.readStream()
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("topic", topic)
.load(brokerUrl).select("value").as(Encoders.STRING());
// Split the lines into words
Dataset<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String x) {
return Arrays.asList(x.split(" ")).iterator();
}
}, Encoders.STRING());
// Generate running word count
Dataset<Row> wordCounts = words.groupBy("value").count();
// Start running the query that prints the running counts to the console
StreamingQuery query = wordCounts.writeStream()
.outputMode("complete")
.format("console")
.start();
query.awaitTermination();
Please see JavaMQTTStreamWordCount.java
for full example.