Backports from 0.9 branch (scala 2.12.1 and code style)

* Upgrade Scala from 2.12.0 to 2.12.1
* Remove dead code in IoTHubPartition
* Code style

This commit is contained in:
Parent: fc2e73a514
Commit: 6508964026
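
The "code style" changes below consist largely of replacing the ASCII arrow `=>` with the Unicode arrow `⇒`, which the Scala 2.12 lexer accepts as the same token. The snippet below is only an illustration of that equivalence and is not part of the commit:

```scala
// `=>` and `⇒` (U+21D2) are interchangeable in Scala 2.12,
// both in function literals and in case clauses.
val twiceAscii  : Int => Int = x => x * 2
val twiceUnicode: Int ⇒ Int  = x ⇒ x * 2

Seq(1, 2, 3).map {
  case n if n % 2 == 0 => n      // ASCII arrow
  case n               ⇒ n * 10  // Unicode arrow
}
```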

@@ -3,6 +3,7 @@
 tools/devices-simulator/credentials.js
 *.crt
 .ensime
 _dev

 ### MacOS ###
 .DS_Store

.travis.yml (10 changed lines)

@@ -2,14 +2,16 @@ jdk: oraclejdk8
 language: scala
 scala:
   - 2.11.8
-  - 2.12.0
+  - 2.12.1
 cache:
   directories:
     - "$HOME/.ivy2"
     - "$HOME/.sbt"
     - "$HOME/.m2"
-notifications:
-  slack:
-    secure: S6pcmclrj9vaqHOFMrjgYkF6wXrYF6nB5joYY0rqAwsmTLf7crXRVKZ8txlatpxMHc20Rbw8RQDM6tTka9wwBkHZZfErrcPsS84d5MU9siEkIY42/bAQwuYhxkcgilttgFmSwzLodE72giC/VMhIYCSOyOXIxuR0VtBiPD9Inm9QZ35dZDx3P3nbnaOC4fk+BjdbrX1LB8YL9z5Gy/9TqI90w0FV85XMef75EnSgpqeMD/GMB5hIg+arWVnC2S6hZ91PPCcxCTKBYDjwqUac8mFW/sMFT/yrb2c0NE6ZQqa3dlx/XFyC1X6+7DjJli2Y8OU+FPjY1tQC8JxgVFTbddIgCdUM/5be4uHN/KNs/yF7w1g06ZXK4jhJxxpL4zWINlqDrDmLaqhAtPQkc2CqL3g8MCwYxBbxZY4aFyPfZD7YLdQXDzJZNcfXn9RQQh5y+/zexbGc1zZ/XUo5bK3VbElSs+o2ErI+Sze0FaiK8fW+QeitBdGvjMY7YVKi0Zzf5Dxx1wwxiHR1PQ1r0hA8YZQxwwdpa5lWLFlSVu2w+upPtXqfINMeFktQPbOs1JWIvUvLV0A38dS6R/DsM/W1a3OEVbHQ0Z6OV1nffDnGYPLUl5kRDPFuYYugmCpQHW73lqJdiM0O+Ote4eOQniL1rcajtt+V5cn1/JRWzdJ4PH0=
 before_install:
   - if [ "$TRAVIS_PULL_REQUEST" = "false" ]; then openssl aes-256-cbc -K $encrypted_cbef0ff679f7_key -iv $encrypted_cbef0ff679f7_iv -in devices.json.enc -out src/test/resources/devices.json -d ; fi
+notifications:
+  slack:
+    rooms:
+      - secure: VW5Pw7NsQw8Uv9Zncn7TEy6vEoudygJDTdyMBjcfpsbibHVjOeDMMw/PKGrr2Y/W6b26LoIaz7uENynwZ9IXML+edIsJB4Kek7cS8YUQn4sT2UAobo8gSgia5aOVd39O48fUdikzJyscVPhyAN0eLiyUkFdDSfuZTonk1qDJEAeW6lbkepPyczoPtXio+K6jPoU+43eW+qA5ZYup+e5Hgo0TSOCaaxF0gDgD+duBVcYEXKnPnCxGhVi6DE8rzBBmcZenUHWFXoZLWVDILMcbjTDcTbx2UvQQ6EMHhyTMkK2v8D0qw3h5H9RtvGCx6OUbOi6ko8giYuC/tADm5TSksTHJOzTyGVcnw214smNBa4kPEwA0f0ayESRXsxwylI7P39IKMyDgGeDkfZdoPr2uaMl0rB98ubUlbllm7o+QtjUegsG2Zf91wL1wxTTXyDldgtot+5IEHyZcFke8zyJooj1Pjk4vasEBDPtDqiqiLOBc8a8tF6TccNb/dfCK3xh2sLVlVU95hrc6tw/ZJk9gZ0U9dDwPYixPTO8X8530UAo4E9s3epMzkdMahcAWFPGpd8tGO3IqegZps4d2p75aoRodz8JO54HRLHX3Gz+EsNDfgHeVqFcA6+rW+P04R2p8fJD1sWpDbq1xT4nQQ6L4TAt8UObkVET22T0BNaOdwqo=
+      - secure: evAZ40O+fXFmKAmfqnaLjLhELHTXL/aKOlMfDlStfVM35V+JMCivvQt/V+8ObmmkHBI6y9XoeILZzDD1cfCPjUzPAmHpnL6cT9kavu+OhS8BnyGvOcp1BysHIH1DHJ/JpYAKSXkMR2PoGakUp9kvETO+5s5ZKp909vtZqG4oShyqNz8jO6BYKwJrcU/ehOIRG2uHza2KtpRREFlwXu+Ca20+2oV74X0GvaqyDgVjEwxwLRc4KUg+/NPy4YWusBg/6Q/TV1HIAktlROA7ZOb9+DUshWZkOvd+F6JVaCzQDlSKCDvZpuddwUi+YveYlKqb8s7R+zzhxc6KPsXMrJ79k8E1uiZiEjRV3HTLxgZnKBYoKuglgKS1R1nQSsiJfVFQJn+dbBO0HxT9f3hy8nI3EpfTpy9DAO+wOnHycRBp1TCIPBEOCwIfBo2UvxqVAsBfIC3WmOF4VhlZxr/U+QCwolvWNjkPRrcKh4CzQEj2VIoAfyJ0vKujytB2gnv6Sp0mbbcK1jEJdNShc/wPdIXwn7oTT/2RnCdiKPivsF66wKcDmL7U/+zvVqRsw2wZrsw97n+j9lvj4YYZUVVXxtEQuXZ4sru5FM4a08lnS+cY5BfomvMkAT7UXKeV++sF7gBUq7DGQ4Xs6aLMZAAFSbPSw5mT349e+ipQyG8jX/iBllk=

@@ -49,7 +49,7 @@ val start = java.time.Instant.now()
 val withCheckpoints = false

 IoTHub().source(start, withCheckpoints)
-  .map(m => jsonParser.readValue(m.contentAsString, classOf[Temperature]))
+  .map(m ⇒ jsonParser.readValue(m.contentAsString, classOf[Temperature]))
  .filter(_.value > 100)
  .to(console)
  .run()

README.md (14 changed lines)

@@ -20,7 +20,7 @@ the temperature value:

 ```scala
 IoTHub().source()
-  .map(m => parse(m.contentAsString).extract[Temperature])
+  .map(m ⇒ parse(m.contentAsString).extract[Temperature])
  .filter(_.value > 100)
  .to(console)
  .run()

@@ -68,7 +68,7 @@ case class KafkaProducer(bootstrapServer: String)(implicit val system: ActorSyst
 val kafkaProducer = KafkaProducer(bootstrapServer)

 IoTHub().source()
-  .map(m => parse(m.contentAsString).extract[Temperature])
+  .map(m ⇒ parse(m.contentAsString).extract[Temperature])
  .filter(_.value > 100)
  .runWith(kafkaProducer.getSink())
 ```

@@ -88,7 +88,7 @@ val p1 = 0
 val p2 = 3

 IoTHub().source(PartitionList(Seq(p1, p2)))
-  .map(m => parse(m.contentAsString).extract[Temperature])
+  .map(m ⇒ parse(m.contentAsString).extract[Temperature])
  .filter(_.value > 100)
  .to(console)
  .run()

@@ -103,7 +103,7 @@ It's possible to start the stream from a given date and time too:
 val start = java.time.Instant.now()

 IoTHub().source(start)
-  .map(m => parse(m.contentAsString).extract[Temperature])
+  .map(m ⇒ parse(m.contentAsString).extract[Temperature])
  .filter(_.value > 100)
  .to(console)
  .run()

@@ -180,7 +180,7 @@ val start = java.time.Instant.now()
 val withCheckpoints = false

 IoTHub().source(start, withCheckpoints)
-  .map(m => parse(m.contentAsString).extract[Temperature])
+  .map(m ⇒ parse(m.contentAsString).extract[Temperature])
  .filter(_.value > 100)
  .to(console)
  .run()

@@ -193,7 +193,7 @@ your `build.sbt` file:

 ```scala
 libraryDependencies ++= {
-  val iothubReactV = "0.8.1"
+  val iothubReactV = "0.8.0"

  Seq(
    "com.microsoft.azure.iot" %% "iothub-react" % iothubReactV

@@ -207,7 +207,7 @@ or this dependency in `pom.xml` file if working with Maven:
 <dependency>
   <groupId>com.microsoft.azure.iot</groupId>
   <artifactId>iothub-react_2.12</artifactId>
-  <version>0.8.1</version>
+  <version>0.8.0</version>
 </dependency>
 ```
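
For reference, a self-contained version of the README snippets above could look roughly like the following; the `Temperature` class, the `console` sink, json4s on the classpath and the IoT Hub connection settings coming from `application.conf` are all assumptions, and the package name is taken from the library's 0.8.x layout:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

import com.microsoft.azure.iot.iothubreact.scaladsl._

// Hypothetical telemetry schema matching the README examples
final case class Temperature(deviceId: String, value: Double)

object TemperatureMonitor extends App {

  // Akka Streams needs an actor system and a materializer in scope
  implicit val system       = ActorSystem("TemperatureMonitor")
  implicit val materializer = ActorMaterializer()
  implicit val formats      = DefaultFormats // required by json4s `extract`

  // Simple sink printing every reading above the threshold
  val console = Sink.foreach[Temperature](t ⇒ println(s"${t.deviceId}: ${t.value}"))

  IoTHub().source()
    .map(m ⇒ parse(m.contentAsString).extract[Temperature])
    .filter(_.value > 100)
    .to(console)
    .run()
}
```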

@@ -3,7 +3,7 @@
 name := "iothub-react"
 organization := "com.microsoft.azure.iot"

-version := "0.8.1"
+version := "0.8.1" // Latest version released to Maven Central: 0.8.0
 //version := "0.8.1-DEV.170309a"

 scalaVersion := "2.12.1"

@@ -0,0 +1,12 @@
+if [ "$1" == "PR" -o "$1" == "pr" ]; then
+  echo "Skipping tests requiring encrypted secrets."
+  export TRAVIS_PULL_REQUEST="true"
+fi
+
+travis lint -x && \
+sbt +clean && \
+sbt +compile && \
+sbt +package && \
+sbt +test
+
+rm -f *.crt

@@ -1,3 +1,3 @@
 // Copyright (c) Microsoft. All rights reserved.

-scalaVersion := "2.12.0"
+scalaVersion := "2.12.1"

@@ -6,7 +6,7 @@

   <groupId>com.microsoft.azure.iot</groupId>
   <artifactId>iothub-react-demo</artifactId>
-  <version>0.8.1</version>
+  <version>0.8.0</version>

   <repositories>
     <repository>

@@ -1 +1 @@
-sbt.version = 0.13.12
+sbt.version=0.13.13

@@ -11,7 +11,6 @@ import akka.stream.Materializer;
  */
 public class ReactiveStreamingApp
 {

     private static ActorSystem system = ActorSystem.create("Demo");

     protected final static Materializer streamMaterializer = ActorMaterializer.create(system);

@@ -1,3 +1,3 @@
 // Copyright (c) Microsoft. All rights reserved.

-scalaVersion := "2.12.0"
+scalaVersion := "2.12.1"

@@ -1 +1 @@
-sbt.version = 0.13.12
+sbt.version=0.13.13

@@ -43,7 +43,7 @@ object Demo extends App with Deserialize {
   /*
   // Run the two workflows in parallel
   RunnableGraph.fromGraph(GraphDSL.create() {
-    implicit b =>
+    implicit b ⇒
       import GraphDSL.Implicits._

       val shape = b.add(Broadcast[Temperature](2))
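
The commented-out block above is the start of a broadcast graph; for context, a complete graph of that shape could be wired roughly as follows (a generic sketch, not the demo's actual code):

```scala
import akka.NotUsed
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, GraphDSL, RunnableGraph, Sink, Source}

// Fan one source out to two sinks so that both workflows run in parallel.
def fanOutToTwo[T](source: Source[T, NotUsed],
                   sink1: Sink[T, _],
                   sink2: Sink[T, _]): RunnableGraph[NotUsed] =
  RunnableGraph.fromGraph(GraphDSL.create() {
    implicit b ⇒
      import GraphDSL.Implicits._

      val broadcast = b.add(Broadcast[T](2))

      source ~> broadcast.in
      broadcast.out(0) ~> sink1
      broadcast.out(1) ~> sink2

      ClosedShape
  })
```

Calling e.g. `fanOutToTwo(temperatures, console, storage).run()` would then execute both branches over the same stream of messages.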

@@ -105,7 +105,7 @@ private class MessageFromDeviceSource() extends GraphStage[SourceShape[MessageFr
   // Define the (sole) output port of this stage
   private[this] val out: Outlet[MessageFromDevice] = Outlet("MessageFromDeviceSource")

-  // Define the shape of this stage => SourceShape with the port defined above
+  // Define the shape of this stage ⇒ SourceShape with the port defined above
   override val shape: SourceShape[MessageFromDevice] = SourceShape(out)

   // All state MUST be inside the GraphStageLogic, never inside the enclosing

@@ -1,3 +1,5 @@
 // Copyright (c) Microsoft. All rights reserved.
+
 package com.microsoft.azure.iot.iothubreact.checkpointing.backends.cassandra.lib
+
-final case class Auth(username: String, password: String)
+case class Auth(username: String, password: String)

@@ -11,25 +11,67 @@ import scala.concurrent.duration._
 import scala.language.postfixOps
 import scala.util.Try

 /** Checkpointing configuration interface
   */
 trait ICPConfiguration {
-  val isEnabled : Boolean
-  val storageNamespace : String
-  val checkpointBackendType : String
-  val checkpointFrequency : FiniteDuration
-  val checkpointRWTimeout : FiniteDuration
-  val checkpointCountThreshold : Int
-  val checkpointTimeThreshold : FiniteDuration
-  val azureBlobEmulator : Boolean
-  val azureBlobConnectionString : String
-  val azureBlobLeaseDuration : FiniteDuration
-  val cassandraCluster : String
+
+  /** Whether checkpointing is enabled
+    */
+  val isEnabled: Boolean
+
+  /** Namespace where the table with checkpoint data is stored (e.g. Cassandra keyspace)
+    */
+  val storageNamespace: String
+
+  /** Type of storage, the value is not case sensitive
+    */
+  val checkpointBackendType: String
+
+  /** How often checkpoint data is written to the storage
+    */
+  val checkpointFrequency: FiniteDuration
+
+  /** Checkpointing operations timeout
+    */
+  val checkpointRWTimeout: FiniteDuration
+
+  /** How many messages to replay after a restart, for each IoT hub partition
+    */
+  val checkpointCountThreshold: Int
+
+  /** Store a position if its value is older than this amount of time, rounded to seconds
+    */
+  val checkpointTimeThreshold: FiniteDuration
+
+  /** Whether to use the Azure Storage Emulator when using Azure blob backend
+    */
+  val azureBlobEmulator: Boolean
+
+  /** Azure blob connection string
+    */
+  val azureBlobConnectionString: String
+
+  /** Azure blob lease duration (between 15s and 60s by Azure docs)
+    */
+  val azureBlobLeaseDuration: FiniteDuration
+
+  /** Cassandra cluster address
+    * TODO: support list
+    */
+  val cassandraCluster: String
+
+  /** Cassandra replication factor, value required to open a connection
+    */
   val cassandraReplicationFactor: Int
-  val cassandraAuth : Option[Auth]
+
+  /** Cassandra authentication credentials
+    */
+  val cassandraAuth: Option[Auth]
 }

 /** Hold IoT Hub stream checkpointing configuration settings
   */
-private[iothubreact] final class CPConfiguration(implicit val conf: Config = ConfigFactory.load) extends ICPConfiguration {
+private[iothubreact] class CPConfiguration(implicit conf: Config = ConfigFactory.load) extends ICPConfiguration {

   // TODO: Allow to use multiple configurations, e.g. while processing multiple streams
   //       a client will need a dedicated checkpoint container for each stream

@@ -118,7 +160,7 @@ private[iothubreact] final class CPConfiguration(implicit val conf: Config = Con
   // Azure blob connection string
   lazy val azureBlobConnectionString: String = getAzureBlobConnectionString

-  // Azure blob lease duration (15s and 60s by Azure docs)
+  // Azure blob lease duration (between 15s and 60s by Azure docs)
   lazy val azureBlobLeaseDuration: FiniteDuration = getDuration(
     confPath + "storage.azureblob.lease",
     15 seconds,

@@ -178,4 +220,4 @@ private[iothubreact] final class CPConfiguration(implicit val conf: Config = Con
       else
         default
   }
 }
 }
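
The trait above makes it possible to supply checkpointing settings programmatically (for instance in tests) instead of through the Typesafe Config files; a hypothetical implementation, in which every value is an arbitrary example rather than a library default, might look like this:

```scala
import scala.concurrent.duration._

import com.microsoft.azure.iot.iothubreact.checkpointing.ICPConfiguration
import com.microsoft.azure.iot.iothubreact.checkpointing.backends.cassandra.lib.Auth

// Hypothetical hard-coded checkpointing settings (example values only)
class StaticCPConfiguration extends ICPConfiguration {
  val isEnabled                 : Boolean        = true
  val storageNamespace          : String         = "iothub-react-checkpoints"
  val checkpointBackendType     : String         = "AzureBlob"
  val checkpointFrequency       : FiniteDuration = 1.second
  val checkpointRWTimeout       : FiniteDuration = 5.seconds
  val checkpointCountThreshold  : Int            = 1000
  val checkpointTimeThreshold   : FiniteDuration = 60.seconds
  val azureBlobEmulator         : Boolean        = true
  val azureBlobConnectionString : String         = ""
  val azureBlobLeaseDuration    : FiniteDuration = 15.seconds
  val cassandraCluster          : String         = "localhost:9042"
  val cassandraReplicationFactor: Int            = 1
  val cassandraAuth             : Option[Auth]   = None
}
```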

@@ -26,8 +26,8 @@ private[iothubreact] object CheckpointActorSystem {
   val actorPath = "CheckpointService" + partition

   localRegistry get actorPath match {
-    case Some(actorRef) => actorRef
-    case None => {
+    case Some(actorRef) ⇒ actorRef
+    case None ⇒ {
       val actorRef = actorSystem.actorOf(Props(new CheckpointService(partition)), actorPath)
       localRegistry += Tuple2(actorPath, actorRef)
       actorRef

@@ -67,7 +67,7 @@ private[iothubreact] class CheckpointService(partition: Int)(implicit config: IC
         queuedOffsets = 0
       }
       catch {
-        case e: Exception => {
+        case e: Exception ⇒ {
           log.error(e, e.getMessage)
           context.become(notReady)
         }

@@ -121,7 +121,7 @@ private[iothubreact] class CheckpointService(partition: Int)(implicit config: IC
           log.debug(s"Partition=${partition}, checkpoint queue is empty [count ${queuedOffsets}, current offset=${currentOffset}]")
         }
       } catch {
-        case e: Exception => log.error(e, e.getMessage)
+        case e: Exception ⇒ log.error(e, e.getMessage)
       } finally {
         context.become(ready)
       }

@@ -17,7 +17,7 @@ import scala.language.{implicitConversions, postfixOps}

 /** Storage logic to write checkpoints to Azure blobs
   */
-private[iothubreact] class AzureBlob(implicit val config: ICPConfiguration) extends CheckpointBackend with Logger {
+private[iothubreact] class AzureBlob(implicit config: ICPConfiguration) extends CheckpointBackend with Logger {

   // Set the account to point either to Azure or the emulator
   val account: CloudStorageAccount = if (config.azureBlobEmulator)

@@ -11,7 +11,7 @@ import org.json4s.JsonAST

 /** Storage logic to write checkpoints to a Cassandra table
   */
-private[iothubreact] class CassandraTable(implicit val config: ICPConfiguration) extends CheckpointBackend with Logger {
+private[iothubreact] class CassandraTable(implicit config: ICPConfiguration) extends CheckpointBackend with Logger {

   val schema = new CheckpointsTableSchema(checkpointNamespace, "checkpoints")
   val connection = Connection(config.cassandraCluster, config.cassandraReplicationFactor, config.cassandraAuth, schema)

@@ -17,10 +17,10 @@ private[iothubreact] object ColumnType extends Enumeration {
     * @return Type as string
     */
   def toString(columnType: ColumnType): String = columnType match {
-    case ColumnType.String => "text"
-    case ColumnType.Timestamp => "timestamp"
-    case ColumnType.Double => "double"
-    case ColumnType.Int => "int"
+    case ColumnType.String ⇒ "text"
+    case ColumnType.Timestamp ⇒ "timestamp"
+    case ColumnType.Double ⇒ "double"
+    case ColumnType.Int ⇒ "int"

     case _ ⇒ throw new RuntimeException(s"Missing mapping for Cassandra type ${columnType}")
   }

@@ -32,10 +32,10 @@ private[iothubreact] object ColumnType extends Enumeration {
     * @return Column type
     */
   def fromName(typeAsString: String): ColumnType = typeAsString match {
-    case "text" => ColumnType.String
-    case "timestamp" => ColumnType.Timestamp
-    case "double" => ColumnType.Double
-    case "int" => ColumnType.Int
+    case "text" ⇒ ColumnType.String
+    case "timestamp" ⇒ ColumnType.Timestamp
+    case "double" ⇒ ColumnType.Double
+    case "int" ⇒ ColumnType.Int

     case _ ⇒ throw new IllegalArgumentException(s"Unknown Cassandra column type '${typeAsString}'")
   }
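
As a sanity check on the two mappings above (both private to the `iothubreact` package), the conversions are inverses of each other for the supported types; a small illustration, assuming code placed inside that package:

```scala
// Enum value -> Cassandra type name -> enum value round-trips unchanged
val supported = Seq(ColumnType.String, ColumnType.Timestamp, ColumnType.Double, ColumnType.Int)
assert(supported.forall(t ⇒ ColumnType.fromName(ColumnType.toString(t)) == t))

ColumnType.toString(ColumnType.Timestamp) // "timestamp"
ColumnType.fromName("double")             // ColumnType.Double
```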

@@ -16,15 +16,15 @@ private[iothubreact] case class Connection(
     auth: Option[Auth],
     table: TableSchema) {

-  private lazy val hostPort = extractHostPort()
+  private lazy val hostPort = extractHostPort()
   private lazy val cluster = {
     val builder = Cluster.builder().addContactPoint(hostPort._1).withPort(hostPort._2)
     auth map {
-      creds ⇒ builder.withCredentials(creds.username, creds.password)
-    } getOrElse (builder) build()
+      creds ⇒ builder.withCredentials(creds.username, creds.password)
+    } getOrElse (builder) build()
   }

-  implicit lazy val session = cluster.connect()
+  implicit lazy val session = cluster.connect()

   /** Create the key space if not present
     */

@@ -71,7 +71,7 @@ private[iothubreact] case class Connection(
     * @param columns Columns
     */
   private[this] def createT(tableName: String, columns: Seq[Column]): Unit = {
-    val columnsSql = columns.foldLeft("")((b, a) => s"$b\n${a.name} ${ColumnType.toString(a.`type`)},")
+    val columnsSql = columns.foldLeft("")((b, a) ⇒ s"$b\n${a.name} ${ColumnType.toString(a.`type`)},")
     val indexesSql = columns.filter(_.index).map(_.name).mkString("PRIMARY KEY(", ", ", ")")
     val createTable = s"CREATE TABLE IF NOT EXISTS ${table.keyspaceCQL}.$tableName($columnsSql $indexesSql)"
     session.execute(createTable)
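
The `auth map { … } getOrElse (builder)` chain above applies credentials only when they are provided; the same pattern in isolation, as a sketch built on the DataStax driver types this class already uses:

```scala
import com.datastax.driver.core.Cluster
import com.microsoft.azure.iot.iothubreact.checkpointing.backends.cassandra.lib.Auth

// Build a Cluster, adding credentials only when `auth` is defined.
def buildCluster(host: String, port: Int, auth: Option[Auth]): Cluster = {
  val builder = Cluster.builder().addContactPoint(host).withPort(port)
  auth
    .map(creds ⇒ builder.withCredentials(creds.username, creds.password))
    .getOrElse(builder)
    .build()
}
```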

@@ -63,33 +63,6 @@ private[iothubreact] case class IoTHubPartition(val partition: Int)(implicit cpc
       withCheckpoints = withCheckpoints && cpconfig.isEnabled)
   }

-  /** Stream returning all the messages from the given offset. Checkpoints are NOT saved but ARE loaded at startup.
-    *
-    * @param startTime Starting position expressed in time
-    * @return A source of IoT messages
-    */
-  def sourceWithSavedCheckpoint(startTime: Instant): Source[MessageFromDevice, NotUsed] = {
-    getSource(
-      withTimeOffset = true,
-      startTime = startTime,
-      withCheckpoints = false,
-      startFromSavedCheckpoint = true)
-  }
-
-  /** Stream returning all the messages from the given offset. Checkpoints are NOT saved but ARE loaded at startup.
-    *
-    * @param offset Starting position, offset of the first message
-    * @return A source of IoT messages
-    */
-  def sourceWithSavedCheckpoint(offset: String): Source[MessageFromDevice, NotUsed] = {
-    getSource(
-      withTimeOffset = true,
-      offset = offset,
-      withCheckpoints = false,
-      startFromSavedCheckpoint = true)
-  }
-
-
   /** Create a stream returning all the messages for the defined partition, from the given start
     * point, optionally with checkpointing
     *

@@ -148,7 +121,7 @@ private[iothubreact] case class IoTHubPartition(val partition: Int)(implicit cpc
       Await.result(future, rwTimeout.duration)
     }
   } catch {
-    case e: java.util.concurrent.TimeoutException => {
+    case e: java.util.concurrent.TimeoutException ⇒ {
       log.error(e, "Timeout while retrieving the offset from the storage")
       throw e
     }

@@ -1,23 +1,17 @@
 // Copyright (c) Microsoft. All rights reserved.

-// Namespace chosen to avoid access to internal classes
+// NOTE: Namespace chosen to avoid access to internal classes
 package api

-import java.util.UUID
+// NOTE: No global imports to make easier detecting breaking changes

-import com.microsoft.azure.iot.iothubreact.checkpointing.ICPConfiguration
-import org.mockito.Mockito.when
-import org.scalatest.mockito.MockitoSugar
-
-// No global imports to make easier detecting breaking changes
-
-class APIIsBackwardCompatible extends org.scalatest.FeatureSpec with MockitoSugar {
+class APIIsBackwardCompatible extends org.scalatest.FeatureSpec with org.scalatest.mockito.MockitoSugar {

   info("As a developer using Azure IoT hub React")
   info("I want to be able to upgrade to new minor versions without changing my code")
   info("So I can benefit from improvements without excessive development costs")

-  implicit val cpconfig = mock[ICPConfiguration]
+  implicit val cpconfig = mock[com.microsoft.azure.iot.iothubreact.checkpointing.ICPConfiguration]

   feature("Version 0.x is backward compatible") {

@@ -166,8 +160,8 @@ class APIIsBackwardCompatible extends org.scalatest.FeatureSpec with MockitoSuga

   val backend: CustomBackend = new CustomBackend()

-  val anyname = UUID.randomUUID.toString
-  when(cpconfig.storageNamespace).thenReturn(anyname)
+  val anyname = java.util.UUID.randomUUID.toString
+  org.mockito.Mockito.when(cpconfig.storageNamespace).thenReturn(anyname)
   assert(backend.checkpointNamespace == anyname)
 }

@@ -1,3 +1,5 @@
 // Copyright (c) Microsoft. All rights reserved.

 package com.microsoft.azure.iot.iothubreact.checkpointing
+
+import com.microsoft.azure.iot.iothubreact.checkpointing.backends.cassandra.lib.Auth