Revert "Fix structured streaming from CosmosDB as source - change feed from beginning"

This commit is contained in:
Ramnandan Krishnamurthy 2018-07-05 14:46:44 -07:00 committed by GitHub
Parent 51b6c45d9b
Commit 553783a75f
No known key found for this signature
GPG key ID: 4AEE18F83AFDEB23
4 changed files: 25 additions and 106 deletions

View file

@@ -151,6 +151,17 @@ private[spark] case class AsyncCosmosDBConnection(config: Config) extends Loggin
connectionPolicy.setPreferredLocations(preferredLocations)
}
val bulkimport = config.get[String](CosmosDBConfig.BulkImport).
getOrElse(CosmosDBConfig.DefaultBulkImport.toString).
toBoolean
if (bulkimport) {
// The bulk import library handles throttled requests on its own.
// Gateway connection mode is needed to avoid potential master partition throttling
// as the number of tasks grows larger for collections with a lot of partitions.
connectionPolicy.getRetryOptions.setMaxRetryAttemptsOnThrottledRequests(0)
connectionPolicy.setConnectionMode(ConnectionMode.Gateway)
}
ClientConfiguration(
config.get[String](CosmosDBConfig.Endpoint).get,
config.get[String](CosmosDBConfig.Masterkey).get,
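
For context, the bulk import switch above is driven entirely by the connector's config map. A minimal sketch of opting in on the write side, assuming the usual string keys ("Endpoint", "Masterkey", "Database", "Collection", "BulkImport") and hypothetical account values:

import com.microsoft.azure.cosmosdb.spark.config.Config

// Hypothetical values. "BulkImport" -> "true" makes AsyncCosmosDBConnection disable
// throttling retries and force Gateway connection mode, as in the hunk above.
val writeConfigMap = Map(
  "Endpoint"   -> "https://myaccount.documents.azure.com:443/",
  "Masterkey"  -> "<account key>",
  "Database"   -> "myDatabase",
  "Collection" -> "myCollection",
  "BulkImport" -> "true"
)
val writeConfig = Config(writeConfigMap)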

View file

@@ -43,7 +43,7 @@ object CosmosDBRDDIterator {
// For verification purpose
var lastFeedOptions: FeedOptions = _
var hdfsUtils: HdfsUtils = _
private var hdfsUtils: HdfsUtils = _
def initializeHdfsUtils(hadoopConfig: Map[String, String]): Any = {
if (hdfsUtils == null) {
@@ -102,10 +102,6 @@ object CosmosDBRDDIterator {
tokenString = new ObjectMapper().writeValueAsString(nextTokenMap)
}
else {
// Encoding offset as serialized empty map and not null to prevent serialization failure
tokenString = new ObjectMapper().writeValueAsString(new ConcurrentHashMap[String, String]())
}
tokenString
}
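
The else branch above leans on a small Jackson detail: serializing an empty map yields the literal "{}", so downstream offset handling always receives valid JSON instead of a null token string. A standalone sketch of that behavior (jackson-databind is already on the connector's classpath):

import java.util.concurrent.ConcurrentHashMap
import com.fasterxml.jackson.databind.ObjectMapper

val mapper = new ObjectMapper()
// An empty map round-trips as "{}" rather than null, avoiding serialization failures.
val emptyToken = mapper.writeValueAsString(new ConcurrentHashMap[String, String]())
// emptyToken == "{}"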

View file

@@ -1,68 +0,0 @@
/**
* The MIT License (MIT)
* Copyright (c) 2016 Microsoft Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package com.microsoft.azure.cosmosdb.spark.streaming

import org.apache.spark.sql.{ForeachWriter, Row}

import com.microsoft.azure.cosmosdb.spark.AsyncCosmosDBConnection
import com.microsoft.azure.cosmosdb.spark.config.{Config, CosmosDBConfig}

import scala.collection.mutable

case class CosmosDBForeachWriter(configMap: Map[String, String]) extends ForeachWriter[Row] {

  // Per-partition state: one async connection and a buffer of rows for the current epoch
  var asyncConnection: AsyncCosmosDBConnection = _
  var rows: mutable.ArrayBuffer[Row] = _

  val config = Config(configMap)
  val upsert: Boolean = config
    .getOrElse(CosmosDBConfig.Upsert, String.valueOf(CosmosDBConfig.DefaultUpsert))
    .toBoolean
  val writingBatchSize = config
    .getOrElse(CosmosDBConfig.WritingBatchSize, String.valueOf(CosmosDBConfig.DefaultWritingBatchSize))
    .toInt
  val writingBatchDelayMs = config
    .getOrElse(CosmosDBConfig.WritingBatchDelayMs, String.valueOf(CosmosDBConfig.DefaultWritingBatchDelayMs))
    .toInt
  val rootPropertyToSave = config
    .get[String](CosmosDBConfig.RootPropertyToSave)

  def open(partitionId: Long, version: Long): Boolean = {
    asyncConnection = new AsyncCosmosDBConnection(config)
    rows = new mutable.ArrayBuffer[Row]()
    true
  }

  def process(value: Row): Unit = {
    rows.append(value)
  }

  def close(errorOrNull: Throwable): Unit = {
    errorOrNull match {
      case t: Throwable => throw t
      case _ => {
        // No error for this epoch: flush the buffered rows to Cosmos DB in one bulk import
        if (rows.nonEmpty) {
          asyncConnection.importWithRxJava(rows.iterator, asyncConnection, writingBatchSize, writingBatchDelayMs, rootPropertyToSave, upsert)
        }
      }
    }
  }
}
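
Before this revert removed it, the writer above was meant to plug into Structured Streaming's foreach sink. A minimal usage sketch, assuming a streaming DataFrame changeFeedDf and the writeConfigMap from the earlier sketch (both hypothetical):

import org.apache.spark.sql.streaming.Trigger

val query = changeFeedDf.writeStream
  .foreach(CosmosDBForeachWriter(writeConfigMap))
  .option("checkpointLocation", "/streaming/checkpoint")   // hypothetical path
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

query.awaitTermination()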

View file

@@ -63,43 +63,25 @@ private[spark] class CosmosDBSource(sqlContext: SQLContext,
if (currentSchema == null) {
CosmosDBRDDIterator.initializeHdfsUtils(HdfsUtils.getConfigurationMap(
sqlContext.sparkSession.sparkContext.hadoopConfiguration).toMap)
// Delete current tokens and next tokens checkpoint directories to ensure change feed starts from beginning if set
if (streamConfigMap.getOrElse(CosmosDBConfig.ChangeFeedStartFromTheBeginning, String.valueOf(false)).toBoolean) {
val changeFeedCheckpointLocation: String = streamConfigMap
.getOrElse(CosmosDBConfig.ChangeFeedCheckpointLocation, StringUtils.EMPTY)
val queryName = Config(streamConfigMap)
.get[String](CosmosDBConfig.ChangeFeedQueryName).get
val currentTokensCheckpointPath = changeFeedCheckpointLocation + "/" + HdfsUtils.filterFilename(queryName)
val nextTokensCheckpointPath = changeFeedCheckpointLocation + "/" +
HdfsUtils.filterFilename(CosmosDBRDDIterator.getNextTokenPath(queryName))
CosmosDBRDDIterator.hdfsUtils.deleteFile(currentTokensCheckpointPath)
CosmosDBRDDIterator.hdfsUtils.deleteFile(nextTokensCheckpointPath)
}
logDebug(s"Reading data to derive the schema")
val helperDfConfig: Map[String, String] = streamConfigMap
.-(CosmosDBConfig.ChangeFeedStartFromTheBeginning)
.+((CosmosDBConfig.ChangeFeedStartFromTheBeginning, String.valueOf(false)))
.-(CosmosDBConfig.ReadChangeFeed).
+((CosmosDBConfig.ReadChangeFeed, String.valueOf(false)))
.-(CosmosDBConfig.QueryCustom).
+((CosmosDBConfig.QueryCustom, "SELECT TOP 10 * FROM c"))
// Dummy change feed query to get the first continuation token
val df = sqlContext.read.cosmosDB(Config(helperDfConfig))
val tokens = CosmosDBRDDIterator.getCollectionTokens(Config(configMap))
if (StringUtils.isEmpty(tokens)) {
// Empty tokens means it is a new streaming query
// Trigger the count to update the current continuation token
df.count()
}
val shouldInferSchema = helperDfConfig.
getOrElse(CosmosDBConfig.InferStreamSchema, CosmosDBConfig.DefaultInferStreamSchema.toString).
toBoolean
if (shouldInferSchema) {
// Dummy batch read query to sample schema
val df = sqlContext.read.cosmosDB(Config(helperDfConfig))
val tokens = CosmosDBRDDIterator.getCollectionTokens(Config(configMap))
if (StringUtils.isEmpty(tokens)) {
// Empty tokens means it is a new streaming query
// Trigger the count to force batch read query to sample schema
df.count()
}
currentSchema = df.schema
} else {
currentSchema = cosmosDbStreamSchema
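
For orientation, the schema bootstrap above runs when a readStream query is first planned against this source. A minimal sketch of starting such a stream, assuming the connector's CosmosDBSourceProvider, an active SparkSession named spark, and hypothetical change-feed options (key names follow CosmosDBConfig):

import com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSourceProvider

// Hypothetical read-side options for a change feed stream.
val streamConfigMap = Map(
  "Endpoint"   -> "https://myaccount.documents.azure.com:443/",
  "Masterkey"  -> "<account key>",
  "Database"   -> "myDatabase",
  "Collection" -> "myCollection",
  "ReadChangeFeed" -> "true",
  "ChangeFeedQueryName" -> "my-change-feed",
  "ChangeFeedCheckpointLocation" -> "/streaming/changefeedcheckpoint",
  "ChangeFeedStartFromTheBeginning" -> "true"
)

val changeFeedDf = spark.readStream
  .format(classOf[CosmosDBSourceProvider].getName)
  .options(streamConfigMap)
  .load()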
@@ -120,14 +102,12 @@ private[spark] class CosmosDBSource(sqlContext: SQLContext,
val tsTokenRegex = "\"" + CosmosDBConfig.StreamingTimestampToken + "\"\\:\"[\\d]+\"" // "tsToken": "2324343"
offsetJson.replaceAll(tsTokenRegex, StringUtils.EMPTY)
}
logDebug(s"getBatch with offset: $start $end")
val endJson: String = getOffsetJsonForProgress(end.json)
val endJson = getOffsetJsonForProgress(end.json)
val nextTokens = getOffsetJsonForProgress(CosmosDBRDDIterator.getCollectionTokens(Config(streamConfigMap)))
val currentTokens = getOffsetJsonForProgress(
CosmosDBRDDIterator.getCollectionTokens(Config(streamConfigMap),
shouldGetCurrentToken = true))
// Only get the data in the following cases:
// - The provided end offset is the current offset (next tokens): the stream is progressing to this batch
// - The provided end offset is the current tokens: the stream didn't get to commit to the end offset yet
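
The tsToken regex above exists so that two offset JSON strings that differ only in the streaming timestamp compare as equal progress points. An illustrative sketch with hypothetical offset payloads:

// Hypothetical offsets: same continuation tokens, different timestamps.
val offsetA = """{"tsToken":"2324343","0":"range-0-token"}"""
val offsetB = """{"tsToken":"9999999","0":"range-0-token"}"""

val tsTokenRegex = "\"tsToken\"\\:\"[\\d]+\""
// Stripping the timestamp makes the two strings identical,
// so getBatch treats them as the same offset.
val sameProgress = offsetA.replaceAll(tsTokenRegex, "") == offsetB.replaceAll(tsTokenRegex, "")
// sameProgress == true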
@@ -138,8 +118,8 @@ private[spark] class CosmosDBSource(sqlContext: SQLContext,
streamConfigMap
.-(CosmosDBConfig.ChangeFeedContinuationToken)
.+((CosmosDBConfig.ChangeFeedContinuationToken, end.json)))
val currentDf = sqlContext.read.cosmosDB(schema, readConfig, sqlContext)
currentDf
sqlContext.read.cosmosDB(schema, readConfig, sqlContext)
} else {
logDebug(s"Skipping this batch")
sqlContext.createDataFrame(sqlContext.emptyDataFrame.rdd, schema)