diff --git a/.gitignore b/.gitignore index afa40cd2..b8bf9309 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,4 @@ target/ scalafmt-output.xml dependency-reduced-pom.xml metastore_db +.DS_Store diff --git a/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSource.scala b/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSource.scala index 3f9fd8c6..bcde2aca 100644 --- a/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSource.scala +++ b/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSource.scala @@ -102,7 +102,7 @@ private[spark] class EventHubsSource private[eventhubs] (sqlContext: SQLContext, val indexOfNewLine = content.indexOf("\n") if (indexOfNewLine > 0) { val version = - parseVersion(content.substring(0, indexOfNewLine), VERSION) + parseLogVersion(content.substring(0, indexOfNewLine), VERSION) EventHubsSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1))) } else { throw new IllegalStateException("Log file was malformed.") @@ -111,6 +111,31 @@ private[spark] class EventHubsSource private[eventhubs] (sqlContext: SQLContext, EventHubsSourceOffset(SerializedOffset(content)) // Spark 2.1 log file } } + + private def parseLogVersion(text: String, maxSupportedVersion: Int): Int = { + if (text.length > 0 && text(0) == 'v') { + val version = + try { + text.substring(1, text.length).toInt + } catch { + case _: NumberFormatException => + throw new IllegalStateException(s"Log file was malformed: failed to read correct log " + + s"version from $text.") + } + if (version > 0) { + if (version > maxSupportedVersion) { + throw new IllegalStateException(s"UnsupportedLogVersion: maximum supported log version " + + s"is v${maxSupportedVersion}, but encountered v$version. The log file was produced " + + s"by a newer version of Spark and cannot be read by this version. Please upgrade.") + } else { + return version + } + } + } + // reaching here means we failed to read the correct log version + throw new IllegalStateException(s"Log file was malformed: failed to read correct log " + + s"version from $text.") + } } metadataLog