Merge pull request #210 from tknandu/2.3
Fix structured streaming from CosmosDB as source - change feed from beginning
This commit is contained in:
Commit 51b6c45d9b
@@ -151,17 +151,6 @@ private[spark] case class AsyncCosmosDBConnection(config: Config) extends Loggin
      connectionPolicy.setPreferredLocations(preferredLocations)
    }

    val bulkimport = config.get[String](CosmosDBConfig.BulkImport).
      getOrElse(CosmosDBConfig.DefaultBulkImport.toString).
      toBoolean
    if (bulkimport) {
      // The bulk import library handles the throttling requests on its own
      // Gateway connection mode needed to avoid potential master partition throttling
      // as the number of tasks grow larger for collection with a lot of partitions.
      connectionPolicy.getRetryOptions.setMaxRetryAttemptsOnThrottledRequests(0)
      connectionPolicy.setConnectionMode(ConnectionMode.Gateway)
    }

    ClientConfiguration(
      config.get[String](CosmosDBConfig.Endpoint).get,
      config.get[String](CosmosDBConfig.Masterkey).get,
@@ -43,7 +43,7 @@ object CosmosDBRDDIterator
  // For verification purpose
  var lastFeedOptions: FeedOptions = _

  private var hdfsUtils: HdfsUtils = _
  var hdfsUtils: HdfsUtils = _

  def initializeHdfsUtils(hadoopConfig: Map[String, String]): Any = {
    if (hdfsUtils == null) {
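The change from private var hdfsUtils to var hdfsUtils widens visibility so that the HDFS helper can be reached from outside CosmosDBRDDIterator; the CosmosDBSource hunk further down relies on this when it calls CosmosDBRDDIterator.hdfsUtils.deleteFile(...) to clear the change feed token checkpoints.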
@@ -102,6 +102,10 @@

      tokenString = new ObjectMapper().writeValueAsString(nextTokenMap)
    }
    else {
      // Encoding offset as serialized empty map and not null to prevent serialization failure
      tokenString = new ObjectMapper().writeValueAsString(new ConcurrentHashMap[String, String]())
    }

    tokenString
  }
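The added else branch encodes the absence of a next-token map as a serialized empty map rather than null. A minimal standalone sketch of the difference, assuming only Jackson's ObjectMapper (already used in the hunk); the wrapper object is illustrative:

    import java.util.concurrent.ConcurrentHashMap
    import com.fasterxml.jackson.databind.ObjectMapper

    object OffsetTokenSketch {
      def main(args: Array[String]): Unit = {
        val mapper = new ObjectMapper()
        // An empty map serializes to "{}", a well-formed JSON document that the
        // token readers can parse back into an (empty) map.
        println(mapper.writeValueAsString(new ConcurrentHashMap[String, String]()))  // prints {}
        // A bare null serializes to the literal "null", which, per the comment in
        // the hunk, leads to a serialization failure downstream.
        println(mapper.writeValueAsString(null: AnyRef))                             // prints null
      }
    }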
@@ -0,0 +1,68 @@
/**
  * The MIT License (MIT)
  * Copyright (c) 2016 Microsoft Corporation
  *
  * Permission is hereby granted, free of charge, to any person obtaining a copy
  * of this software and associated documentation files (the "Software"), to deal
  * in the Software without restriction, including without limitation the rights
  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  * copies of the Software, and to permit persons to whom the Software is
  * furnished to do so, subject to the following conditions:
  *
  * The above copyright notice and this permission notice shall be included in all
  * copies or substantial portions of the Software.
  *
  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  * SOFTWARE.
  */
package com.microsoft.azure.cosmosdb.spark.streaming

import org.apache.spark.sql.{ForeachWriter, Row}
import com.microsoft.azure.cosmosdb.spark.AsyncCosmosDBConnection
import com.microsoft.azure.cosmosdb.spark.config.{Config, CosmosDBConfig}

import scala.collection.mutable

case class CosmosDBForeachWriter(configMap: Map[String, String]) extends ForeachWriter[Row] {
  var asyncConnection: AsyncCosmosDBConnection = _
  var rows: mutable.ArrayBuffer[Row] = _

  val config = Config(configMap)
  val upsert: Boolean = config
    .getOrElse(CosmosDBConfig.Upsert, String.valueOf(CosmosDBConfig.DefaultUpsert))
    .toBoolean
  val writingBatchSize = config
    .getOrElse(CosmosDBConfig.WritingBatchSize, String.valueOf(CosmosDBConfig.DefaultWritingBatchSize))
    .toInt
  val writingBatchDelayMs = config
    .getOrElse(CosmosDBConfig.WritingBatchDelayMs, String.valueOf(CosmosDBConfig.DefaultWritingBatchDelayMs))
    .toInt
  val rootPropertyToSave = config
    .get[String](CosmosDBConfig.RootPropertyToSave)

  def open(partitionId: Long, version: Long): Boolean = {
    asyncConnection = new AsyncCosmosDBConnection(config)
    rows = new mutable.ArrayBuffer[Row]()
    true
  }

  def process(value: Row): Unit = {
    rows.append(value)
  }

  def close(errorOrNull: Throwable): Unit = {
    errorOrNull match {
      case t: Throwable => throw t
      case _ => {
        if(rows.nonEmpty) {
          asyncConnection.importWithRxJava(rows.iterator, asyncConnection, writingBatchSize, writingBatchDelayMs, rootPropertyToSave, upsert)
        }
      }
    }
  }
}
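Since CosmosDBForeachWriter is an ordinary Spark ForeachWriter[Row], it is wired up through the foreach sink of a streaming query. A usage sketch, assuming an existing streaming DataFrame named streamingDf whose rows are the documents to write; the configuration key names follow the connector's usual Config map convention and the values are placeholders, none of which come from this commit:

    // Sketch only: endpoint, key, database, and collection values are placeholders.
    val writeConfigMap = Map(
      "Endpoint"   -> "https://<account>.documents.azure.com:443/",
      "Masterkey"  -> "<master-key>",
      "Database"   -> "<database>",
      "Collection" -> "<collection>",
      "Upsert"     -> "true"
    )

    val query = streamingDf.writeStream
      .foreach(CosmosDBForeachWriter(writeConfigMap)) // open() buffers rows per partition, close() flushes them via importWithRxJava
      .outputMode("append")
      .start()

    query.awaitTermination()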
@@ -63,25 +63,43 @@ private[spark] class CosmosDBSource(sqlContext: SQLContext,
    if (currentSchema == null) {
      CosmosDBRDDIterator.initializeHdfsUtils(HdfsUtils.getConfigurationMap(
        sqlContext.sparkSession.sparkContext.hadoopConfiguration).toMap)

      // Delete current tokens and next tokens checkpoint directories to ensure change feed starts from beginning if set
      if (streamConfigMap.getOrElse(CosmosDBConfig.ChangeFeedStartFromTheBeginning, String.valueOf(false)).toBoolean) {
        val changeFeedCheckpointLocation: String = streamConfigMap
          .getOrElse(CosmosDBConfig.ChangeFeedCheckpointLocation, StringUtils.EMPTY)
        val queryName = Config(streamConfigMap)
          .get[String](CosmosDBConfig.ChangeFeedQueryName).get
        val currentTokensCheckpointPath = changeFeedCheckpointLocation + "/" + HdfsUtils.filterFilename(queryName)
        val nextTokensCheckpointPath = changeFeedCheckpointLocation + "/" +
          HdfsUtils.filterFilename(CosmosDBRDDIterator.getNextTokenPath(queryName))

        CosmosDBRDDIterator.hdfsUtils.deleteFile(currentTokensCheckpointPath)
        CosmosDBRDDIterator.hdfsUtils.deleteFile(nextTokensCheckpointPath)
      }

      logDebug(s"Reading data to derive the schema")
      val helperDfConfig: Map[String, String] = streamConfigMap
        .-(CosmosDBConfig.ChangeFeedStartFromTheBeginning)
        .+((CosmosDBConfig.ChangeFeedStartFromTheBeginning, String.valueOf(false)))

      // Dummy change feed query to get the first continuation token
      val df = sqlContext.read.cosmosDB(Config(helperDfConfig))
      val tokens = CosmosDBRDDIterator.getCollectionTokens(Config(configMap))
      if (StringUtils.isEmpty(tokens)) {
        // Empty tokens means it is a new streaming query
        // Trigger the count to update the current continuation token
        df.count()
      }

        .-(CosmosDBConfig.ReadChangeFeed).
        +((CosmosDBConfig.ReadChangeFeed, String.valueOf(false)))
        .-(CosmosDBConfig.QueryCustom).
        +((CosmosDBConfig.QueryCustom, "SELECT TOP 10 * FROM c"))
      val shouldInferSchema = helperDfConfig.
        getOrElse(CosmosDBConfig.InferStreamSchema, CosmosDBConfig.DefaultInferStreamSchema.toString).
        toBoolean

      if (shouldInferSchema) {
        // Dummy batch read query to sample schema
        val df = sqlContext.read.cosmosDB(Config(helperDfConfig))
        val tokens = CosmosDBRDDIterator.getCollectionTokens(Config(configMap))
        if (StringUtils.isEmpty(tokens)) {
          // Empty tokens means it is a new streaming query
          // Trigger the count to force batch read query to sample schema
          df.count()
        }

        currentSchema = df.schema
      } else {
        currentSchema = cosmosDbStreamSchema
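The checkpoint-deletion block above is what lets a restarted query honour ChangeFeedStartFromTheBeginning. A sketch of a streaming read that would exercise it, assuming a SparkSession named spark, the connector's CosmosDBSourceProvider as the source, and its usual option names; the account values are placeholders and none of this configuration appears in the diff itself:

    // Sketch only: option keys and the source provider class reflect the connector's
    // documented streaming configuration, not code shown in this commit.
    import com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSourceProvider

    val readConfigMap = Map(
      "Endpoint"                        -> "https://<account>.documents.azure.com:443/",
      "Masterkey"                       -> "<master-key>",
      "Database"                        -> "<database>",
      "Collection"                      -> "<collection>",
      "ReadChangeFeed"                  -> "true",
      "ChangeFeedStartFromTheBeginning" -> "true",  // triggers the token-checkpoint cleanup above
      "ChangeFeedQueryName"             -> "example-change-feed-query",
      "ChangeFeedCheckpointLocation"    -> "/tmp/cosmosdb-changefeed-checkpoint"
    )

    val streamingDf = spark.readStream
      .format(classOf[CosmosDBSourceProvider].getName)
      .options(readConfigMap)
      .load()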
@@ -102,12 +120,14 @@ private[spark] class CosmosDBSource(sqlContext: SQLContext,
      val tsTokenRegex = "\"" + CosmosDBConfig.StreamingTimestampToken + "\"\\:\"[\\d]+\"" // "tsToken": "2324343"
      offsetJson.replaceAll(tsTokenRegex, StringUtils.EMPTY)
    }

    logDebug(s"getBatch with offset: $start $end")
    val endJson = getOffsetJsonForProgress(end.json)
    val endJson: String = getOffsetJsonForProgress(end.json)
    val nextTokens = getOffsetJsonForProgress(CosmosDBRDDIterator.getCollectionTokens(Config(streamConfigMap)))
    val currentTokens = getOffsetJsonForProgress(
      CosmosDBRDDIterator.getCollectionTokens(Config(streamConfigMap),
        shouldGetCurrentToken = true))

    // Only getting the data in the following cases:
    // - The provided end offset is the current offset (next tokens), the stream is progressing to the batch
    // - The provided end offset is the current tokens. This means the stream didn't get to commit to the end offset yet
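getOffsetJsonForProgress strips the streaming timestamp token so that offsets can be compared structurally, ignoring when they were written. A standalone sketch of the same replacement, applied to a made-up offset document (the field layout is illustrative; only the tsToken name is taken from the inline comment above):

    // Sketch: same replaceAll as in the hunk, with the token name spelled out.
    val tsTokenRegex = "\"tsToken\"\\:\"[\\d]+\""
    val offsetJson = """{"0":"token-for-partition-0","tsToken":"2324343"}"""
    println(offsetJson.replaceAll(tsTokenRegex, ""))
    // prints {"0":"token-for-partition-0",} -- the dangling comma is harmless here,
    // since both sides of the comparison are stripped the same way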
@@ -118,8 +138,8 @@
        streamConfigMap
          .-(CosmosDBConfig.ChangeFeedContinuationToken)
          .+((CosmosDBConfig.ChangeFeedContinuationToken, end.json)))
      sqlContext.read.cosmosDB(schema, readConfig, sqlContext)

      val currentDf = sqlContext.read.cosmosDB(schema, readConfig, sqlContext)
      currentDf
    } else {
      logDebug(s"Skipping this batch")
      sqlContext.createDataFrame(sqlContext.emptyDataFrame.rdd, schema)