diff --git a/build.sbt b/build.sbt
index 2765a68..c6d5ea4 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,10 +1,10 @@
 name := "spark-cdm"
 
-version := "0.2"
+version := "0.3"
 
 scalaVersion := "2.11.8"
 
-libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.2" % "provided"
+libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.1" % "provided"
 libraryDependencies += "com.microsoft.azure" % "adal4j" % "1.6.3"
 libraryDependencies += "com.univocity" % "univocity-parsers" % "2.7.6"
 
diff --git a/release/spark-cdm-assembly-0.2.jar b/release/spark-cdm-assembly-0.3.jar
similarity index 95%
rename from release/spark-cdm-assembly-0.2.jar
rename to release/spark-cdm-assembly-0.3.jar
index 21d95c6..758e831 100644
Binary files a/release/spark-cdm-assembly-0.2.jar and b/release/spark-cdm-assembly-0.3.jar differ
diff --git a/src/main/scala/com/microsoft/cdm/read/CDMDataReader.scala b/src/main/scala/com/microsoft/cdm/read/CDMDataReader.scala
index 4a62a2e..10ebd88 100644
--- a/src/main/scala/com/microsoft/cdm/read/CDMDataReader.scala
+++ b/src/main/scala/com/microsoft/cdm/read/CDMDataReader.scala
@@ -5,7 +5,8 @@ import java.io.InputStream
 import com.microsoft.cdm.utils._
 import com.univocity.parsers.csv.CsvParser
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.sources.v2.reader.DataReader
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
 import org.apache.spark.sql.types.{StringType, StructType}
 
 /**
@@ -18,7 +19,7 @@ import org.apache.spark.sql.types.{StringType, StructType}
 class CDMDataReader(var remoteCSVPath: String,
                     var schema: StructType,
                     var adlProvider: ADLGen2Provider,
-                    var dataConverter: DataConverter) extends DataReader[Row] {
+                    var dataConverter: DataConverter) extends InputPartitionReader[InternalRow] {
 
   var parser: CsvParser = _
   var stream: InputStream = _
@@ -43,7 +44,7 @@ class CDMDataReader(var remoteCSVPath: String,
    * Called by the Spark runtime if there is data left to read.
    * @return The next row of data.
    */
-  def get: Row = {
+  def get: InternalRow = {
     val seq = row.zipWithIndex.map{ case (col, index) =>
       val dataType = schema.fields(index).dataType
       if(col == null || (col.length == 0 && dataType != StringType)) {
@@ -53,7 +54,7 @@ class CDMDataReader(var remoteCSVPath: String,
         dataConverter.jsonToData(schema.fields(index).dataType)(col)
       }
     }
-    Row.fromSeq(seq)
+    InternalRow.fromSeq(seq)
  }
 
   /**
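For orientation on the read-side change above: in Spark 2.4's DataSourceV2 API the per-partition reader is an InputPartitionReader[InternalRow], and InternalRow carries Catalyst-internal values (UTF8String for string columns) rather than the plain JVM objects a Row held. A minimal sketch of a reader in that shape, assuming an iterator of already-parsed CSV records; the names below are illustrative and not part of this patch:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.unsafe.types.UTF8String

// Sketch only: `records` stands in for whatever CSV parsing loop feeds the reader.
class ExamplePartitionReader(records: Iterator[Array[String]], schema: StructType)
  extends InputPartitionReader[InternalRow] {

  private var current: Array[String] = _

  // Spark calls next() before every get(); return false once the partition is exhausted.
  override def next(): Boolean = {
    if (records.hasNext) { current = records.next(); true } else false
  }

  // InternalRow holds Catalyst-internal values, e.g. UTF8String for StringType columns.
  override def get(): InternalRow = {
    val values = current.toSeq.zipWithIndex.map { case (col, i) =>
      schema.fields(i).dataType match {
        case StringType  => UTF8String.fromString(col)
        case IntegerType => col.toInt
        case other       => throw new UnsupportedOperationException(s"sketch does not convert $other")
      }
    }
    InternalRow.fromSeq(values)
  }

  override def close(): Unit = () // a real reader would close its underlying stream here
}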
diff --git a/src/main/scala/com/microsoft/cdm/read/CDMDataSourceReader.scala b/src/main/scala/com/microsoft/cdm/read/CDMDataSourceReader.scala
index 146a702..95eb4f0 100644
--- a/src/main/scala/com/microsoft/cdm/read/CDMDataSourceReader.scala
+++ b/src/main/scala/com/microsoft/cdm/read/CDMDataSourceReader.scala
@@ -2,7 +2,8 @@ package com.microsoft.cdm.read
 
 import com.microsoft.cdm.utils.{ADLGen2Provider, CDMModel, DataConverter}
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, DataSourceReader}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition}
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -29,12 +30,11 @@ class CDMDataSourceReader(val modelUri: String,
 
   /**
    * Called by the Spark runtime. Reads the model.json to find the number of data partitions for the entity specified.
-   * @return A list of CDMDataReaderFactory instances, one for each partition.
    */
-  def createDataReaderFactories : java.util.ArrayList[DataReaderFactory[Row]]= {
-    val factoryList = new java.util.ArrayList[DataReaderFactory[Row]]
+  def planInputPartitions: java.util.ArrayList[InputPartition[InternalRow]]= {
+    val factoryList = new java.util.ArrayList[InputPartition[InternalRow]]
     modelJsonParser.partitionLocations(entityName).foreach(csvUri => {
-      factoryList.add(new CDMDataReaderFactory(csvUri, readSchema(), dataConverter, adlProvider))
+      factoryList.add(new CDMInputPartition(csvUri, readSchema(), dataConverter, adlProvider))
    })
    factoryList
  }
diff --git a/src/main/scala/com/microsoft/cdm/read/CDMDataReaderFactory.scala b/src/main/scala/com/microsoft/cdm/read/CDMInputPartition.scala
similarity index 71%
rename from src/main/scala/com/microsoft/cdm/read/CDMDataReaderFactory.scala
rename to src/main/scala/com/microsoft/cdm/read/CDMInputPartition.scala
index cd6ec27..e60513b 100644
--- a/src/main/scala/com/microsoft/cdm/read/CDMDataReaderFactory.scala
+++ b/src/main/scala/com/microsoft/cdm/read/CDMInputPartition.scala
@@ -2,7 +2,8 @@ package com.microsoft.cdm.read
 
 import com.microsoft.cdm.utils.{ADLGen2Provider, DataConverter}
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.sources.v2.reader.DataReaderFactory
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.v2.reader.InputPartition
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -12,9 +13,9 @@ import org.apache.spark.sql.types.StructType
  * @param adlProvider Provider for ADLSgen2 data
  * @param dataConverter Converts CSV data into types according to schema
  */
-class CDMDataReaderFactory(var csvPath: String,
+class CDMInputPartition(var csvPath: String,
                            var schema: StructType,
                            var dataConverter: DataConverter,
-                           var adlProvider: ADLGen2Provider) extends DataReaderFactory[Row] {
-  def createDataReader = new CDMDataReader(csvPath, schema, adlProvider, dataConverter)
+                           var adlProvider: ADLGen2Provider) extends InputPartition[InternalRow] {
+  def createPartitionReader = new CDMDataReader(csvPath, schema, adlProvider, dataConverter)
 }
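The two read-side files above follow the same 2.4 wiring on the planning side: DataSourceReader.planInputPartitions returns one serializable InputPartition per data file, and each partition constructs its reader on the executor via createPartitionReader. A sketch of that pairing, reusing the ExamplePartitionReader sketched earlier (again, illustrative names only):

import java.util.{ArrayList => JArrayList, List => JList}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, InputPartitionReader}
import org.apache.spark.sql.types.StructType

// Sketch only: one partition per CSV file; the partition is serialized and shipped to an executor.
class ExampleInputPartition(csvPath: String, schema: StructType) extends InputPartition[InternalRow] {
  override def createPartitionReader(): InputPartitionReader[InternalRow] =
    new ExamplePartitionReader(openCsv(csvPath), schema)

  // Placeholder for the connector's actual file access.
  private def openCsv(path: String): Iterator[Array[String]] = Iterator.empty
}

// Planning happens on the driver; Spark schedules one task per returned InputPartition.
class ExampleSourceReader(csvPaths: Seq[String], schema: StructType) extends DataSourceReader {
  override def readSchema(): StructType = schema

  override def planInputPartitions(): JList[InputPartition[InternalRow]] = {
    val partitions = new JArrayList[InputPartition[InternalRow]]()
    csvPaths.foreach(path => partitions.add(new ExampleInputPartition(path, schema)))
    partitions
  }
}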
diff --git a/src/main/scala/com/microsoft/cdm/utils/ADLGen2Provider.scala b/src/main/scala/com/microsoft/cdm/utils/ADLGen2Provider.scala
index b58ec9a..10d2ebe 100644
--- a/src/main/scala/com/microsoft/cdm/utils/ADLGen2Provider.scala
+++ b/src/main/scala/com/microsoft/cdm/utils/ADLGen2Provider.scala
@@ -8,9 +8,9 @@ import scala.collection.JavaConverters._
 import org.apache.commons.httpclient.HttpStatus
 import org.apache.commons.io.{Charsets, IOUtils}
 import org.apache.http.client.utils.URIBuilder
-import org.apache.http.{HttpEntity, NameValuePair}
+import org.apache.http.{HttpEntity, HttpResponse, NameValuePair}
 import org.apache.http.entity.{FileEntity, StringEntity}
-import org.apache.http.impl.client.{BasicResponseHandler, HttpClients}
+import org.apache.http.impl.client.{BasicResponseHandler, DefaultHttpClient}
 import org.apache.http.message.BasicNameValuePair
 
 import scala.util.Try
@@ -111,7 +111,7 @@ class ADLGen2Provider(aadProvider: AADProvider) extends Serializable {
     "%s%smodel.json".format(modelDirectory, getSep(modelDirectory))
   }
 
-  private def ensureSuccess(response: CloseableHttpResponse): Unit = {
+  private def ensureSuccess(response: HttpResponse): Unit = {
     val statusCode = response.getStatusLine.getStatusCode
 
     if(statusCode != HttpStatus.SC_OK) {
@@ -126,7 +126,7 @@ class ADLGen2Provider(aadProvider: AADProvider) extends Serializable {
 
   private def buildURI(uri: String, params: Seq[NameValuePair]): URI = {
     val uriBuilder = new URIBuilder(encodeURI(uri))
-    uriBuilder.addParameters(params.asJava)
+    params.foreach(pair => uriBuilder.addParameter(pair.getName, pair.getValue))
     uriBuilder.build()
   }
 
@@ -141,7 +141,8 @@ class ADLGen2Provider(aadProvider: AADProvider) extends Serializable {
 
   // TODO: ensure we're not leaking InputStreams
   private def execute(request: HttpRequestBase): InputStream = {
-    val response = HttpClients.createDefault.execute(request)
+    val httpclient = new DefaultHttpClient
+    val response = httpclient.execute(request)
     ensureSuccess(response)
     response.getEntity.getContent
   }
diff --git a/src/main/scala/com/microsoft/cdm/write/CDMDataSourceWriter.scala b/src/main/scala/com/microsoft/cdm/write/CDMDataSourceWriter.scala
index 98fc13c..4a22170 100644
--- a/src/main/scala/com/microsoft/cdm/write/CDMDataSourceWriter.scala
+++ b/src/main/scala/com/microsoft/cdm/write/CDMDataSourceWriter.scala
@@ -1,6 +1,7 @@
 package com.microsoft.cdm.write
 
 import com.microsoft.cdm.utils._
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.{Row, SaveMode}
 import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriterFactory, WriterCommitMessage}
 import org.apache.spark.sql.types.StructType
@@ -26,7 +27,7 @@ class CDMDataSourceWriter(val jobId: String,
                           val entityName: String,
                           val dataConvert: DataConverter) extends DataSourceWriter {
 
-  def createWriterFactory: DataWriterFactory[Row] = {
+  def createWriterFactory: DataWriterFactory[InternalRow] = {
     new CDMDataWriterFactory(adlProvider, schema, jobId, modelDirectory, entityName)
   }
 
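The write-side changes that follow mirror the read side: Spark 2.4 hands the writer InternalRow values, which are decoded against the write schema before serialization, and DataWriterFactory.createDataWriter now takes (partitionId, taskId, epochId). A minimal sketch under those assumptions; the classes below are illustrative and buffer CSV lines in memory rather than uploading a partition file:

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage}
import org.apache.spark.sql.types.StructType

// Commit messages travel back to the driver, so keep them small and serializable.
case class ExampleCommitMessage(rowCount: Int) extends WriterCommitMessage

// Sketch only: accumulates CSV lines in memory instead of writing a partition file.
class ExampleCsvWriter(schema: StructType) extends DataWriter[InternalRow] {
  private val lines = ArrayBuffer.empty[String]

  // InternalRow values are Catalyst-internal (e.g. UTF8String), so decode them via the schema.
  override def write(row: InternalRow): Unit =
    lines += row.toSeq(schema).map(v => if (v == null) "" else v.toString).mkString(",")

  override def commit(): WriterCommitMessage = ExampleCommitMessage(lines.size)

  override def abort(): Unit = lines.clear() // a real writer would also remove partial output
}

// Spark 2.4 passes taskId and epochId (the latter used by continuous streaming) to the factory.
class ExampleWriterFactory(schema: StructType) extends DataWriterFactory[InternalRow] {
  override def createDataWriter(partitionId: Int, taskId: Long, epochId: Long): DataWriter[InternalRow] =
    new ExampleCsvWriter(schema)
}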
diff --git a/src/main/scala/com/microsoft/cdm/write/CDMDataWriter.scala b/src/main/scala/com/microsoft/cdm/write/CDMDataWriter.scala
index 3eaaa19..73a1a1b 100644
--- a/src/main/scala/com/microsoft/cdm/write/CDMDataWriter.scala
+++ b/src/main/scala/com/microsoft/cdm/write/CDMDataWriter.scala
@@ -6,6 +6,7 @@ import com.microsoft.cdm.utils.{ADLGen2Provider, CsvParserFactory, DataConverter
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}
 import org.apache.commons.io.FilenameUtils
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types.StructType
 
 import scala.collection.JavaConversions
@@ -20,7 +21,7 @@ import scala.collection.JavaConversions
 class CDMDataWriter(var outputCSVFilePath: String,
                     var schema: StructType,
                     var adlProvider: ADLGen2Provider,
-                    var dataConverter: DataConverter) extends DataWriter[Row] {
+                    var dataConverter: DataConverter) extends DataWriter[InternalRow] {
 
   private val stream = new ByteArrayOutputStream()
   private val writer = CsvParserFactory.buildWriter(new OutputStreamWriter(stream))
@@ -29,9 +30,9 @@ class CDMDataWriter(var outputCSVFilePath: String,
    * Called by Spark runtime. Writes a row of data to an in-memory csv file.
    * @param row row of data to write.
    */
-  def write(row: Row): Unit = {
+  def write(row: InternalRow): Unit = {
     // TODO: Univocity probably doesn't need all these array conversions
-    val strings: java.util.List[String] = JavaConversions.seqAsJavaList(row.toSeq.zipWithIndex.map{ case(col, index) =>
+    val strings: java.util.List[String] = JavaConversions.seqAsJavaList(row.toSeq(schema).zipWithIndex.map{ case(col, index) =>
       dataConverter.dataToString(col, schema.fields(index).dataType)
     })
 
diff --git a/src/main/scala/com/microsoft/cdm/write/CDMDataWriterFactory.scala b/src/main/scala/com/microsoft/cdm/write/CDMDataWriterFactory.scala
index 8bd14e9..3ac7395 100644
--- a/src/main/scala/com/microsoft/cdm/write/CDMDataWriterFactory.scala
+++ b/src/main/scala/com/microsoft/cdm/write/CDMDataWriterFactory.scala
@@ -2,6 +2,7 @@ package com.microsoft.cdm.write
 
 import com.microsoft.cdm.utils.{ADLGen2Provider, DataConverter}
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory}
 import org.apache.spark.sql.types.StructType
 
@@ -17,11 +18,11 @@ class CDMDataWriterFactory(var adlProvider: ADLGen2Provider,
                            var schema: StructType,
                            var jobId: String,
                            var modelDirectory: String,
-                           var entityName: String) extends DataWriterFactory[Row] {
+                           var entityName: String) extends DataWriterFactory[InternalRow] {
 
   // TODO: error handling. we're basically assuming successful writes. Need to add logic to remove/rewrite files on failure.
 
-  def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = {
+  def createDataWriter(partitionId: Int, taskId: Long, epochId: Long): DataWriter[InternalRow] = {
     new CDMDataWriter(
       adlProvider.getFilePathForPartition(modelDirectory, entityName, partitionId),
       schema,