merge master
This commit is contained in:
Коммит
8d980c8b03
|
@ -44,7 +44,9 @@ libraryDependencies <++= (scalaVersion) {
|
|||
"org.scalatest" %% "scalatest" % "3.0.1" % "test",
|
||||
|
||||
// https://github.com/Azure/azure-iot-sdk-java/releases
|
||||
"com.microsoft.azure.sdk.iot" % "iot-device-client" % "1.0.21" % "test"
|
||||
"com.microsoft.azure.sdk.iot" % "iot-device-client" % "1.0.21" % "test",
|
||||
|
||||
"org.mockito" % "mockito-all" % "1.10.19" % "test"
|
||||
)
|
||||
}
|
||||
|
||||
|
|
11
build.sh
11
build.sh
|
@ -1,11 +1,12 @@
|
|||
if [ "$1" == "PR" ]; then
|
||||
TRAVIS_PULL_REQUEST=true
|
||||
if [ "$1" == "PR" -o "$1" == "pr" ]; then
|
||||
echo "Skipping tests requiring encrypted secrets."
|
||||
export TRAVIS_PULL_REQUEST="true"
|
||||
fi
|
||||
|
||||
travis lint -x && \
|
||||
sbt +clean && \
|
||||
sbt +compile && \
|
||||
sbt +package && \
|
||||
sbt +clean && \
|
||||
sbt +compile && \
|
||||
sbt +package && \
|
||||
sbt +test
|
||||
|
||||
rm -f *.crt
|
||||
|
|
|
@ -1 +1 @@
|
|||
sbt.version = 0.13.13
|
||||
sbt.version=0.13.13
|
||||
|
|
|
@ -1,10 +1,3 @@
|
|||
// Copyright (c) Microsoft. All rights reserved.
|
||||
|
||||
scalaVersion := "2.12.1"
|
||||
|
||||
libraryDependencies <++= (scalaVersion) {
|
||||
scalaVersion ⇒
|
||||
Seq(
|
||||
"com.microsoft.azure" % "azure-eventhubs-eph" % "0.11.0"
|
||||
)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,5 @@
|
|||
// Copyright (c) Microsoft. All rights reserved.
|
||||
|
||||
package com.microsoft.azure.iot.iothubreact.checkpointing.backends.cassandra.lib
|
||||
|
||||
case class Auth(username: String, password: String)
|
|
@ -4,22 +4,80 @@ package com.microsoft.azure.iot.iothubreact.checkpointing
|
|||
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import com.microsoft.azure.iot.iothubreact.checkpointing.backends.cassandra.lib.Auth
|
||||
import com.typesafe.config.{Config, ConfigFactory}
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.language.postfixOps
|
||||
import scala.util.Try
|
||||
|
||||
/** Checkpointing configuration interface
|
||||
*/
|
||||
trait ICPConfiguration {
|
||||
|
||||
/** Whether checkpointing is enabled
|
||||
*/
|
||||
val isEnabled: Boolean
|
||||
|
||||
/** Namespace where the table with checkpoint data is stored (e.g. Cassandra keyspace)
|
||||
*/
|
||||
val storageNamespace: String
|
||||
|
||||
/** Type of storage, the value is not case sensitive
|
||||
*/
|
||||
val checkpointBackendType: String
|
||||
|
||||
/** How often checkpoint data is written to the storage
|
||||
*/
|
||||
val checkpointFrequency: FiniteDuration
|
||||
|
||||
/** Checkpointing operations timeout
|
||||
*/
|
||||
val checkpointRWTimeout: FiniteDuration
|
||||
|
||||
/** How many messages to replay after a restart, for each IoT hub partition
|
||||
*/
|
||||
val checkpointCountThreshold: Int
|
||||
|
||||
/** Store a position if its value is older than this amount of time, rounded to seconds
|
||||
*/
|
||||
val checkpointTimeThreshold: FiniteDuration
|
||||
|
||||
/** Whether to use the Azure Storage Emulator when using Azure blob backend
|
||||
*/
|
||||
val azureBlobEmulator: Boolean
|
||||
|
||||
/** Azure blob connection string
|
||||
*/
|
||||
val azureBlobConnectionString: String
|
||||
|
||||
/** Azure blob lease duration (between 15s and 60s by Azure docs)
|
||||
*/
|
||||
val azureBlobLeaseDuration: FiniteDuration
|
||||
|
||||
/** Cassandra cluster address
|
||||
* TODO: support list
|
||||
*/
|
||||
val cassandraCluster: String
|
||||
|
||||
/** Cassandra replication factor, value required to open a connection
|
||||
*/
|
||||
val cassandraReplicationFactor: Int
|
||||
|
||||
/** Cassandra authentication credentials
|
||||
*/
|
||||
val cassandraAuth: Option[Auth]
|
||||
}
|
||||
|
||||
/** Hold IoT Hub stream checkpointing configuration settings
|
||||
*/
|
||||
private[iothubreact] object Configuration {
|
||||
private[iothubreact] class CPConfiguration(implicit conf: Config = ConfigFactory.load) extends ICPConfiguration {
|
||||
|
||||
// TODO: Allow to use multiple configurations, e.g. while processing multiple streams
|
||||
// a client will need a dedicated checkpoint container for each stream
|
||||
|
||||
private[this] val confPath = "iothub-react.checkpointing."
|
||||
|
||||
private[this] val conf: Config = ConfigFactory.load()
|
||||
|
||||
// Default time between checkpoint writes to the storage
|
||||
private[this] val DefaultFrequency = 1 second
|
||||
|
||||
|
@ -57,7 +115,10 @@ private[iothubreact] object Configuration {
|
|||
private[this] val MaxTimeThreshold = 1 hour
|
||||
|
||||
// Default name of the container used to store checkpoint data
|
||||
private[this] val DefaultContainer = "iothub-react-checkpoints"
|
||||
private[this] lazy val DefaultContainer = checkpointBackendType.toUpperCase match {
|
||||
case "CASSANDRA" ⇒ "iothub_react_checkpoints"
|
||||
case _ ⇒ "iothub-react-checkpoints"
|
||||
}
|
||||
|
||||
// Whether checkpointing is enabled or not
|
||||
lazy val isEnabled: Boolean = conf.getBoolean(confPath + "enabled")
|
||||
|
@ -70,11 +131,11 @@ private[iothubreact] object Configuration {
|
|||
MaxFrequency)
|
||||
|
||||
// How many messages to replay after a restart, for each IoT hub partition
|
||||
lazy val checkpointCountThreshold = Math.max(1, conf.getInt(confPath + "countThreshold"))
|
||||
lazy val checkpointCountThreshold: Int = Math.max(1, conf.getInt(confPath + "countThreshold"))
|
||||
|
||||
// Store a position if its value is older than this amount of time, rounded to seconds
|
||||
// Min: 1 second, Max: 1 hour
|
||||
lazy val checkpointTimeThreshold = getDuration(
|
||||
lazy val checkpointTimeThreshold: FiniteDuration = getDuration(
|
||||
confPath + "timeThreshold",
|
||||
DefaultTimeThreshold,
|
||||
MinTimeThreshold,
|
||||
|
@ -99,7 +160,7 @@ private[iothubreact] object Configuration {
|
|||
// Azure blob connection string
|
||||
lazy val azureBlobConnectionString: String = getAzureBlobConnectionString
|
||||
|
||||
// Azure blob lease duration (15s and 60s by Azure docs)
|
||||
// Azure blob lease duration (between 15s and 60s by Azure docs)
|
||||
lazy val azureBlobLeaseDuration: FiniteDuration = getDuration(
|
||||
confPath + "storage.azureblob.lease",
|
||||
15 seconds,
|
||||
|
@ -107,8 +168,14 @@ private[iothubreact] object Configuration {
|
|||
60 seconds)
|
||||
|
||||
// Cassandra cluster address
|
||||
lazy val cassandraCluster : String = conf.getString(confPath + "storage.cassandra.cluster")
|
||||
lazy val cassandraReplicationFactor: Int = conf.getInt(confPath + "storage.cassandra.replicationFactor")
|
||||
lazy val cassandraCluster : String = conf.getString(confPath + "storage.cassandra.cluster")
|
||||
lazy val cassandraReplicationFactor: Int = conf.getInt(confPath + "storage.cassandra.replicationFactor")
|
||||
lazy val cassandraAuth : Option[Auth] = (for {
|
||||
u <- Try(conf.getString(confPath + "storage.cassandra.username"))
|
||||
p <- Try(conf.getString(confPath + "storage.cassandra.password"))
|
||||
} yield {
|
||||
Auth(u, p)
|
||||
}).toOption
|
||||
|
||||
/** Load Azure blob connection string, taking care of the Azure storage emulator case
|
||||
*
|
|
@ -13,6 +13,7 @@ private[iothubreact] object CheckpointActorSystem {
|
|||
|
||||
implicit private[this] val actorSystem = ActorSystem("IoTHubReact")
|
||||
implicit private[this] val materializer = ActorMaterializer(ActorMaterializerSettings(actorSystem))
|
||||
implicit private[this] val cpconfig = new CPConfiguration
|
||||
var localRegistry: Map[String, ActorRef] = Map[String, ActorRef]()
|
||||
|
||||
/** Create an actor to read/write offset checkpoints from the storage
|
||||
|
|
|
@ -31,7 +31,7 @@ private[iothubreact] object CheckpointService {
|
|||
*
|
||||
* @param partition IoT hub partition number [0..N]
|
||||
*/
|
||||
private[iothubreact] class CheckpointService(partition: Int)
|
||||
private[iothubreact] class CheckpointService(partition: Int)(implicit config: ICPConfiguration)
|
||||
extends Actor
|
||||
with Stash
|
||||
with Logger {
|
||||
|
@ -96,8 +96,9 @@ private[iothubreact] class CheckpointService(partition: Int)
|
|||
|
||||
var offsetToStore: String = ""
|
||||
val now = Instant.now.getEpochSecond
|
||||
val timeThreshold = Configuration.checkpointTimeThreshold.toSeconds
|
||||
val countThreshold = Configuration.checkpointCountThreshold
|
||||
|
||||
val timeThreshold = config.checkpointTimeThreshold.toSeconds
|
||||
val countThreshold = config.checkpointCountThreshold
|
||||
|
||||
// Check if the queue contains old offsets to flush (time threshold)
|
||||
// Check if the queue contains data of too many messages (count threshold)
|
||||
|
@ -110,7 +111,7 @@ private[iothubreact] class CheckpointService(partition: Int)
|
|||
}
|
||||
|
||||
if (offsetToStore == "") {
|
||||
log.debug(s"Checkpoint skipped: partition=${partition}, count ${queuedOffsets} < threshold ${Configuration.checkpointCountThreshold}")
|
||||
log.debug(s"Checkpoint skipped: partition=${partition}, count ${queuedOffsets} < threshold ${config.checkpointCountThreshold}")
|
||||
} else {
|
||||
log.info(s"Writing checkpoint: partition=${partition}, storing ${offsetToStore} (current offset=${currentOffset})")
|
||||
storage.writeOffset(partition, offsetToStore)
|
||||
|
@ -139,7 +140,7 @@ private[iothubreact] class CheckpointService(partition: Int)
|
|||
def updateOffsetAction(offset: String) = {
|
||||
|
||||
if (!schedulerStarted) {
|
||||
val time = Configuration.checkpointFrequency
|
||||
val time = config.checkpointFrequency
|
||||
schedulerStarted = true
|
||||
context.system.scheduler.schedule(time, time, self, StoreOffset)
|
||||
log.info(s"Scheduled checkpoint for partition ${partition} every ${time.toMillis} ms")
|
||||
|
@ -166,7 +167,7 @@ private[iothubreact] class CheckpointService(partition: Int)
|
|||
|
||||
// TODO: Support plugins
|
||||
def getCheckpointBackend: CheckpointBackend = {
|
||||
val conf = Configuration.checkpointBackendType
|
||||
val conf = config.checkpointBackendType
|
||||
conf.toUpperCase match {
|
||||
case "AZUREBLOB" ⇒ new AzureBlob
|
||||
case "CASSANDRA" ⇒ new CassandraTable
|
||||
|
|
|
@ -6,7 +6,7 @@ import java.io.IOException
|
|||
import java.net.URISyntaxException
|
||||
import java.util.UUID
|
||||
|
||||
import com.microsoft.azure.iot.iothubreact.checkpointing.Configuration
|
||||
import com.microsoft.azure.iot.iothubreact.checkpointing.ICPConfiguration
|
||||
import com.microsoft.azure.iot.iothubreact.scaladsl.IoTHubPartition
|
||||
import com.microsoft.azure.iot.iothubreact.{Logger, Retry}
|
||||
import com.microsoft.azure.storage.blob.CloudBlockBlob
|
||||
|
@ -17,13 +17,13 @@ import scala.language.{implicitConversions, postfixOps}
|
|||
|
||||
/** Storage logic to write checkpoints to Azure blobs
|
||||
*/
|
||||
private[iothubreact] class AzureBlob extends CheckpointBackend with Logger {
|
||||
private[iothubreact] class AzureBlob(implicit config: ICPConfiguration) extends CheckpointBackend with Logger {
|
||||
|
||||
// Set the account to point either to Azure or the emulator
|
||||
val account: CloudStorageAccount = if (Configuration.azureBlobEmulator)
|
||||
val account: CloudStorageAccount = if (config.azureBlobEmulator)
|
||||
CloudStorageAccount.getDevelopmentStorageAccount()
|
||||
else
|
||||
CloudStorageAccount.parse(Configuration.azureBlobConnectionString)
|
||||
CloudStorageAccount.parse(config.azureBlobConnectionString)
|
||||
|
||||
val client = account.createCloudBlobClient()
|
||||
|
||||
|
@ -112,7 +112,7 @@ private[iothubreact] class AzureBlob extends CheckpointBackend with Logger {
|
|||
// Note: the lease ID must be a Guid otherwise the service returs 400
|
||||
var leaseId = UUID.randomUUID().toString
|
||||
try {
|
||||
file.acquireLease(Configuration.azureBlobLeaseDuration.toSeconds.toInt, leaseId)
|
||||
file.acquireLease(config.azureBlobLeaseDuration.toSeconds.toInt, leaseId)
|
||||
} catch {
|
||||
|
||||
case e: StorageException ⇒
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
package com.microsoft.azure.iot.iothubreact.checkpointing.backends
|
||||
|
||||
import com.microsoft.azure.iot.iothubreact.Logger
|
||||
import com.microsoft.azure.iot.iothubreact.checkpointing.Configuration
|
||||
import com.microsoft.azure.iot.iothubreact.checkpointing.ICPConfiguration
|
||||
import com.microsoft.azure.iot.iothubreact.checkpointing.backends.cassandra.lib.Connection
|
||||
import com.microsoft.azure.iot.iothubreact.checkpointing.backends.cassandra.{CheckpointRecord, CheckpointsTableSchema}
|
||||
import com.microsoft.azure.iot.iothubreact.scaladsl.IoTHubPartition
|
||||
|
@ -11,10 +11,10 @@ import org.json4s.JsonAST
|
|||
|
||||
/** Storage logic to write checkpoints to a Cassandra table
|
||||
*/
|
||||
private[iothubreact] class CassandraTable extends CheckpointBackend with Logger {
|
||||
private[iothubreact] class CassandraTable(implicit config: ICPConfiguration) extends CheckpointBackend with Logger {
|
||||
|
||||
val schema = new CheckpointsTableSchema(checkpointNamespace, "checkpoints")
|
||||
val connection = Connection(Configuration.cassandraCluster, Configuration.cassandraReplicationFactor, schema)
|
||||
val connection = Connection(config.cassandraCluster, config.cassandraReplicationFactor, config.cassandraAuth, schema)
|
||||
val table = connection.getTable[CheckpointRecord]()
|
||||
|
||||
connection.createKeyspaceIfNotExists()
|
||||
|
|
|
@ -2,11 +2,11 @@
|
|||
|
||||
package com.microsoft.azure.iot.iothubreact.checkpointing.backends
|
||||
|
||||
import com.microsoft.azure.iot.iothubreact.checkpointing.Configuration
|
||||
import com.microsoft.azure.iot.iothubreact.checkpointing.{CPConfiguration, ICPConfiguration}
|
||||
|
||||
trait CheckpointBackend {
|
||||
|
||||
def checkpointNamespace: String = Configuration.storageNamespace
|
||||
def checkpointNamespace(implicit config: ICPConfiguration = new CPConfiguration): String = config.storageNamespace
|
||||
|
||||
/** Read the offset of the last record processed for the given partition
|
||||
*
|
||||
|
|
|
@ -13,11 +13,18 @@ import com.datastax.driver.core.Cluster
|
|||
private[iothubreact] case class Connection(
|
||||
contactPoint: String,
|
||||
replicationFactor: Int,
|
||||
auth: Option[Auth],
|
||||
table: TableSchema) {
|
||||
|
||||
private lazy val hostPort = extractHostPort()
|
||||
private lazy val cluster = Cluster.builder().addContactPoint(hostPort._1).withPort(hostPort._2).build()
|
||||
implicit lazy val session = cluster.connect()
|
||||
private lazy val hostPort = extractHostPort()
|
||||
private lazy val cluster = {
|
||||
val builder = Cluster.builder().addContactPoint(hostPort._1).withPort(hostPort._2)
|
||||
auth map {
|
||||
creds ⇒ builder.withCredentials(creds.username, creds.password)
|
||||
} getOrElse (builder) build()
|
||||
}
|
||||
|
||||
implicit lazy val session = cluster.connect()
|
||||
|
||||
/** Create the key space if not present
|
||||
*/
|
||||
|
|
|
@ -8,6 +8,7 @@ import java.util.concurrent.CompletionStage
|
|||
import akka.stream.javadsl.{Sink, Source ⇒ JavaSource}
|
||||
import akka.{Done, NotUsed}
|
||||
import com.microsoft.azure.iot.iothubreact._
|
||||
import com.microsoft.azure.iot.iothubreact.checkpointing.{CPConfiguration, ICPConfiguration}
|
||||
import com.microsoft.azure.iot.iothubreact.scaladsl.{IoTHub ⇒ IoTHubScalaDSL, OffsetList ⇒ OffsetListScalaDSL, PartitionList ⇒ PartitionListScalaDSL}
|
||||
import com.microsoft.azure.iot.iothubreact.sinks.{DevicePropertiesSink, MessageToDeviceSink, MethodOnDeviceSink}
|
||||
|
||||
|
@ -17,7 +18,8 @@ class IoTHub() {
|
|||
|
||||
// TODO: Provide ClearCheckpoints() method to clear the state
|
||||
|
||||
private lazy val iotHub = new IoTHubScalaDSL()
|
||||
private implicit lazy val cpconfig: ICPConfiguration = new CPConfiguration
|
||||
private lazy val iotHub = new IoTHubScalaDSL()
|
||||
|
||||
/** Stop the stream
|
||||
*/
|
||||
|
|
|
@ -8,7 +8,7 @@ import akka.stream._
|
|||
import akka.stream.scaladsl._
|
||||
import akka.{Done, NotUsed}
|
||||
import com.microsoft.azure.iot.iothubreact._
|
||||
import com.microsoft.azure.iot.iothubreact.checkpointing.{Configuration ⇒ CPConfiguration}
|
||||
import com.microsoft.azure.iot.iothubreact.checkpointing.{CPConfiguration, ICPConfiguration}
|
||||
import com.microsoft.azure.iot.iothubreact.sinks.{DevicePropertiesSink, MessageToDeviceSink, MethodOnDeviceSink}
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
@ -16,7 +16,7 @@ import scala.language.postfixOps
|
|||
|
||||
/** Provides a streaming source to retrieve messages from Azure IoT Hub
|
||||
*/
|
||||
case class IoTHub() extends Logger {
|
||||
case class IoTHub(implicit config: ICPConfiguration = new CPConfiguration) extends Logger {
|
||||
|
||||
// TODO: Provide ClearCheckpoints() method to clear the state
|
||||
|
||||
|
@ -137,7 +137,7 @@ case class IoTHub() extends Logger {
|
|||
withTimeOffset = false,
|
||||
partitions = allPartitions,
|
||||
offsets = fromStart,
|
||||
withCheckpoints = withCheckpoints && CPConfiguration.isEnabled)
|
||||
withCheckpoints = withCheckpoints && config.isEnabled)
|
||||
}
|
||||
|
||||
/** Stream returning all the messages from all the configured partitions.
|
||||
|
@ -154,7 +154,7 @@ case class IoTHub() extends Logger {
|
|||
withTimeOffset = false,
|
||||
partitions = Some(partitions),
|
||||
offsets = fromStart,
|
||||
withCheckpoints = withCheckpoints && CPConfiguration.isEnabled)
|
||||
withCheckpoints = withCheckpoints && config.isEnabled)
|
||||
}
|
||||
|
||||
/** Stream returning all the messages starting from the given offset, from all
|
||||
|
@ -201,7 +201,7 @@ case class IoTHub() extends Logger {
|
|||
withTimeOffset = true,
|
||||
partitions = allPartitions,
|
||||
startTime = startTime,
|
||||
withCheckpoints = withCheckpoints && CPConfiguration.isEnabled)
|
||||
withCheckpoints = withCheckpoints && config.isEnabled)
|
||||
}
|
||||
|
||||
/** Stream returning all the messages starting from the given time, from all
|
||||
|
@ -218,7 +218,7 @@ case class IoTHub() extends Logger {
|
|||
withTimeOffset = true,
|
||||
partitions = Some(partitions),
|
||||
startTime = startTime,
|
||||
withCheckpoints = withCheckpoints && CPConfiguration.isEnabled)
|
||||
withCheckpoints = withCheckpoints && config.isEnabled)
|
||||
}
|
||||
|
||||
/** Stream returning all the messages starting from the given offset, from all
|
||||
|
@ -234,7 +234,7 @@ case class IoTHub() extends Logger {
|
|||
withTimeOffset = false,
|
||||
partitions = allPartitions,
|
||||
offsets = Some(offsets),
|
||||
withCheckpoints = withCheckpoints && CPConfiguration.isEnabled)
|
||||
withCheckpoints = withCheckpoints && config.isEnabled)
|
||||
}
|
||||
|
||||
/** Stream returning all the messages starting from the given offset, from all
|
||||
|
@ -251,7 +251,7 @@ case class IoTHub() extends Logger {
|
|||
withTimeOffset = false,
|
||||
partitions = Some(partitions),
|
||||
offsets = Some(offsets),
|
||||
withCheckpoints = withCheckpoints && CPConfiguration.isEnabled)
|
||||
withCheckpoints = withCheckpoints && config.isEnabled)
|
||||
}
|
||||
|
||||
/** Stream returning all the messages, from the given starting point, optionally with
|
||||
|
@ -280,9 +280,9 @@ case class IoTHub() extends Logger {
|
|||
|
||||
for (partition ← partitions.get.values) {
|
||||
val graph = if (withTimeOffset)
|
||||
IoTHubPartition(partition).source(startTime, withCheckpoints).via(streamManager)
|
||||
else
|
||||
IoTHubPartition(partition).source(offsets.get.values(partition), withCheckpoints).via(streamManager)
|
||||
IoTHubPartition(partition).source(startTime, withCheckpoints).via(streamManager)
|
||||
else
|
||||
IoTHubPartition(partition).source(offsets.get.values(partition), withCheckpoints).via(streamManager)
|
||||
|
||||
val source = Source.fromGraph(graph).async
|
||||
source ~> merge
|
||||
|
|
|
@ -11,7 +11,7 @@ import akka.util.Timeout
|
|||
import com.microsoft.azure.eventhubs.PartitionReceiver
|
||||
import com.microsoft.azure.iot.iothubreact._
|
||||
import com.microsoft.azure.iot.iothubreact.checkpointing.CheckpointService.GetOffset
|
||||
import com.microsoft.azure.iot.iothubreact.checkpointing.{CheckpointActorSystem, SavePositionOnPull, Configuration ⇒ CPConfiguration}
|
||||
import com.microsoft.azure.iot.iothubreact.checkpointing.{CheckpointActorSystem, ICPConfiguration, SavePositionOnPull}
|
||||
import com.microsoft.azure.iot.iothubreact.filters.Ignore
|
||||
|
||||
import scala.concurrent.Await
|
||||
|
@ -33,7 +33,7 @@ object IoTHubPartition extends Logger {
|
|||
* @param partition IoT hub partition number (0-based). The number of
|
||||
* partitions is set during the deployment.
|
||||
*/
|
||||
private[iothubreact] case class IoTHubPartition(val partition: Int) extends Logger {
|
||||
private[iothubreact] case class IoTHubPartition(val partition: Int)(implicit cpconfig: ICPConfiguration) extends Logger {
|
||||
|
||||
/** Stream returning all the messages from the given offset
|
||||
*
|
||||
|
@ -46,7 +46,7 @@ private[iothubreact] case class IoTHubPartition(val partition: Int) extends Logg
|
|||
getSource(
|
||||
withTimeOffset = true,
|
||||
startTime = startTime,
|
||||
withCheckpoints = withCheckpoints && CPConfiguration.isEnabled)
|
||||
withCheckpoints = withCheckpoints && cpconfig.isEnabled)
|
||||
}
|
||||
|
||||
/** Stream returning all the messages from the given offset
|
||||
|
@ -60,7 +60,7 @@ private[iothubreact] case class IoTHubPartition(val partition: Int) extends Logg
|
|||
getSource(
|
||||
withTimeOffset = false,
|
||||
offset = offset,
|
||||
withCheckpoints = withCheckpoints && CPConfiguration.isEnabled)
|
||||
withCheckpoints = withCheckpoints && cpconfig.isEnabled)
|
||||
}
|
||||
|
||||
/** Create a stream returning all the messages for the defined partition, from the given start
|
||||
|
@ -83,7 +83,7 @@ private[iothubreact] case class IoTHubPartition(val partition: Int) extends Logg
|
|||
var _offset = offset
|
||||
var _withTimeOffset = withTimeOffset
|
||||
if (withCheckpoints) {
|
||||
val savedOffset = GetSavedOffset()
|
||||
val savedOffset = GetSavedOffset
|
||||
if (savedOffset != IoTHubPartition.OffsetCheckpointNotFound) {
|
||||
_offset = savedOffset
|
||||
_withTimeOffset = false
|
||||
|
@ -112,7 +112,7 @@ private[iothubreact] case class IoTHubPartition(val partition: Int) extends Logg
|
|||
*/
|
||||
private[this] def GetSavedOffset(): String = {
|
||||
val partitionCp = CheckpointActorSystem.getCheckpointService(partition)
|
||||
implicit val rwTimeout = Timeout(CPConfiguration.checkpointRWTimeout)
|
||||
implicit val rwTimeout = Timeout(cpconfig.checkpointRWTimeout)
|
||||
try {
|
||||
Retry(3, 5 seconds) {
|
||||
log.debug(s"Loading the stream position for partition ${partition}")
|
||||
|
|
|
@ -1,16 +1,18 @@
|
|||
// Copyright (c) Microsoft. All rights reserved.
|
||||
|
||||
// Namespace chosen to avoid access to internal classes
|
||||
// NOTE: Namespace chosen to avoid access to internal classes
|
||||
package api
|
||||
|
||||
// No global imports to make easier detecting breaking changes
|
||||
// NOTE: No global imports to make easier detecting breaking changes
|
||||
|
||||
class APIIsBackwardCompatible extends org.scalatest.FeatureSpec {
|
||||
class APIIsBackwardCompatible extends org.scalatest.FeatureSpec with org.scalatest.mockito.MockitoSugar {
|
||||
|
||||
info("As a developer using Azure IoT hub React")
|
||||
info("I want to be able to upgrade to new minor versions without changing my code")
|
||||
info("So I can benefit from improvements without excessive development costs")
|
||||
|
||||
implicit val cpconfig = mock[com.microsoft.azure.iot.iothubreact.checkpointing.ICPConfiguration]
|
||||
|
||||
feature("Version 0.x is backward compatible") {
|
||||
|
||||
scenario("Using MessageFromDevice") {
|
||||
|
@ -149,6 +151,7 @@ class APIIsBackwardCompatible extends org.scalatest.FeatureSpec {
|
|||
import com.microsoft.azure.iot.iothubreact.checkpointing.backends.CheckpointBackend
|
||||
|
||||
class CustomBackend extends CheckpointBackend {
|
||||
|
||||
override def readOffset(partition: Int): String = {
|
||||
return ""
|
||||
}
|
||||
|
@ -157,7 +160,10 @@ class APIIsBackwardCompatible extends org.scalatest.FeatureSpec {
|
|||
}
|
||||
|
||||
val backend: CustomBackend = new CustomBackend()
|
||||
assert(backend.checkpointNamespace == "iothub-react-checkpoints")
|
||||
|
||||
val anyname = java.util.UUID.randomUUID.toString
|
||||
org.mockito.Mockito.when(cpconfig.storageNamespace).thenReturn(anyname)
|
||||
assert(backend.checkpointNamespace == anyname)
|
||||
}
|
||||
|
||||
scenario("Using Message Type") {
|
||||
|
|
|
@ -0,0 +1,55 @@
|
|||
// Copyright (c) Microsoft. All rights reserved.
|
||||
|
||||
package com.microsoft.azure.iot.iothubreact.checkpointing
|
||||
|
||||
import com.microsoft.azure.iot.iothubreact.checkpointing.backends.cassandra.lib.Auth
|
||||
import com.typesafe.config.{Config, ConfigException}
|
||||
import org.mockito.Mockito._
|
||||
import org.scalatest.mockito.MockitoSugar
|
||||
import org.scalatest.{FeatureSpec, GivenWhenThen}
|
||||
|
||||
class ConfigurationTest extends FeatureSpec with GivenWhenThen with MockitoSugar {
|
||||
|
||||
info("As a configured instance")
|
||||
info("I want logic around returned values to be consistent with application expectations")
|
||||
|
||||
val confPath = "iothub-react.checkpointing."
|
||||
feature("Configuration Cassandra authorization") {
|
||||
|
||||
scenario("Only one of username or password is supplied") {
|
||||
var cfg = mock[Config]
|
||||
when(cfg.getString(confPath + "storage.cassandra.username")).thenReturn("username")
|
||||
when(cfg.getString(confPath + "storage.cassandra.password")).thenThrow(new ConfigException.Missing("path"))
|
||||
assert(new CPConfiguration()(cfg).cassandraAuth == None)
|
||||
|
||||
cfg = mock[Config]
|
||||
when(cfg.getString(confPath + "storage.cassandra.username")).thenThrow(new ConfigException.Missing("path"))
|
||||
when(cfg.getString(confPath + "storage.cassandra.password")).thenReturn("password")
|
||||
assert(new CPConfiguration()(cfg).cassandraAuth == None)
|
||||
}
|
||||
|
||||
scenario("Both username and password are supplied") {
|
||||
var cfg = mock[Config]
|
||||
when(cfg.getString(confPath + "storage.cassandra.username")).thenReturn("username")
|
||||
when(cfg.getString(confPath + "storage.cassandra.password")).thenReturn("password")
|
||||
assert(new CPConfiguration()(cfg).cassandraAuth == Some(Auth("username", "password")))
|
||||
}
|
||||
}
|
||||
|
||||
feature("Storage namespace") {
|
||||
|
||||
scenario("Cassandra has a special namespace value") {
|
||||
var cfg = mock[Config]
|
||||
when(cfg.getString(confPath + "storage.namespace")).thenReturn("")
|
||||
|
||||
when(cfg.getString(confPath + "storage.backendType")).thenReturn("anythingbutcassandra")
|
||||
assert(new CPConfiguration()(cfg).storageNamespace == "iothub-react-checkpoints")
|
||||
|
||||
when(cfg.getString(confPath + "storage.backendType")).thenReturn("AZUREBLOB")
|
||||
assert(new CPConfiguration()(cfg).storageNamespace == "iothub-react-checkpoints")
|
||||
|
||||
when(cfg.getString(confPath + "storage.backendType")).thenReturn("CASSANDRA")
|
||||
assert(new CPConfiguration()(cfg).storageNamespace == "iothub_react_checkpoints")
|
||||
}
|
||||
}
|
||||
}
|
|
@ -22,7 +22,6 @@ class IoTHubReactHasAnAwesomeAPI extends FeatureSpec with GivenWhenThen {
|
|||
|
||||
Given("An IoT hub is configured")
|
||||
val hub = IoTHub()
|
||||
val hubPartition = IoTHubPartition(1)
|
||||
|
||||
When("A developer wants to fetch messages from Azure IoT hub")
|
||||
val messagesFromAllPartitions: Source[MessageFromDevice, NotUsed] = hub.source(false)
|
||||
|
|
Загрузка…
Ссылка в новой задаче