This commit is contained in:
nyaghma 2020-07-14 14:56:06 -05:00 коммит произвёл GitHub
Родитель dfb2c91d99
Коммит 54c2bd4955
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
2 изменённых файлов: 27 добавлений и 1 удалений

1
.gitignore поставляемый
Просмотреть файл

@ -8,3 +8,4 @@ target/
scalafmt-output.xml
dependency-reduced-pom.xml
metastore_db
.DS_Store

Просмотреть файл

@ -102,7 +102,7 @@ private[spark] class EventHubsSource private[eventhubs] (sqlContext: SQLContext,
val indexOfNewLine = content.indexOf("\n")
if (indexOfNewLine > 0) {
val version =
parseVersion(content.substring(0, indexOfNewLine), VERSION)
parseLogVersion(content.substring(0, indexOfNewLine), VERSION)
EventHubsSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
} else {
throw new IllegalStateException("Log file was malformed.")
@ -111,6 +111,31 @@ private[spark] class EventHubsSource private[eventhubs] (sqlContext: SQLContext,
EventHubsSourceOffset(SerializedOffset(content)) // Spark 2.1 log file
}
}
private def parseLogVersion(text: String, maxSupportedVersion: Int): Int = {
if (text.length > 0 && text(0) == 'v') {
val version =
try {
text.substring(1, text.length).toInt
} catch {
case _: NumberFormatException =>
throw new IllegalStateException(s"Log file was malformed: failed to read correct log " +
s"version from $text.")
}
if (version > 0) {
if (version > maxSupportedVersion) {
throw new IllegalStateException(s"UnsupportedLogVersion: maximum supported log version " +
s"is v${maxSupportedVersion}, but encountered v$version. The log file was produced " +
s"by a newer version of Spark and cannot be read by this version. Please upgrade.")
} else {
return version
}
}
}
// reaching here means we failed to read the correct log version
throw new IllegalStateException(s"Log file was malformed: failed to read correct log " +
s"version from $text.")
}
}
metadataLog