Merge pull request #6 from slyons/master

Optimized the column metadata lookup to happen just once. This greatly improves performance with larger bulk loads and larger number of partitions.
This commit is contained in:
Arvind Shyamsundar 2018-05-23 13:47:50 -07:00 коммит произвёл GitHub
Родитель 93d2382389 eaa22efa2f
Коммит fa1cf19ed7
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
1 изменённых файлов: 42 добавлений и 15 удалений

Просмотреть файл

@ -31,6 +31,8 @@ import com.microsoft.azure.sqldb.spark.config.{Config, SqlDBConfig}
import com.microsoft.sqlserver.jdbc.SQLServerBulkCopy
import org.apache.spark.sql.{DataFrame, Row}
import scala.util.Try
/**
* Implicit functions for DataFrame
*/
@ -43,8 +45,45 @@ private[spark] case class DataFrameFunctions[T](@transient dataFrame: DataFrame)
* @param config the database connection properties and bulk copy properties
* @param metadata the metadata of the columns - will be null if not specified
*/
def bulkCopyToSqlDB(config: Config, metadata: BulkCopyMetadata = null): Unit = {
dataFrame.foreachPartition(iterator => bulkCopy(config, iterator, metadata))
def bulkCopyToSqlDB(config: Config, metadata: BulkCopyMetadata = null, createTable:Boolean = false): Unit = {
// Ensuring the table exists in the DB already
if(createTable) {
dataFrame.limit(0).write.sqlDB(config)
}
val actualMetadata = if(metadata == null) {
getConnectionOrFail(config).recover({
case e: ClassNotFoundException =>
logError("JDBC driver not found in class path", e)
throw e
case e1: SQLException =>
logError("Connection cannot be established to the database", e1)
throw e1
}).flatMap(conn => {
inferBulkCopyMetadata(config, conn)
}).recover({
case e: SQLException =>
logError("Column metadata not specified and cannot retrieve metadata from database", e)
throw e
}).get
} else {
metadata
}
dataFrame.foreachPartition(iterator => bulkCopy(config, iterator, actualMetadata))
}
private def getConnectionOrFail(config:Config):Try[Connection] = {
Try {
ConnectionUtils.getConnection(config)
}
}
private def inferBulkCopyMetadata(config: Config, connection:Connection):Try[BulkCopyMetadata] = {
val dbTable = config.get[String](SqlDBConfig.DBTable).get
Try {
val resultSetMetaData = BulkCopyUtils.getTableColumns(dbTable, connection)
BulkCopyUtils.createBulkCopyMetadata(resultSetMetaData)
}
}
/**
@ -71,19 +110,7 @@ private[spark] case class DataFrameFunctions[T](@transient dataFrame: DataFrame)
val dbTable = config.get[String](SqlDBConfig.DBTable).get
// Retrieves column metadata from external database table if user does not specify.
val bulkCopyMetadata =
if (metadata != null) {
metadata
} else {
try {
val resultSetMetaData = BulkCopyUtils.getTableColumns(dbTable, connection)
BulkCopyUtils.createBulkCopyMetadata(resultSetMetaData)
} catch {
case e: SQLException =>
logError("Column metadata not specified and cannot retrieve metadata from database", e)
throw e
}
}
val bulkCopyMetadata = metadata
var committed = false
val supportsTransactions = BulkCopyUtils.getTransactionSupport(connection)