fixing scalastyle issues and build warnings
This commit is contained in:
Родитель
ac08f53112
Коммит
afb2fad2fb
|
@ -20,9 +20,10 @@ import java.time.Instant
|
|||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
import EventHubsOffsetTypes.EventHubsOffsetType
|
||||
|
||||
import com.microsoft.azure.eventhubs.{EventHubClient => AzureEventHubClient, _}
|
||||
import com.microsoft.azure.servicebus._
|
||||
import EventHubsOffsetTypes.EventHubsOffsetType
|
||||
|
||||
import org.apache.spark.{SparkEnv, TaskContext}
|
||||
import org.apache.spark.eventhubscommon.EventHubNameAndPartition
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.spark.eventhubscommon.progress
|
||||
|
||||
import org.apache.hadoop.fs.Path
|
||||
|
||||
import org.apache.spark.eventhubscommon.EventHubNameAndPartition
|
||||
|
||||
private[spark] object PathTools extends Serializable {
|
||||
|
|
|
@ -276,8 +276,8 @@ abstract class ProgressTrackerBase[T <: EventHubsConnector](
|
|||
var oos: FSDataOutputStream = null
|
||||
try {
|
||||
// write progress file
|
||||
oos = fs.create(new Path(s"$progressDirectoryPath/${PathTools.makeProgressFileName(commitTime)}"),
|
||||
true)
|
||||
oos = fs.create(new Path(
|
||||
s"$progressDirectoryPath/${PathTools.makeProgressFileName(commitTime)}"), true)
|
||||
offsetToCommit.foreach {
|
||||
case (namespace, ehNameAndPartitionToOffsetAndSeq) =>
|
||||
ehNameAndPartitionToOffsetAndSeq.foreach {
|
||||
|
@ -304,7 +304,8 @@ abstract class ProgressTrackerBase[T <: EventHubsConnector](
|
|||
private def createMetadata(fs: FileSystem, commitTime: Long): Boolean = {
|
||||
var oos: FSDataOutputStream = null
|
||||
try {
|
||||
oos = fs.create(new Path(s"$metadataDirectoryStr/" + s"${PathTools.makeMetadataFileName(commitTime)}"), true)
|
||||
oos = fs.create(new Path(
|
||||
s"$metadataDirectoryStr/" + s"${PathTools.makeMetadataFileName(commitTime)}"), true)
|
||||
true
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
|
|
|
@ -34,14 +34,14 @@ private[spark] class ProgressWriter(
|
|||
progressDir: String,
|
||||
subDirIdentifiers: String*) extends Logging {
|
||||
|
||||
// TODO: Why can't we get this info from one of the ProgressTrackers? This info is available there.
|
||||
// TODO: Come up with better name
|
||||
// TODO: Why can't we get this info from one of the ProgressTrackers?
|
||||
// TODO: Come up with better name for this guy
|
||||
private val tempProgressTrackingPointStr =
|
||||
PathTools.makeTempDirectoryStr(progressDir, subDirIdentifiers: _*) + "/" +
|
||||
PathTools.makeTempFileName(streamId, uid, eventHubNameAndPartition, timestamp)
|
||||
|
||||
// TODO: Why can't we get this info from one of the ProgressTrackers? This info is available there.
|
||||
// TODO: Come up with better name
|
||||
// TODO: Why can't we get this info from one of the ProgressTrackers?
|
||||
// TODO: Come up with better name for this guy
|
||||
private[spark] val tempProgressTrackingPointPath = new Path(tempProgressTrackingPointStr)
|
||||
|
||||
def write(recordTime: Long, cpOffset: Long, cpSeq: Long): Unit = {
|
||||
|
|
|
@ -32,9 +32,12 @@ class StructuredStreamingProgressTracker private[spark](
|
|||
hadoopConfiguration: Configuration)
|
||||
extends ProgressTrackerBase(progressDir, appName, hadoopConfiguration) {
|
||||
|
||||
private[spark] override lazy val progressDirectoryStr = PathTools.makeProgressDirectoryStr(progressDir, appName, uid)
|
||||
private[spark] override lazy val tempDirectoryStr = PathTools.makeTempDirectoryStr(progressDir, appName, uid)
|
||||
private[spark] override lazy val metadataDirectoryStr = PathTools.makeMetadataDirectoryStr(progressDir, appName, uid)
|
||||
private[spark] override lazy val progressDirectoryStr =
|
||||
PathTools.makeProgressDirectoryStr(progressDir, appName, uid)
|
||||
private[spark] override lazy val tempDirectoryStr =
|
||||
PathTools.makeTempDirectoryStr(progressDir, appName, uid)
|
||||
private[spark] override lazy val metadataDirectoryStr =
|
||||
PathTools.makeMetadataDirectoryStr(progressDir, appName, uid)
|
||||
|
||||
override def eventHubNameAndPartitions: Map[String, List[EventHubNameAndPartition]] = {
|
||||
val connector = StructuredStreamingProgressTracker.registeredConnectors(uid)
|
||||
|
|
|
@ -19,9 +19,8 @@ package org.apache.spark.streaming.eventhubs.checkpoint
|
|||
import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.hadoop.fs.{FileSystem, Path}
|
||||
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.SparkContext
|
||||
|
||||
import org.apache.spark.internal.Logging
|
||||
|
||||
/**
|
||||
* A DFS based OffsetStore implementation
|
||||
|
|
|
@ -21,8 +21,8 @@ import scala.collection.mutable.ListBuffer
|
|||
|
||||
import com.microsoft.azure.eventhubs.EventData
|
||||
|
||||
import org.apache.spark.eventhubscommon.client.{EventHubClient, EventHubsClientWrapper, EventHubsOffsetTypes}
|
||||
import org.apache.spark.eventhubscommon.EventHubNameAndPartition
|
||||
import org.apache.spark.eventhubscommon.client.{EventHubClient, EventHubsClientWrapper, EventHubsOffsetTypes}
|
||||
import org.apache.spark.eventhubscommon.client.EventHubsOffsetTypes.EventHubsOffsetType
|
||||
import org.apache.spark.streaming.StreamingContext
|
||||
|
||||
|
|
|
@ -56,9 +56,11 @@ class ProgressTrackerSuite extends SharedUtils {
|
|||
seq: Int): Unit = {
|
||||
for (partitionId <- partitionRange) {
|
||||
val filePath = Paths.get(progressPath + s"/${PathTools.makeProgressFileName(timestamp)}")
|
||||
val stdOpenOption = if (Files.exists(filePath))
|
||||
val stdOpenOption = if (Files.exists(filePath)) {
|
||||
StandardOpenOption.APPEND
|
||||
else StandardOpenOption.CREATE
|
||||
} else {
|
||||
StandardOpenOption.CREATE
|
||||
}
|
||||
|
||||
Files.write(filePath,
|
||||
s"${ProgressRecord(timestamp, namespace, ehName, partitionId, offset, seq)
|
||||
|
|
|
@ -17,8 +17,8 @@
|
|||
|
||||
package com.microsoft.spark.sql.examples
|
||||
|
||||
import org.apache.spark.sql.streaming.ProcessingTime
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import org.apache.spark.sql.streaming.ProcessingTime
|
||||
import org.apache.spark.sql.functions._
|
||||
|
||||
object EventHubsStructuredStreamingExample {
|
||||
|
|
2
pom.xml
2
pom.xml
|
@ -335,6 +335,7 @@
|
|||
<plugin>
|
||||
<groupId>org.scala-tools</groupId>
|
||||
<artifactId>maven-scala-plugin</artifactId>
|
||||
<version>2.15.2</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<goals>
|
||||
|
@ -368,6 +369,7 @@
|
|||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-jar-plugin</artifactId>
|
||||
<version>3.0.2</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>empty-javadoc-jar</id>
|
||||
|
|
Загрузка…
Ссылка в новой задаче