Updating DataSource implementations for Spark 2.4.

Matthew Normyle 2019-04-22 21:11:32 -07:00
Parent c8e4b3ef36
Commit 422508318e
9 changed files with 32 additions and 26 deletions

View file

@@ -1,10 +1,10 @@
name := "spark-cdm"
version := "0.2"
version := "0.3"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.2" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.1" % "provided"
libraryDependencies += "com.microsoft.azure" % "adal4j" % "1.6.3"
libraryDependencies += "com.univocity" % "univocity-parsers" % "2.7.6"

Binary file not shown.

View file

@@ -5,7 +5,8 @@ import java.io.InputStream
import com.microsoft.cdm.utils._
import com.univocity.parsers.csv.CsvParser
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.DataReader
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
import org.apache.spark.sql.types.{StringType, StructType}
/**
@@ -18,7 +19,7 @@ import org.apache.spark.sql.types.{StringType, StructType}
class CDMDataReader(var remoteCSVPath: String,
var schema: StructType,
var adlProvider: ADLGen2Provider,
var dataConverter: DataConverter) extends DataReader[Row] {
var dataConverter: DataConverter) extends InputPartitionReader[InternalRow] {
var parser: CsvParser = _
var stream: InputStream = _
@@ -43,7 +44,7 @@ class CDMDataReader(var remoteCSVPath: String,
* Called by the Spark runtime if there is data left to read.
* @return The next row of data.
*/
def get: Row = {
def get: InternalRow = {
val seq = row.zipWithIndex.map{ case (col, index) =>
val dataType = schema.fields(index).dataType
if(col == null || (col.length == 0 && dataType != StringType)) {
@@ -53,7 +54,7 @@ class CDMDataReader(var remoteCSVPath: String,
dataConverter.jsonToData(schema.fields(index).dataType)(col)
}
}
Row.fromSeq(seq)
InternalRow.fromSeq(seq)
}
/**
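
The reader now implements Spark 2.4's InputPartitionReader[InternalRow] instead of DataReader[Row]. A minimal sketch of that contract, with illustrative names not taken from this repository; note that InternalRow expects Catalyst values, so string columns should be UTF8String rather than java.lang.String:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical reader serving a fixed in-memory batch of (id, name) pairs.
class ExampleReader(rows: Seq[(Int, String)]) extends InputPartitionReader[InternalRow] {
  private val it = rows.iterator

  // Spark calls next() before each get(); return false once the partition is exhausted.
  override def next(): Boolean = it.hasNext

  // InternalRow carries Catalyst values, so strings go in as UTF8String.
  override def get(): InternalRow = {
    val (id, name) = it.next()
    InternalRow.fromSeq(Seq(id, UTF8String.fromString(name)))
  }

  // InputPartitionReader extends Closeable; release streams and parsers here.
  override def close(): Unit = ()
}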

View file

@@ -2,7 +2,8 @@ package com.microsoft.cdm.read
import com.microsoft.cdm.utils.{ADLGen2Provider, CDMModel, DataConverter}
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, DataSourceReader}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition}
import org.apache.spark.sql.types.StructType
/**
@@ -29,12 +30,11 @@ class CDMDataSourceReader(val modelUri: String,
/**
* Called by the Spark runtime. Reads the model.json to find the number of data partitions for the entity specified.
* @return A list of CDMDataReaderFactory instances, one for each partition.
*/
def createDataReaderFactories : java.util.ArrayList[DataReaderFactory[Row]]= {
val factoryList = new java.util.ArrayList[DataReaderFactory[Row]]
def planInputPartitions: java.util.ArrayList[InputPartition[InternalRow]]= {
val factoryList = new java.util.ArrayList[InputPartition[InternalRow]]
modelJsonParser.partitionLocations(entityName).foreach(csvUri => {
factoryList.add(new CDMDataReaderFactory(csvUri, readSchema(), dataConverter, adlProvider))
factoryList.add(new CDMInputPartition(csvUri, readSchema(), dataConverter, adlProvider))
})
factoryList
}
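
On the driver side, Spark 2.4 renames createDataReaderFactories to planInputPartitions and returns InputPartition[InternalRow] instead of DataReaderFactory[Row]. A minimal sketch of the new shape, with illustrative names (ExamplePartition is sketched under the next file below):

import java.util

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical source reader: one InputPartition per in-memory batch of rows.
class ExampleSourceReader(batches: Seq[Seq[(Int, String)]]) extends DataSourceReader {
  override def readSchema(): StructType =
    StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))

  // Spark serializes each returned partition to an executor, which turns it into a reader.
  override def planInputPartitions(): util.List[InputPartition[InternalRow]] = {
    val partitions = new util.ArrayList[InputPartition[InternalRow]]()
    batches.foreach(batch => partitions.add(new ExamplePartition(batch)))
    partitions
  }
}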

View file

@@ -2,7 +2,8 @@ package com.microsoft.cdm.read
import com.microsoft.cdm.utils.{ADLGen2Provider, DataConverter}
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.InputPartition
import org.apache.spark.sql.types.StructType
/**
@@ -12,9 +13,9 @@ import org.apache.spark.sql.types.StructType
* @param adlProvider Provider for ADLSgen2 data
* @param dataConverter Converts CSV data into types according to schema
*/
class CDMDataReaderFactory(var csvPath: String,
class CDMInputPartition(var csvPath: String,
var schema: StructType,
var dataConverter: DataConverter,
var adlProvider: ADLGen2Provider) extends DataReaderFactory[Row] {
def createDataReader = new CDMDataReader(csvPath, schema, adlProvider, dataConverter)
var adlProvider: ADLGen2Provider) extends InputPartition[InternalRow] {
def createPartitionReader = new CDMDataReader(csvPath, schema, adlProvider, dataConverter)
}
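
The old DataReaderFactory[Row] becomes InputPartition[InternalRow]: a serializable description of one partition that an executor turns into a reader via createPartitionReader. A sketch pairing it with the ExampleReader and ExampleSourceReader above (illustrative names):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}

// Hypothetical partition: shipped to an executor, which then drives the
// next()/get()/close() loop of the reader it creates.
class ExamplePartition(rows: Seq[(Int, String)]) extends InputPartition[InternalRow] {
  override def createPartitionReader(): InputPartitionReader[InternalRow] =
    new ExampleReader(rows)
}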

View file

@@ -8,9 +8,9 @@ import scala.collection.JavaConverters._
import org.apache.commons.httpclient.HttpStatus
import org.apache.commons.io.{Charsets, IOUtils}
import org.apache.http.client.utils.URIBuilder
import org.apache.http.{HttpEntity, NameValuePair}
import org.apache.http.{HttpEntity, HttpResponse, NameValuePair}
import org.apache.http.entity.{FileEntity, StringEntity}
import org.apache.http.impl.client.{BasicResponseHandler, HttpClients}
import org.apache.http.impl.client.{BasicResponseHandler, DefaultHttpClient}
import org.apache.http.message.BasicNameValuePair
import scala.util.Try
@@ -111,7 +111,7 @@ class ADLGen2Provider(aadProvider: AADProvider) extends Serializable {
"%s%smodel.json".format(modelDirectory, getSep(modelDirectory))
}
private def ensureSuccess(response: CloseableHttpResponse): Unit = {
private def ensureSuccess(response: HttpResponse): Unit = {
val statusCode = response.getStatusLine.getStatusCode
if(statusCode != HttpStatus.SC_OK)
{
@@ -126,7 +126,7 @@ class ADLGen2Provider(aadProvider: AADProvider) extends Serializable {
private def buildURI(uri: String, params: Seq[NameValuePair]): URI = {
val uriBuilder = new URIBuilder(encodeURI(uri))
uriBuilder.addParameters(params.asJava)
params.foreach(pair => uriBuilder.addParameter(pair.getName, pair.getValue))
uriBuilder.build()
}
@@ -141,7 +141,8 @@ class ADLGen2Provider(aadProvider: AADProvider) extends Serializable {
// TODO: ensure we're not leaking InputStreams
private def execute(request: HttpRequestBase): InputStream = {
val response = HttpClients.createDefault.execute(request)
val httpclient = new DefaultHttpClient
val response = httpclient.execute(request)
ensureSuccess(response)
response.getEntity.getContent
}
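
The provider now types responses as plain HttpResponse and constructs a DefaultHttpClient per request. A minimal sketch of that execute-and-check pattern with an assumed helper name; DefaultHttpClient has been deprecated since httpclient 4.3, so treat this as illustrative rather than recommended:

import java.io.InputStream

import org.apache.commons.httpclient.HttpStatus
import org.apache.http.HttpResponse
import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.DefaultHttpClient

object HttpSketch {
  // Execute a GET, fail fast on anything other than 200, and hand back the body stream.
  def fetch(uri: String): InputStream = {
    val client = new DefaultHttpClient
    val response: HttpResponse = client.execute(new HttpGet(uri))
    val status = response.getStatusLine.getStatusCode
    if (status != HttpStatus.SC_OK)
      throw new RuntimeException(s"Request to $uri failed with HTTP $status")
    response.getEntity.getContent // the caller is responsible for closing the stream
  }
}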

View file

@@ -1,6 +1,7 @@
package com.microsoft.cdm.write
import com.microsoft.cdm.utils._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriterFactory, WriterCommitMessage}
import org.apache.spark.sql.types.StructType
@@ -26,7 +27,7 @@ class CDMDataSourceWriter(val jobId: String,
val entityName: String,
val dataConvert: DataConverter) extends DataSourceWriter {
def createWriterFactory: DataWriterFactory[Row] = {
def createWriterFactory: DataWriterFactory[InternalRow] = {
new CDMDataWriterFactory(adlProvider, schema, jobId, modelDirectory, entityName)
}
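
The write path mirrors the read path: in Spark 2.4 the factory handed out by the DataSourceWriter is parameterized by InternalRow rather than Row. A minimal sketch with illustrative names (ExampleWriterFactory is sketched under the last file below):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriterFactory, WriterCommitMessage}
import org.apache.spark.sql.types.StructType

// Hypothetical DataSourceWriter wiring a schema through to its writer factory.
class ExampleSourceWriter(schema: StructType) extends DataSourceWriter {
  override def createWriterFactory(): DataWriterFactory[InternalRow] =
    new ExampleWriterFactory(schema)

  // Called on the driver once every task has committed; finalize output here.
  override def commit(messages: Array[WriterCommitMessage]): Unit = ()

  // Called if the job fails; clean up partial output here.
  override def abort(messages: Array[WriterCommitMessage]): Unit = ()
}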

View file

@@ -6,6 +6,7 @@ import com.microsoft.cdm.utils.{ADLGen2Provider, CsvParserFactory, DataConverter
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}
import org.apache.commons.io.FilenameUtils
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType
import scala.collection.JavaConversions
@@ -20,7 +21,7 @@ import scala.collection.JavaConversions
class CDMDataWriter(var outputCSVFilePath: String,
var schema: StructType,
var adlProvider: ADLGen2Provider,
var dataConverter: DataConverter) extends DataWriter[Row] {
var dataConverter: DataConverter) extends DataWriter[InternalRow] {
private val stream = new ByteArrayOutputStream()
private val writer = CsvParserFactory.buildWriter(new OutputStreamWriter(stream))
@@ -29,9 +30,9 @@ class CDMDataWriter(var outputCSVFilePath: String,
* Called by Spark runtime. Writes a row of data to an in-memory csv file.
* @param row row of data to write.
*/
def write(row: Row): Unit = {
def write(row: InternalRow): Unit = {
// TODO: Univocity probably doesn't need all these array conversions
val strings: java.util.List[String] = JavaConversions.seqAsJavaList(row.toSeq.zipWithIndex.map{ case(col, index) =>
val strings: java.util.List[String] = JavaConversions.seqAsJavaList(row.toSeq(schema).zipWithIndex.map{ case(col, index) =>
dataConverter.dataToString(col, schema.fields(index).dataType)
})
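
Because an InternalRow does not carry its schema, the writer has to pass the schema to toSeq, and string columns come back as UTF8String. A minimal DataWriter[InternalRow] sketch with illustrative names:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}
import org.apache.spark.sql.types.StructType

// Hypothetical commit message; real writers usually report what they wrote and where.
case class ExampleCommitMessage(rowCount: Long) extends WriterCommitMessage

// Minimal writer that renders each row as name=value pairs instead of buffering CSV.
class ExampleDataWriter(schema: StructType) extends DataWriter[InternalRow] {
  private var count = 0L

  override def write(row: InternalRow): Unit = {
    val rendered = row.toSeq(schema).zip(schema.fields).map {
      case (value, field) => s"${field.name}=$value"
    }
    println(rendered.mkString(", "))
    count += 1
  }

  override def commit(): WriterCommitMessage = ExampleCommitMessage(count)

  override def abort(): Unit = () // discard any buffered output on failure
}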

View file

@@ -2,6 +2,7 @@ package com.microsoft.cdm.write
import com.microsoft.cdm.utils.{ADLGen2Provider, DataConverter}
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory}
import org.apache.spark.sql.types.StructType
@@ -17,11 +18,11 @@ class CDMDataWriterFactory(var adlProvider: ADLGen2Provider,
var schema: StructType,
var jobId: String,
var modelDirectory: String,
var entityName: String) extends DataWriterFactory[Row] {
var entityName: String) extends DataWriterFactory[InternalRow] {
// TODO: error handling. we're basically assuming successful writes. Need to add logic to remove/rewrite files on failure.
def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = {
def createDataWriter(partitionId: Int, taskId: Long, epochId: Long): DataWriter[InternalRow] = {
new CDMDataWriter(
adlProvider.getFilePathForPartition(modelDirectory, entityName, partitionId),
schema,
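
Finally, Spark 2.4 widens the factory callback from (partitionId, attemptNumber) to (partitionId, taskId, epochId); epochId only matters for continuous streaming, so a batch writer can ignore it. A sketch of the new signature, reusing the ExampleDataWriter above:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory}
import org.apache.spark.sql.types.StructType

// Hypothetical factory: one writer per task, each writing its own partition of the output.
class ExampleWriterFactory(schema: StructType) extends DataWriterFactory[InternalRow] {
  override def createDataWriter(partitionId: Int,
                                taskId: Long,
                                epochId: Long): DataWriter[InternalRow] =
    new ExampleDataWriter(schema)
}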