Read a batch in the example instead of a stream
This commit is contained in:
Родитель
32a67fbc5f
Коммит
a50e0c98c2
|
@ -49,7 +49,7 @@ def convertToByteArray(kafkaKeyStr: String) : Array[Byte] = {
|
|||
|
||||
val connStr = "YOUR_CONNECTION_STRING"
|
||||
val eventhubParameters = EventHubsConf(connStr) // YOUR_SETTING
|
||||
val incomingStream = spark.readStream.format("eventhubs").options(eventhubParameters.toMap).load()
|
||||
val incomingStream = spark.read.format("eventhubs").options(customEventhubParameters.toMap).load()
|
||||
|
||||
val receivedKafkaBodyAndKey =
|
||||
incomingStream
|
||||
|
@ -68,5 +68,5 @@ def convertToByteArray(kafkaKeyStr: String) : Array[Byte] = {
|
|||
.withColumnRenamed("_1","msgBody")
|
||||
.withColumnRenamed("_2","msgKey")
|
||||
|
||||
kafkaBodyAndKey.writeStream.outputMode("append").format("console").option("truncate", false).start().awaitTermination()
|
||||
kafkaBodyAndKey.show()
|
||||
```
|
Загрузка…
Ссылка в новой задаче