
Revert "Spark 3: Cache CDM Date and DateTime type methods (#100)" (#160)

This reverts commit 14048081dc.
kecheung 2024-04-05 18:47:27 -07:00 committed by GitHub
Parent 881b514e94
Commit 23f01a1772
No known key found for this signature
GPG key ID: B5690EEEBB952194
5 changed files: 67 additions and 265 deletions
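For context on what is being backed out: #100 had replaced CSVReaderConnector's flat lists of date/time format strings with per-column caches (the removed indexToCachedDateFunction / parseGenericDate*Cached* members in the diff below), so that once a column's format parsed successfully it was reused for later rows instead of re-trying every pattern. A minimal sketch of that idea, with a hypothetical class name and only two of the formats:

```scala
import java.time.LocalDate
import java.time.format.{DateTimeFormatter, DateTimeParseException}

// Illustrative sketch only: remember, per column index, the first date format that
// parsed successfully and reuse it for subsequent values in that column.
class CachedDateParser(columnCount: Int) {
  private val formats = List("yyyy-MM-dd", "M/d/yyyy")
  private val cache = new Array[String](columnCount) // column index -> format that already worked

  def parse(value: String, index: Int): LocalDate = {
    val candidates = Option(cache(index)).map(List(_)).getOrElse(formats)
    for (fmt <- candidates) {
      try {
        val parsed = LocalDate.parse(value, DateTimeFormatter.ofPattern(fmt))
        cache(index) = fmt // cache the winning format for this column
        return parsed
      } catch { case _: DateTimeParseException => }
    }
    null // permissive behaviour: an unparseable value becomes null
  }
}
```

The revert drops this bookkeeping and goes back to trying every format on every value.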

View file

@@ -20,7 +20,7 @@ import java.io.{File, FileOutputStream, InputStream}
class CDMDataReader(val storage: String,
val container: String,
val fileReader: ReaderConnector,
val hasHeader: Boolean,
val header: Boolean,
var schema: StructType,
var dataConverter: DataConverter,
val mode: String) extends PartitionReader[InternalRow] with Serializable {
@@ -34,11 +34,11 @@ class CDMDataReader(val storage: String,
* @return Boolean indicating whether there is any data left to read.
*/
def next: Boolean = {
if (hasHeader && !headerRead) {
if (header && !headerRead) {
fileReader.readRow
//TODO: verify header names match with what we have in CDM
// println("TODO: Verify header names match")
println("TODO: Verify header names match")
headerRead = true
}
@@ -60,7 +60,7 @@ class CDMDataReader(val storage: String,
if (row.length > schema.fields.length) {
seq = schema.zipWithIndex.map{ case (col, index) =>
val dataType = schema.fields(index).dataType
fileReader.jsonToData(dataType, row.apply(index), index, mode)
fileReader.jsonToData(dataType, row.apply(index), mode)
}
} else if (row.length < schema.fields.length) {
// When there are fewer columns in the CSV file than attributes in the CDM entity file, pad the missing trailing columns with null
@@ -69,13 +69,13 @@ class CDMDataReader(val storage: String,
null
} else {
val dataType = schema.fields(index).dataType
fileReader.jsonToData(dataType, row.apply(index), index, mode)
fileReader.jsonToData(dataType, row.apply(index), mode)
}
}
} else {
seq = row.zipWithIndex.map { case (col, index) =>
val dataType = schema.fields(index).dataType
fileReader.jsonToData(dataType, row.apply(index), index, mode)
fileReader.jsonToData(dataType, row.apply(index), mode)
}
}
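After the revert, all three branches above reduce to the same rule: convert each schema field with the three-argument jsonToData, ignore extra trailing CSV columns, and pad missing trailing columns with null. A condensed sketch of that rule, as an illustrative helper assuming the ReaderConnector type from this package is in scope:

```scala
import org.apache.spark.sql.types.StructType

// Condensed, illustrative equivalent of the restored row conversion above.
def convertRow(row: Array[Any], schema: StructType, reader: ReaderConnector, mode: String): Seq[Any] =
  schema.zipWithIndex.map { case (field, index) =>
    if (index >= row.length) null                            // CSV row shorter than the schema
    else reader.jsonToData(field.dataType, row(index), mode) // convert the cell to its Spark type
  }
```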

View file

@@ -48,9 +48,9 @@ class CDMSimpleScan(val storage: String,
override def toBatch: Batch = this
def getReader(fType: String, uriPath: String, filePath: String, schema: StructType, serializedHadoopConf: SparkSerializableConfiguration, delimiter: Char, hasHeader: Boolean): ReaderConnector ={
def getReader(fType: String, uriPath: String, filePath: String, schema: StructType, serializedHadoopConf: SparkSerializableConfiguration, delimiter: Char): ReaderConnector ={
return fType match {
case "is.partition.format.CSV" => new CSVReaderConnector(uriPath, filePath, serializedHadoopConf, delimiter, mode, schema, hasHeader)
case "is.partition.format.CSV" => new CSVReaderConnector(uriPath, filePath, serializedHadoopConf, delimiter, mode)
case "is.partition.format.parquet" => new ParquetReaderConnector(uriPath, filePath, schema, serializedHadoopConf)
}
}
@@ -79,8 +79,8 @@ class CDMSimpleScan(val storage: String,
// Decode strings because hadoop cannot parse URI-encoded strings
val decodedFilePath = URLDecoder.decode(manifestPath + relPath, "UTF-8")
//we track hasHeader and pass it in to the reader so that we know if the first line is a header row
var hasHeader = false
//we track the header and pass it in to the reader so that we know if the first line is a header row
var header = false
val schema = readSchema();
var delimiter = Constants.DEFAULT_DELIMITER
val fileReader = {
@@ -94,7 +94,7 @@ class CDMSimpleScan(val storage: String,
val arguments = traits.asInstanceOf[CdmTraitReference].getArguments().asScala
val headerArg = arguments.find(_.getName() == "columnHeaders")
if (headerArg != None) {
hasHeader = headerArg.get.getValue().toString.toBoolean
header = headerArg.get.getValue().toString.toBoolean
}
val delimiterArg = arguments.find(_.getName() == "delimiter")
if (delimiterArg != None) {
@@ -102,18 +102,18 @@ class CDMSimpleScan(val storage: String,
if(strDelimiter.length > 1) throw new IllegalArgumentException(String.format(Messages.invalidDelimiterCharacter, strDelimiter))
delimiter = strDelimiter.charAt(0)
}
val reader = getReader(traits.getNamedReference, uriPrefix, decodedFilePath, schema, serializedHadoopOConf, delimiter, hasHeader)
val reader = getReader(traits.getNamedReference, uriPrefix, decodedFilePath, schema, serializedHadoopOConf, delimiter)
if (reader.isInstanceOf[ParquetReaderConnector] && Constants.PERMISSIVE.equalsIgnoreCase(mode)) {
throw new IllegalArgumentException(String.format(Messages.invalidPermissiveMode))
}
reader
} else {
SparkCDMLogger.log(Level.DEBUG, "No Named Reference Trait \"is.partition.format\" (CSV/Parquet)", logger)
new CSVReaderConnector(uriPrefix, decodedFilePath, serializedHadoopOConf, delimiter, mode, schema, hasHeader)
new CSVReaderConnector(uriPrefix, decodedFilePath, serializedHadoopOConf, delimiter, mode)
}
}
factoryList.add(new CDMInputPartition(storage, container, fileReader, hasHeader, schema, dataConverter, mode))
factoryList.add(new CDMInputPartition(storage, container, fileReader, header, readSchema(), dataConverter, mode))
}
SparkCDMLogger.log(Level.DEBUG, "Count of partitions - "+eDec.getDataPartitions.size() + " Entity - " + eDec.getEntityName + " Manifest -"+ man.getManifestName, logger)
factoryList.asScala.toArray
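After the revert, header detection stays in planInputPartitions (the flag is passed to CDMInputPartition, not to the connector), so getReader is driven purely by the partition-format trait. A sketch of that dispatch with the restored constructor arities; parameter names mirror the hunk above, mode is shown as an explicit parameter for self-containment, and the project types (ReaderConnector, CSVReaderConnector, ParquetReaderConnector, SparkSerializableConfiguration) are assumed to be in scope as in the surrounding file:

```scala
import org.apache.spark.sql.types.StructType

// Illustrative sketch of the restored dispatch, not the file's exact code.
def getReader(fType: String, uriPath: String, filePath: String, schema: StructType,
              conf: SparkSerializableConfiguration, delimiter: Char, mode: String): ReaderConnector =
  fType match {
    case "is.partition.format.CSV"     => new CSVReaderConnector(uriPath, filePath, conf, delimiter, mode)
    case "is.partition.format.parquet" => new ParquetReaderConnector(uriPath, filePath, schema, conf)
  }
```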

View file

@@ -1,5 +1,6 @@
package com.microsoft.cdm.read
import java.net.URLDecoder
import java.time.{Instant, LocalDate, LocalDateTime, LocalTime, ZoneId}
import java.time.format.{DateTimeFormatter, DateTimeParseException}
import java.time.temporal.ChronoUnit
@@ -8,68 +9,42 @@ import com.microsoft.cdm.log.SparkCDMLogger
import com.univocity.parsers.csv.CsvParser
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DateType, Decimal, DecimalType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType, StructType, TimestampType}
import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DateType, Decimal, DecimalType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String
import org.slf4j.LoggerFactory
import org.slf4j.event.Level
import scala.collection.mutable
class CSVReaderConnector(httpPrefix:String, filePath: String, serConf:SparkSerializableConfiguration, delimiter: Char, mode: String, schema: StructType, hasHeader: Boolean) extends ReaderConnector {
class CSVReaderConnector(httpPrefix:String, filePath: String, serConf:SparkSerializableConfiguration, delimiter: Char, mode: String) extends ReaderConnector {
val logger = LoggerFactory.getLogger(classOf[CSVReaderConnector])
SparkCDMLogger.log(Level.DEBUG, "CSV Reader for partition at path: " + httpPrefix + filePath, logger)
private var goodRow:Boolean = true
private var parser: CsvParser = _
private val dateFormatStrings = List(
"yyyy-MM-dd",
"M/d/yyyy" )
/**
* Map the index with a tuple (Any, Generic function)
* tuple._1 (Any) Format of value you are parsing
* tuple._2 (Generic function) Generic function to plug in format
*/
private var indexToCachedDateFunction: Array[(Any, (String, Any, Int, String) => LocalDate)] = _
private var indexToCachedDateTimeFunction: Array[(Any, (String, Any, Int, String) => java.lang.Long)] = _
private val localTimeFormatsNonStandard= List(
"M/d/yyyy H:mm",
"M/d/yyyy h:mm:ss a",
"M/d/yyyy H:mm:ss",
"yyyy-MM-dd H:mm:ss.S",
"yyyy-MM-dd H:mm:ss.SS",
"yyyy-MM-dd H:mm:ss.SSS",
"yyyy-MM-dd H:mm:ss.SSSS",
"yyyy-MM-dd H:mm:ss.SSSSS",
"yyyy-MM-dd H:mm:ss.SSSSSS",
"yyyy-MM-dd H:mm:ss",
"MMM d yyyy h:mma")
/**
* Formats have the format mapped to a generic function.
* The generic function will take in a format to perform the correct conversions
*/
private lazy val dateFormatStrings : mutable.Map[String, (String, Any, Int, String) => LocalDate] = mutable.Map(
"yyyy-MM-dd" -> parseGenericDateCached1,
"M/d/yyyy" -> parseGenericDateCached1)
// @transient + lazy required to ensure that DateTimeFormatter is not serialized when sent to executor
@transient private lazy val localTimeFormats : mutable.Map[DateTimeFormatter, (String, Any, Int, String) => java.lang.Long] = mutable.Map(
DateTimeFormatter.ISO_OFFSET_DATE_TIME -> parseGenericDateTimeCached1,
DateTimeFormatter.ISO_INSTANT -> parseGenericDateTimeCached1)
@transient private lazy val localTimeFormat2 : mutable.Map[DateTimeFormatter, (String, Any, Int, String) => java.lang.Long] = mutable.Map(
DateTimeFormatter.ISO_LOCAL_DATE_TIME -> parseGenericDateTimeCached2)
private lazy val localTimeFormatsNonStandard : mutable.Map[String, (String, Any, Int, String) => java.lang.Long] = mutable.Map(
"M/d/yyyy H:mm" -> parseGenericDateTimeCached3,
"M/d/yyyy h:mm:ss a" -> parseGenericDateTimeCached3,
"M/d/yyyy H:mm:ss" -> parseGenericDateTimeCached3,
"yyyy-MM-dd H:mm:ss.S" -> parseGenericDateTimeCached3,
"yyyy-MM-dd H:mm:ss.SS" -> parseGenericDateTimeCached3,
"yyyy-MM-dd H:mm:ss.SSS" -> parseGenericDateTimeCached3,
"yyyy-MM-dd H:mm:ss.SSSS" -> parseGenericDateTimeCached3,
"yyyy-MM-dd H:mm:ss.SSSSS" -> parseGenericDateTimeCached3,
"yyyy-MM-dd H:mm:ss.SSSSSS" -> parseGenericDateTimeCached3,
"yyyy-MM-dd H:mm:ss" -> parseGenericDateTimeCached3,
"MMM d yyyy h:mma" -> parseGenericDateTimeCached3)
private lazy val dateFormatStringsAsDateTime : mutable.Map[String, (String, Any, Int, String) => java.lang.Long] = mutable.Map(
"yyyy-MM-dd" -> parseGenericDateTimeCached4,
"M/d/yyyy" -> parseGenericDateTimeCached4)
private lazy val timeFormatStrings : mutable.Map[String, (String, Any, Int, String) => java.lang.Long] = mutable.Map(
"HH:mm:ss" -> parseGenericDateTimeCached5,
"HH:mm:ss.S" -> parseGenericDateTimeCached5,
"HH:mm:ss.SS" -> parseGenericDateTimeCached5,
"HH:mm:ss.SSS" -> parseGenericDateTimeCached5,
"HH:mm:ss.SSSS" -> parseGenericDateTimeCached5,
"HH:mm:ss.SSSSS" -> parseGenericDateTimeCached5,
"HH:mm:ss.SSSSSS" -> parseGenericDateTimeCached5)
private val timeFormatStrings = List(
"HH:mm:ss",
"HH:mm:ss.S",
"HH:mm:ss.SS",
"HH:mm:ss.SSS",
"HH:mm:ss.SSSS",
"HH:mm:ss.SSSSS",
"HH:mm:ss.SSSSSS")
def build: Unit = {
try {
@@ -80,57 +55,12 @@ class CSVReaderConnector(httpPrefix:String, filePath: String, serConf:SparkSeria
parser.beginParsing {
inputStream
}
// Parse first row to cache parse methods
val tempStream = inputFile.newStream()
val tempParser = CsvParserFactory.build(delimiter)
tempParser.beginParsing {
tempStream
}
var temp = tempParser.parseNext()
if (hasHeader) { // if first row is a header, then read next
temp = tempParser.parseNext()
}
if (temp != null) { // if not null/CSV actually contains data, then set as first row
val firstRow = temp.asInstanceOf[Array[Any]]
// PARSE FIRST ROW
if (firstRow.length > schema.fields.length) {
schema.zipWithIndex.map{ case (col, index) =>
indexToCachedDateFunction = new Array[(Any, (String, Any, Int, String) => LocalDate)](schema.size)
indexToCachedDateTimeFunction = new Array[(Any, (String, Any, Int, String) => java.lang.Long)](schema.size)
val dataType = schema.fields(index).dataType
jsonToData(dataType, firstRow.apply(index), index, mode)
}
} else if (firstRow.length < schema.fields.length) {
// When there are fewer columns in the CSV file than attributes in the CDM entity file, pad the missing trailing columns with null
schema.zipWithIndex.map{ case (col, index) =>
indexToCachedDateFunction = new Array[(Any, (String, Any, Int, String) => LocalDate)](schema.size)
indexToCachedDateTimeFunction = new Array[(Any, (String, Any, Int, String) => java.lang.Long)](schema.size)
if (index >= firstRow.length) {
null
} else {
val dataType = schema.fields(index).dataType
jsonToData(dataType, firstRow.apply(index), index, mode)
}
}
} else {
firstRow.zipWithIndex.map { case (col, index) =>
indexToCachedDateFunction = new Array[(Any, (String, Any, Int, String) => LocalDate)](firstRow.length)
indexToCachedDateTimeFunction = new Array[(Any, (String, Any, Int, String) => java.lang.Long)](firstRow.length)
val dataType = schema.fields(index).dataType
jsonToData(dataType, firstRow.apply(index), index, mode)
}
}
tempParser.stopParsing()
tempStream.close()
}
} catch {
case e: Throwable => SparkCDMLogger.log(Level.ERROR, e.printStackTrace.toString, logger)
}
}
def close(): Unit = {
parser.stopParsing()
}
def readRow(): Array[Any] = {
@@ -149,7 +79,7 @@ class CSVReaderConnector(httpPrefix:String, filePath: String, serConf:SparkSeria
def isValidRow(): Boolean = goodRow
def jsonToData(dt: DataType, value: Any, schemaIndex: Int, mode: String): Any= {
def jsonToData(dt: DataType, value: Any, mode: String): Any= {
/* null is a valid value */
if (value == null) {
null
@@ -166,15 +96,7 @@ class CSVReaderConnector(httpPrefix:String, filePath: String, serConf:SparkSeria
case BooleanType => util.Try(value.toString.toBoolean).getOrElse(null)
case DateType => {
if (value != None && value != null) {
var date : LocalDate = null
if (indexToCachedDateFunction(schemaIndex) != null) {
val tuple = indexToCachedDateFunction(schemaIndex)
val format = tuple._1
val fx = tuple._2
date = fx(value.toString, format, schemaIndex, mode)
} else {
date = tryParseDate(value.toString, schemaIndex, mode)
}
val date = tryParseDate(value.toString, mode)
/* If we can't parse the date we return a null. This enables permissive mode to work*/
if (date == null) {
@@ -189,15 +111,7 @@ class CSVReaderConnector(httpPrefix:String, filePath: String, serConf:SparkSeria
case StringType => util.Try(UTF8String.fromString(value.toString)).getOrElse(null)
case TimestampType => {
if (value != None && value != null) {
var date : java.lang.Long = null
if (indexToCachedDateTimeFunction(schemaIndex) != null) {
val tuple = indexToCachedDateTimeFunction(schemaIndex)
val format = tuple._1
val fx = tuple._2
date = fx(value.toString, format, schemaIndex, mode)
} else {
date = tryParseDateTime(value.toString, schemaIndex, mode)
}
val date = tryParseDateTime(value.toString, mode)
/* If we can't parse the date we return a null. This enables permissive mode to work*/
if (date == null) {
@@ -228,23 +142,18 @@ class CSVReaderConnector(httpPrefix:String, filePath: String, serConf:SparkSeria
}
}
/**
* Attempts to parse Date using all possible formats.
* Upon reaching the first successful parse, map the column index to a cached tuple (format, function).
*/
def tryParseDate(dateString: String, index: Int, mode: String): LocalDate= {
for (formatString <- dateFormatStrings.keySet) {
def tryParseDate(dateString: String, mode: String): LocalDate= {
for (formatString <- dateFormatStrings) {
try {
val dateTimeFormatter = DateTimeFormatter.ofPattern(formatString)
val localDate= LocalDate.parse(dateString, dateTimeFormatter)
indexToCachedDateFunction(index) = (formatString, dateFormatStrings(formatString))
return localDate
} catch {
case e: DateTimeParseException=>
}
}
val msg = s"Mode: $mode. Could not parse \'$dateString\'. Data in this format is not supported."
val msg = "Mode: " + mode + ". Could not parse " + dateString + " using any possible format"
SparkCDMLogger.log(Level.ERROR, msg, logger)
if (Constants.FAILFAST.equalsIgnoreCase(mode)) {
throw new IllegalArgumentException(msg)
@@ -252,24 +161,21 @@ class CSVReaderConnector(httpPrefix:String, filePath: String, serConf:SparkSeria
null
}
/**
* Attempts to parse DateTime using all possible formats.
* Upon reaching the first successful parse, map the column index to a cached tuple (format, function).
*/
def tryParseDateTime(dateString: String, index: Int, mode: String): java.lang.Long = {
def tryParseDateTime(dateString: String, mode: String): java.lang.Long = {
val localTimeFormats = List(DateTimeFormatter.ISO_OFFSET_DATE_TIME,
DateTimeFormatter.ISO_INSTANT)
/* Conversions that go through local time first */
for (format <- localTimeFormats.keySet) {
for (format <- localTimeFormats) {
var instant: Instant = null;
try {
val i = Instant.from(format.parse(dateString))
val zt = i.atZone(ZoneId.systemDefault())
instant = zt.toLocalDateTime.atZone(ZoneId.systemDefault()).toInstant();
val res = ChronoUnit.MICROS.between(Instant.EPOCH, instant)
indexToCachedDateTimeFunction(index) = (format, localTimeFormats(format))
return res
return ChronoUnit.MICROS.between(Instant.EPOCH, instant)
} catch {
case e: ArithmeticException => {
indexToCachedDateTimeFunction(index) = (format, localTimeFormats(format))
return instant.toEpochMilli()*1000
}
case e: DateTimeParseException=>
@@ -277,17 +183,14 @@ class CSVReaderConnector(httpPrefix:String, filePath: String, serConf:SparkSeria
}
/* Local Time formatting */
for (format <- localTimeFormat2.keySet) {
for (format <- List(DateTimeFormatter.ISO_LOCAL_DATE_TIME)) {
var instant: Instant = null
try {
val localDateTime = LocalDateTime.parse(dateString, format)
instant = localDateTime.atZone(ZoneId.systemDefault()).toInstant();
val res = ChronoUnit.MICROS.between(Instant.EPOCH, instant)
indexToCachedDateTimeFunction(index) = (format, localTimeFormat2(format))
return res
return ChronoUnit.MICROS.between(Instant.EPOCH, instant)
} catch {
case e: ArithmeticException => {
indexToCachedDateTimeFunction(index) = (format, localTimeFormat2(format))
return instant.toEpochMilli()*1000
}
case e: DateTimeParseException =>
@@ -295,19 +198,16 @@ class CSVReaderConnector(httpPrefix:String, filePath: String, serConf:SparkSeria
}
/* Non-common formats in local time */
for (formatString <- localTimeFormatsNonStandard.keySet) {
for (formatString <- localTimeFormatsNonStandard) {
var instant: Instant = null
try {
val dateTimeFormatter = DateTimeFormatter.ofPattern(formatString)
val localDateTime = LocalDateTime.parse(dateString, dateTimeFormatter)
/* Assume non-standard times are in UTC */
instant = localDateTime.atZone(ZoneId.of("UTC")).toInstant();
val res = ChronoUnit.MICROS.between(Instant.EPOCH, instant)
indexToCachedDateTimeFunction(index) = (formatString, localTimeFormatsNonStandard(formatString))
return res
return ChronoUnit.MICROS.between(Instant.EPOCH, instant)
} catch {
case e: ArithmeticException => {
indexToCachedDateTimeFunction(index) = (formatString, localTimeFormatsNonStandard(formatString))
return instant.toEpochMilli()*1000
}
case e: DateTimeParseException =>
@@ -315,19 +215,16 @@ class CSVReaderConnector(httpPrefix:String, filePath: String, serConf:SparkSeria
}
/* Date-only (no time element) formats */
for (formatString <- dateFormatStringsAsDateTime.keySet) {
for (formatString <- dateFormatStrings) {
var instant: Instant = null
try {
val dateTimeFormatter = DateTimeFormatter.ofPattern(formatString)
val localDate = LocalDate.parse(dateString, dateTimeFormatter)
val localDateTime1 = localDate.atStartOfDay();
instant = localDateTime1.atZone(ZoneId.of("UTC")).toInstant();
val res = ChronoUnit.MICROS.between(Instant.EPOCH, instant)
indexToCachedDateTimeFunction(index) = (formatString, dateFormatStringsAsDateTime(formatString))
return res
return ChronoUnit.MICROS.between(Instant.EPOCH, instant)
} catch {
case e: ArithmeticException => {
indexToCachedDateTimeFunction(index) = (formatString, dateFormatStringsAsDateTime(formatString))
return instant.toEpochMilli()*1000
}
case e: DateTimeParseException =>
@@ -335,121 +232,27 @@ class CSVReaderConnector(httpPrefix:String, filePath: String, serConf:SparkSeria
}
/* Finally, this could just be a Time - Try that */
for (formatString <- timeFormatStrings.keySet) {
for (formatString <- timeFormatStrings) {
var instant: Instant = null
try {
val formatterTime1 = DateTimeFormatter.ofPattern(formatString)
val ls = LocalTime.parse(dateString, formatterTime1)
instant = ls.atDate(LocalDate.of(1970, 1, 1)).atZone(ZoneId.of("UTC")).toInstant
val res = ChronoUnit.MICROS.between(Instant.EPOCH, instant)
indexToCachedDateTimeFunction(index) = (formatString, timeFormatStrings(formatString))
return res
return ChronoUnit.MICROS.between(Instant.EPOCH, instant)
} catch {
case e: ArithmeticException => {
indexToCachedDateTimeFunction(index) = (formatString, timeFormatStrings(formatString))
return instant.toEpochMilli()*1000
}
case e: DateTimeParseException =>
}
}
val msg = s"Mode: $mode. Could not parse \'$dateString\'. Data in this format is not supported."
val msg = "Mode: " + mode + ". Could not parse " + dateString + " using any possible format"
SparkCDMLogger.log(Level.ERROR, msg, logger)
if (Constants.FAILFAST.equalsIgnoreCase(mode)) {
throw new IllegalArgumentException(msg)
}
null
}
// DATE PARSING
private def parseGenericDateCached1(dateString: String, format: Any, index: Int, mode: String) : LocalDate = {
try {
val dateTimeFormatter = DateTimeFormatter.ofPattern(format.asInstanceOf[String])
return LocalDate.parse(dateString, dateTimeFormatter)
} catch {
case e: DateTimeParseException=>
}
return checkFailFast(dateString, format, index, mode)
}
// DATETIME PARSING
private def parseGenericDateTimeCached1(dateString: String, format: Any, index: Int, mode: String) : java.lang.Long = {
var instant: Instant = null;
try {
val i = Instant.from(format.asInstanceOf[DateTimeFormatter].parse(dateString))
val zt = i.atZone(ZoneId.systemDefault())
instant = zt.toLocalDateTime.atZone(ZoneId.systemDefault()).toInstant();
return ChronoUnit.MICROS.between(Instant.EPOCH, instant)
} catch {
case e: ArithmeticException => { return instant.toEpochMilli()*1000 }
case e: DateTimeParseException=>
}
return checkFailFast(dateString, format, index, mode)
}
private def parseGenericDateTimeCached2(dateString: String, format: Any, index: Int, mode: String) : java.lang.Long = {
var instant: Instant = null
try {
val localDateTime = LocalDateTime.parse(dateString, format.asInstanceOf[DateTimeFormatter])
instant = localDateTime.atZone(ZoneId.systemDefault()).toInstant();
return ChronoUnit.MICROS.between(Instant.EPOCH, instant)
} catch {
case e: ArithmeticException => { return instant.toEpochMilli()*1000 }
case e: DateTimeParseException =>
}
return checkFailFast(dateString, format, index, mode)
}
private def parseGenericDateTimeCached3(dateString: String, format: Any, index: Int, mode: String) : java.lang.Long = {
var instant: Instant = null
try {
val dateTimeFormatter = DateTimeFormatter.ofPattern(format.asInstanceOf[String])
val localDateTime = LocalDateTime.parse(dateString, dateTimeFormatter)
/* Assume non-standard times are in UTC */
instant = localDateTime.atZone(ZoneId.of("UTC")).toInstant();
return ChronoUnit.MICROS.between(Instant.EPOCH, instant)
} catch {
case e: ArithmeticException => { return instant.toEpochMilli()*1000 }
case e: DateTimeParseException =>
}
return checkFailFast(dateString, format, index, mode)
}
private def parseGenericDateTimeCached4(dateString: String, format: Any, index: Int, mode: String) : java.lang.Long = {
var instant: Instant = null
try {
val dateTimeFormatter = DateTimeFormatter.ofPattern(format.asInstanceOf[String])
val localDate = LocalDate.parse(dateString, dateTimeFormatter)
val localDateTime1 = localDate.atStartOfDay();
instant = localDateTime1.atZone(ZoneId.of("UTC")).toInstant();
return ChronoUnit.MICROS.between(Instant.EPOCH, instant)
} catch {
case e: ArithmeticException => { return instant.toEpochMilli()*1000 }
case e: DateTimeParseException =>
}
return checkFailFast(dateString, format, index, mode)
}
private def parseGenericDateTimeCached5(dateString: String, format: Any, index: Int, mode: String) : java.lang.Long = {
var instant: Instant = null
try {
val formatterTime1 = DateTimeFormatter.ofPattern(format.asInstanceOf[String])
val ls = LocalTime.parse(dateString, formatterTime1)
instant = ls.atDate(LocalDate.of(1970, 1, 1)).atZone(ZoneId.of("UTC")).toInstant
return ChronoUnit.MICROS.between(Instant.EPOCH, instant)
} catch {
case e: ArithmeticException => { return instant.toEpochMilli()*1000 }
case e: DateTimeParseException =>
}
return checkFailFast(dateString, format, index, mode)
}
private def checkFailFast(dateString: String, format: Any, index: Int, mode: String) : Null = {
val msg = s"Mode: $mode. Each item in a column must have the same format. Column $index, item \'$dateString\' does not match the cached format \'$format\'."
SparkCDMLogger.log(Level.ERROR, msg, logger)
if (Constants.FAILFAST.equalsIgnoreCase(mode)) {
throw new IllegalArgumentException(msg)
}
return null
}
}
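With the caches removed, tryParseDateTime is again a straight cascade over format families, returning microseconds since the epoch from the first family that parses. The sketch below compresses each family to one representative pattern and omits the ArithmeticException fallback to milliseconds that the real method keeps:

```scala
import java.time.{Instant, LocalDate, LocalDateTime, LocalTime, ZoneId}
import java.time.format.{DateTimeFormatter, DateTimeParseException}
import java.time.temporal.ChronoUnit

// Illustrative cascade: offset/ISO instants, then local date-times, then non-standard
// patterns (assumed UTC), then date-only values, then time-only values.
def toMicros(dateString: String): java.lang.Long = {
  val attempts: Seq[String => Instant] = Seq(
    s => Instant.from(DateTimeFormatter.ISO_OFFSET_DATE_TIME.parse(s)),
    s => LocalDateTime.parse(s, DateTimeFormatter.ISO_LOCAL_DATE_TIME)
           .atZone(ZoneId.systemDefault()).toInstant,
    s => LocalDateTime.parse(s, DateTimeFormatter.ofPattern("M/d/yyyy H:mm"))
           .atZone(ZoneId.of("UTC")).toInstant,
    s => LocalDate.parse(s, DateTimeFormatter.ofPattern("yyyy-MM-dd"))
           .atStartOfDay(ZoneId.of("UTC")).toInstant,
    s => LocalTime.parse(s, DateTimeFormatter.ofPattern("HH:mm:ss"))
           .atDate(LocalDate.of(1970, 1, 1)).atZone(ZoneId.of("UTC")).toInstant
  )
  for (parse <- attempts) {
    try return ChronoUnit.MICROS.between(Instant.EPOCH, parse(dateString))
    catch { case _: DateTimeParseException => } // fall through to the next family
  }
  null // the caller logs this and throws in FAILFAST mode
}
```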

View file

@@ -207,14 +207,14 @@ class ParquetReaderConnector(httpPrefix: String,
def isValidRow(): Boolean = true
def jsonToData(dt: DataType, value: Any, schemaIndex: Int, mode: String): Any = {
def jsonToData(dt: DataType, value: Any, mode: String): Any = {
return dt match {
case ar: ArrayType => {
util.Try({
val structs = value.toString.split(" ")
val seq = structs.zipWithIndex.map{ case (col, index) =>
val dataType = ar.elementType
jsonToData(dataType, col, schemaIndex, mode)
jsonToData(dataType, col, mode)
}
ArrayData.toArrayData(seq)
}).getOrElse(null)
@@ -242,7 +242,7 @@ class ParquetReaderConnector(httpPrefix: String,
val arr = deSerializeObject(value.toString.getBytes());
val seq = arr.zipWithIndex.map { case (col, index) =>
val dataType = st.fields(index).dataType
jsonToData(dataType, col, schemaIndex, mode)
jsonToData(dataType, col, mode)
}
val isAllNull = arr.forall(x => x == null)
if (isAllNull) null else InternalRow.fromSeq(seq)

View file

@@ -26,10 +26,9 @@ trait ReaderConnector extends Serializable {
* This method is used to convert to Spark/CDM data types
* @param dataType
* @param col
* @param schemaIndex Used in CSVReaderConnector for schema mapping. In ParquetReaderConnector, it has no functional purpose other than keeping the same interface.
* @return
*/
def jsonToData(dataType: DataType, col: Any, schemaIndex: Int, mode: String): Any
def jsonToData(dataType: DataType, col: Any, mode: String): Any
def isValidRow(): Boolean
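Taken together, the revert restores the pre-#100 connector contract: no per-column state, and conversion driven only by (type, value, mode). A sketch of the trait shape after the revert, reconstructed from the members visible in the hunks above (the actual file may carry additional documentation):

```scala
import org.apache.spark.sql.types.DataType

// Reconstructed sketch of the restored contract; member names are taken from the diff.
trait ReaderConnector extends Serializable {
  def build: Unit
  def close(): Unit
  def readRow(): Array[Any]
  def isValidRow(): Boolean
  def jsonToData(dataType: DataType, col: Any, mode: String): Any
}
```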