This commit is contained in:
Daniel Thorn 2018-06-12 12:24:01 -07:00
Parent 7e68043726
Commit c6f1ec2c09
No key found matching this signature
GPG key ID: 0CEF54EC2D4A9FE5
13 changed files with 1542 additions and 0 deletions

7
.gitignore vendored Normal file

@@ -0,0 +1,7 @@
target/
lib_managed/
metastore_db/
.idea/
derby.log
*.iml
.DS_Store

20
.travis.yml Normal file

@@ -0,0 +1,20 @@
language: scala
sudo: false
cache:
directories:
- $HOME/.ivy2
- $HOME/.sbt
matrix:
include:
- jdk: openjdk8
scala: 2.11.8
python: 3.5
env: TEST_SPARK_VERSION="2.0.2" AWS_ACCESS_KEY_ID=foo AWS_SECRET_ACCESS_KEY=bar
install:
- pip install moto[server] --user
script:
- sbt -Dspark.testVersion=$TEST_SPARK_VERSION ++$TRAVIS_SCALA_VERSION coverage test coverageReport
- sbt ++$TRAVIS_SCALA_VERSION scalastyle
- sbt ++$TRAVIS_SCALA_VERSION "test:scalastyle"
after_success:
- bash <(curl -s https://codecov.io/bash)

54
README.md Normal file

@@ -0,0 +1,54 @@
[![Build Status](https://travis-ci.org/mozilla/mozdata.svg?branch=master)](https://travis-ci.org/mozilla/mozdata)
[![codecov.io](https://codecov.io/github/mozilla/mozdata/coverage.svg?branch=master)](https://codecov.io/github/mozilla/mozdata?branch=master)
# MozData
A consistent API for accessing Mozilla data that reports usage to Mozilla
## Using MozData in Scala
In SBT:
```sbt
resolvers += "S3 local maven snapshots" at "s3://net-mozaws-data-us-west-2-ops-mavenrepo/snapshots"
libraryDependencies += "com.mozilla.telemetry" %% "mozdata" % "0.1-SNAPSHOT"
```
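Once the dependency is on the classpath, usage is a thin wrapper around Spark. A minimal sketch (the destination table name and owner below are illustrative; see the scaladoc in MozData.scala for the full API):
```scala
import com.mozilla.telemetry.mozdata.MozData
import org.apache.spark.sql.SparkSession

// or reuse the session provided by spark-shell / Databricks; omit master on a cluster
val spark = SparkSession.builder().master("local[*]")
  .appName("mozdata-example").enableHiveSupport().getOrCreate()
val api = MozData(spark)

api.listTables()                             // global tables in the catalog
val dau = api.readTable("main_summary")      // read a global table
  .where("submission_date_s3='20180101'")
api.writeTable(                              // write into your own namespace
  df=dau,
  tableName="main_summary_20180101",         // illustrative table name
  owner=Some("nobody@mozilla.com")
)
```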
## Data Collection
This API sends usage data to Mozilla's telemetry pipeline when the env var `TELEMETRY_URL` is set.
On Mozilla servers `TELEMETRY_URL` should be set to `https://incoming.telemetry.mozilla.org`.
To disable data collection in environment variables:
```bash
unset TELEMETRY_URL
```
or in scala:
```scala
import com.mozilla.telemetry.mozdata.MozData
val api: MozData = MozData(telemetryUrl=None)
```
To log what you are sending (or would send if `TELEMETRY_URL` were set), set the log level in log4j.properties (probably in src/main/resources or src/test/resources):
```properties
log4j.logger.com.mozilla.telemetry.mozdata.MozData=DEBUG
```
or in scala:
```scala
import org.apache.log4j.{Logger, Level}
Logger.getLogger(classOf[MozData]).setLevel(Level.DEBUG)
```
## Development
Run tests and style checks with sbt:
```bash
sbt test scalastyle test:scalastyle
```
# License
This Source Code Form is subject to the terms of the Mozilla Public
License, v. 2.0. If a copy of the MPL was not distributed with this
file, You can obtain one at http://mozilla.org/MPL/2.0/.

1
VERSION Normal file

@@ -0,0 +1 @@
0.1-SNAPSHOT

45
build.sbt Normal file

@@ -0,0 +1,45 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
scalacOptions ++= Seq(
"-Ywarn-unused",
"-Ywarn-unused-import"
)
name := "mozdata"
version := scala.io.Source.fromFile("VERSION").mkString
scalaVersion := "2.11.8"
organization := "com.mozilla.telemetry"
homepage := Some(url("http://github.com/mozilla/mozdata"))
resolvers += "S3 local maven snapshots" at "https://s3-us-west-2.amazonaws.com/net-mozaws-data-us-west-2-ops-mavenrepo/snapshots"
val sparkVersion = "2.2.0"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion % "provided",
"org.apache.spark" %% "spark-hive" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"org.scalaj" %% "scalaj-http" % "2.4.0",
"com.mozilla.telemetry" %% "moztelemetry" % "1.1-SNAPSHOT",
"org.scalatest" %% "scalatest" % "3.0.5" % "test",
"com.github.tomakehurst" % "wiremock-standalone" % "2.14.0" % "test",
"com.holdenkarau" %% "spark-testing-base" % s"${sparkVersion}_0.9.0" % "test",
"io.findify" %% "s3mock" % "0.2.5" % "test"
)
test in assembly := {}
testOptions in Test := Seq(
// -oD add duration reporting; see http://www.scalatest.org/user_guide/using_scalatest_with_sbt
Tests.Argument("-oD")
)
val scalaStyleConfigUrl = Some(url("https://raw.githubusercontent.com/mozilla/moztelemetry/master/scalastyle-config.xml"))
(scalastyleConfigUrl in Compile) := scalaStyleConfigUrl
(scalastyleConfigUrl in Test) := scalaStyleConfigUrl

1
project/build.properties Normal file

@@ -0,0 +1 @@
sbt.version=1.0.3

10
project/plugins.sbt Normal file

@@ -0,0 +1,10 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
resolvers += "bintray-spark-packages" at "https://dl.bintray.com/spark-packages/maven/"
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.1")
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")

78
src/main/scala/com/mozilla/telemetry/mozdata/Hadoop.scala Normal file

@@ -0,0 +1,78 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package com.mozilla.telemetry.mozdata
import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
object Hadoop {
/** List the directories and files within a directory using hadoop
*
* @param directory path to the directory to list
* @return Some((dirnames, filenames)) found in the directory or
* None if the path doesn't exist; omits symlinks.
*/
def ls(directory: String): Option[(List[String], List[String])] = {
val path = new Path(directory)
val fs = FileSystem.get(path.toUri, new Configuration())
if (!fs.exists(path)) {None} else {
val listing = fs.listStatus(path).toList
val dirNames = listing.collect {
case status if status.isDirectory => status.getPath.getName
}
val fileNames = listing.collect {
case status if status.isFile => status.getPath.getName
}
Some((dirNames, fileNames))
}
}
def lsOrNil(directory: String): (List[String], List[String]) =
ls(directory).getOrElse((Nil, Nil))
/** Delete a directory or file using hadoop
*
* @param pathString path to the directory or file to delete using hadoop
* @param recursive whether to delete directory contents recursively, defaults to true
*/
def rm(pathString: String, recursive: Boolean=true): Unit = {
val path = new Path(pathString)
val fs = FileSystem.get(path.toUri, new Configuration())
if (fs.exists(path)) {
fs.delete(path, recursive)
}
}
/** Read utf-8 file contents using hadoop
*
* @param file path to the file to read
* @return contents of file, decoded using utf-8
*/
def read(file: String): String = {
val path = new Path(file)
val fs = FileSystem.get(path.toUri, new Configuration())
val fp = fs.open(path)
try {
IOUtils.toString(fp, "UTF-8")
} finally {
fp.close()
}
}
/** Write file contents using hadoop
*
* @param file path to the file to write
* @param body content to write
*/
def write(file: String, body: String): Unit = {
val path = new Path(file)
val fs = FileSystem.get(path.toUri, new Configuration())
val fp = fs.create(path)
try {
fp.write(body.getBytes())
} finally {
fp.close()
}
}
}
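These helpers are thin wrappers over the Hadoop `FileSystem` API, resolved per path, so the same calls work for local, HDFS, or S3 URIs. A minimal usage sketch (the directory below is hypothetical):
```scala
import com.mozilla.telemetry.mozdata.Hadoop

val dir = "/tmp/mozdata-example"           // hypothetical local directory
Hadoop.write(s"$dir/hello.txt", "hello")   // create (or overwrite) a file
val (dirs, files) = Hadoop.lsOrNil(dir)    // (List(), List("hello.txt"))
val body = Hadoop.read(s"$dir/hello.txt")  // "hello"
Hadoop.rm(dir)                             // recursive delete by default
```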

374
src/main/scala/com/mozilla/telemetry/mozdata/MozData.scala Normal file

@@ -0,0 +1,374 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package com.mozilla.telemetry.mozdata
import java.util.UUID.randomUUID
import com.mozilla.telemetry.heka.{Dataset, Message}
import com.mozilla.telemetry.mozdata.Utils.{getTableInfo,isVersion,sparkListTables}
import com.mozilla.telemetry.utils.S3Store
import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.json4s.jackson.Serialization.{read, write}
import org.json4s.{DefaultFormats, Formats}
import org.apache.spark.sql.{DataFrame,DataFrameReader,DataFrameWriter,Row,SparkSession}
import scalaj.http.Http
import scala.collection.immutable.ListMap
import scala.io.Source
/** A consistent API for accessing Mozilla data that reports usage to Mozilla
*
* example:
*
* val api = MozData(spark)
* api.writeTable(
* df=api.readTable("main_summary")
* .where("submission_date_s3='20180101'")
* .groupBy("submission_date_s3", "channel")
* .agg(countDistinct("client_id").as("dau")),
* tableName="dau_by_channel", // illustrative destination table
* version=Some("v1")
* )
*
* @param spark spark session used to access data
* @param adHocTablesDir filesystem location of ad hoc tables
* @param globalTablesDir filesystem location of global tables
* @param defaultMetadataUpdateMethods default list of methods to use when
* updating table metadata, default is
* List("sql_repair", "sql_refresh")
* @param readConfig optional function used to configure all DataFrameReaders
* @param writeConfig optional function used to configure all DataFrameWriters
* @param telemetryUrl optional url where logs should be posted
*/
class MozData(spark: SparkSession, adHocTablesDir: String, globalTablesDir: String,
defaultMetadataUpdateMethods: List[String],
readConfig: Function[DataFrameReader,DataFrameReader],
writeConfig: Function[DataFrameWriter[Row],DataFrameWriter[Row]],
telemetryUrl: Option[String]){
final val listRDDsBucket: String = "net-mozaws-prod-us-west-2-pipeline-metadata"
val apiVersion: String = Source.fromFile("VERSION").mkString
val logger: Logger = Logger.getLogger(classOf[MozData])
if (telemetryUrl.isDefined) {
logger.debug(s"telemetryUrl: ${telemetryUrl.get}")
}
/** Report an interaction with this api */
private def log(action: String, event: Map[String, Option[String]]): Unit = {
implicit val formats: Formats = DefaultFormats
val ping: String = write(ListMap((event.collect {
case (k, Some(v)) => k -> v
} + ("apiVersion" -> apiVersion, "apiCall" -> action)).toList.sorted:_*))
logger.debug(s"$ping")
if (telemetryUrl.isDefined) {
val url = s"${telemetryUrl.get}/submit/mozdata/event/1/${randomUUID.toString}"
Http(url).postData(ping).header("content-type", "application/json").asString
}
}
/** List the rdds available to readRDD
*
* example:
*
* // list raw dataset names
* val api = MozData(spark)
* api.listRDDs().foreach(_("name"))
*
* @return list of source metadata objects, each updated with name of source
*/
def listRDDs(): List[Map[String,String]] = {
log("listRDDs", Map("sourcesJson" -> Some(s"s3://$listRDDsBucket/sources.json")))
implicit val formats: Formats = DefaultFormats
val sources = read[Map[String,Map[String,String]]](
Source.fromInputStream(S3Store.getKey(listRDDsBucket, "sources.json")).mkString
)
sources.map{kv => kv._2 + ("name" -> kv._1)}.toList
}
/** List the tables available to readTable
*
* example:
*
* val api = MozData(spark)
* // list global tables
* api.listTables()
*
* // list nobody@mozilla.com's tables
* api.listTables(owner=Some("nobody@mozilla.com"))
*
* @param owner optional email that identifies non-global namespace
* @return list of table names
*/
def listTables(owner: Option[String] = None): List[String] = {
log("listTables", Map(
"owner" -> owner,
"adHocTablesDir" -> owner.map(_=>adHocTablesDir)
))
if (owner.isDefined) {
val tablesUri = s"$adHocTablesDir/${owner.get}"
Hadoop.lsOrNil(tablesUri)._1.filter { table =>
Hadoop.lsOrNil(s"$tablesUri/$table")._1.exists(isVersion)
}
} else {
sparkListTables(spark)
}
}
/** Read a raw dataset
*
* example:
*
* // read a little bit of raw telemetry
* val api = MozData(spark)
* val rdd = api.readRDD(
* "telemetry",
* where={_.where("sourceVersion"){case "4" => true}},
* fileLimit=1
* )
*
* @param name dataset source name
* @param where clauses passed to Dataset.where
* @param fileLimit passed to Dataset.records
* @param minPartitions passed to Dataset.records
* @return Messages read
*/
def readRDD(name: String, where: Function[Dataset,Dataset] = identity,
fileLimit: Option[Int] = None,
minPartitions: Option[Int] = None): RDD[Message] = {
log("readRDD", Map("name" -> Some(name)))
where(Dataset(name)).records(fileLimit, minPartitions)(spark.sparkContext)
}
/** Read a table
*
* example:
*
* val api = MozData(spark)
*
* // read a global table
* val clientsDaily = api.readTable("clients_daily")
*
* // read v1 of nobody@mozilla.com's special_dau table
* val specialDauV1 = api.readTable(
* tableName="special_dau",
* owner=Some("nobody@mozilla.com"),
* version=Some("v1"),
* extraReadConfig={_.option("mergeSchema", "true")}
* )
*
* // read a json special_dau table defined by an s3 path
* val specialDauV2 = api.readTable(
* tableName="special_dau",
* uri=Some("s3://special-bucket/special_dau/v2"),
* extraReadConfig={_.format("json")}
* )
*
* @param tableName table to read
* @param version optional specific version of table, defaults to "v1" for
* new tables and the latest version for existing tables
* @param owner optional email that identifies non-global namespace
* @param uri optional non-standard location for this table
* @param extraReadConfig optional function to configure the DataFrameReader
* @return DataFrame of the requested table
*/
def readTable(tableName: String, version: Option[String] = None,
owner: Option[String] = None, uri: Option[String] = None,
extraReadConfig: Function[DataFrameReader,DataFrameReader] = identity
): DataFrame = {
val tableInfo = getTableInfo(
tableName=tableName,
version=version,
owner=owner,
uri=uri,
spark=spark,
adHocTablesDir=adHocTablesDir,
globalTablesDir=globalTablesDir
)
log(
action="readTable",
event=Map(
"detectedUri" -> tableInfo.uri,
"detectedVersion" -> tableInfo.version,
"owner" -> owner,
"sqlTableName" -> tableInfo.sqlTableName,
"tableName" -> Some(tableName),
"uri" -> uri,
"version" -> version
)
)
val reader = extraReadConfig(readConfig(spark.read))
if (tableInfo.inCatalog) {
reader.table(tableInfo.sqlTableName.get)
} else {
reader.load(tableInfo.uri.get)
}
}
/** Execute a SparkSQL query */
def sql(query: String): DataFrame = {
log("sql", Map("query" -> Some(query)))
spark.sql(query)
}
/** Write table to long term storage
*
* example:
*
* val api = MozData(spark)
* val myDF = (0 to 5)
* .map(v=>("20180101", v, "beta"))
* .toDF("submission_date_s3", "test_value", "channel")
*
* // append new partitions to a global table
* api.writeTable(
* df=myDF,
* tableName="clients_daily",
* version=Some("v4"),
* extraWriteConfig={_.mode("append").partitionBy("submission_date_s3")}
* // not a partitioned table, so exclude "sql_repair" from update methods
* metadataUpdateMethods=List("sql_refresh")
* )
*
* // write a single date to the latest version of nobody@mozilla.com's special_dau table
* api.writeTable(
* df=myDF.where("submission_date_s3='20180101'").drop("submission_date_s3"),
* tableName="special_dau",
* partitionValues=List(("submission_date_s3","20180101")),
* owner=Some("nobody@mozilla.com"),
* extraWriteConfig={_.mode("overwrite").partitionBy("channel")}
* )
*
* // write a json table to a specific s3 path
* api.writeTable(
* df=myDF.where("channel='beta'").drop("channel"),
* tableName="special_dau",
* uri=Some("s3://special-bucket/special_dau/v2"),
* extraWriteConfig={_.format("json")}
* )
*
* // write a non-partitioned global table
* api.writeTable(
* df=myDF,
* tableName="special_list",
* version=Some("v1"),
* extraWriteConfig={_.mode("overwrite")}
* // not a partitioned table, so exclude "sql_repair" from update methods
* metadataUpdateMethods=List("sql_refresh")
* )
*
* @param df DataFrame to write
* @param tableName table to write
* @param partitionValues optional ordered list of key-value static partition
* identifiers, which must be absent from df
* @param version specific version of table, required for global tables,
* defaults to latest or "v1" if latest can't be determined
* @param owner optional email that identifies non-global namespace
* @param uri optional non-standard location for this table
* @param metadataUpdateMethods optional methods to use to update metadata
* after writing partitioned global tables,
* default is List("sql_repair", "sql_refresh")
* WARNING default "sql_repair" method
* uses "MSCK REPAIR TABLE" which will throw
* an exception if the table is not partitioned
* @param extraWriteConfig optional function to configure the DataFrameWriter
*/
def writeTable(df: DataFrame, tableName: String,
partitionValues: List[(String,String)] = Nil,
version: Option[String] = None, owner: Option[String] = None,
uri: Option[String] = None,
extraWriteConfig: Function[DataFrameWriter[Row],DataFrameWriter[Row]] = identity,
metadataUpdateMethods: List[String] = defaultMetadataUpdateMethods): Unit = {
require(
version.isDefined || owner.isDefined || uri.isDefined,
"version required to write global table"
)
val tableInfo = getTableInfo(
tableName=tableName,
version=version,
owner=owner,
uri=uri,
spark=spark,
adHocTablesDir=adHocTablesDir,
globalTablesDir=globalTablesDir
)
require(
tableInfo.uri.isDefined,
s"table is not external: ${tableInfo.sqlTableName.getOrElse(tableName)}"
)
// maybe find partition string
val partitionValuesString = partitionValues
.map{p=>s"${p._1}=${p._2}"}
.reduceLeftOption((a: String, b) => s"$a/$b")
// build uri
val detectedUri: String = List(
tableInfo.uri,
partitionValuesString
).flatten.mkString("/")
if (!tableInfo.inCatalog && owner.isEmpty && uri.isEmpty) {
logger.warn(s"writing non-catalog global table: $detectedUri")
}
log(
"writeTable",
Map(
"detectedUri" -> Some(detectedUri),
"detectedVersion" -> tableInfo.version,
"owner" -> owner,
"partition" -> partitionValuesString,
"sqlTableName" -> tableInfo.sqlTableName,
"tableName" -> Some(tableName),
"uri" -> uri,
"version" -> version
)
)
extraWriteConfig(writeConfig(df.write)).save(detectedUri)
if (tableInfo.inCatalog) {
// update metadata on catalog tables
metadataUpdateMethods.foreach{
case "sql_repair" => spark.sql(s"MSCK REPAIR TABLE `${tableInfo.sqlTableName.get}`")
case "sql_refresh" => spark.sql(s"REFRESH TABLE `${tableInfo.sqlTableName.get}`")
case value => throw new IllegalArgumentException(s"Unsupported metadata location: $value")
}
}
}
}
object MozData {
def apply(spark: SparkSession,
adHocTablesDir: String = sys.env.getOrElse(
"AD_HOC_TABLES_DIR",
"s3://net-mozaws-prod-us-west-2-pipeline-analysis"
),
globalTablesDir: String = sys.env.getOrElse(
"GLOBAL_TABLES_DIR",
"s3://telemetry-parquet"
),
defaultMetadataUpdateMethods: List[String] = sys.env.getOrElse(
"DEFAULT_METADATA_UPDATE_METHODS",
"sql_repair,sql_refresh"
).split(",").toList,
readConfig: Function[DataFrameReader,DataFrameReader] = identity,
writeConfig: Function[DataFrameWriter[Row],DataFrameWriter[Row]] = identity,
telemetryUrl: Option[String] = sys.env.get("TELEMETRY_URL")
): MozData = new MozData(
spark=spark,
adHocTablesDir=adHocTablesDir,
globalTablesDir=globalTablesDir,
defaultMetadataUpdateMethods=defaultMetadataUpdateMethods,
readConfig=readConfig,
writeConfig=writeConfig,
telemetryUrl=telemetryUrl
)
}
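The companion `apply` above fills most arguments from environment variables, so callers typically pass only a `SparkSession`; the `readConfig`/`writeConfig` hooks are applied to every `DataFrameReader`/`DataFrameWriter` the API creates. A configuration sketch (the override values are illustrative):
```scala
import com.mozilla.telemetry.mozdata.MozData
import org.apache.spark.sql.SparkSession

// omit master on a cluster; spark-submit or the notebook supplies it
val spark = SparkSession.builder().master("local[*]")
  .appName("mozdata-config-example").enableHiveSupport().getOrCreate()

val api = MozData(
  spark=spark,
  adHocTablesDir="s3://my-analysis-bucket/ad_hoc_tables",  // illustrative override
  readConfig={_.option("mergeSchema", "true")},            // applied on every read
  writeConfig={_.mode("overwrite")},                       // applied on every write
  telemetryUrl=None                                        // disable usage reporting
)
```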

130
src/main/scala/com/mozilla/telemetry/mozdata/Utils.scala Normal file

@@ -0,0 +1,130 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package com.mozilla.telemetry.mozdata
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import scala.util.matching.Regex
object Utils {
final val LocationInCreateTableStatementRegex: Regex = "(?s).*LOCATION[^']+'([^']+)'.*".r
/** check if version is in MozData version format */
def isVersion(version: String): Boolean = version.matches("v[0-9]+")
/** try to get table version from uri */
def uriVersion(uri: Option[String]): Option[String] = {
uri.getOrElse("").split("/").filter(!_.contains('=')).last match {
case version if isVersion(version) => Some(version)
case _ => None
}
}
/** list tables available in spark sql
*
* @param spark spark session used to access data
* @param tableName optional specific table name to check for
*/
def sparkListTables(spark: SparkSession,
tableName: Option[String] = None): List[String] = {
spark
.sql(s"SHOW TABLES ${tableName.map(t=>s"'$t'").getOrElse("")}")
.select("tableName")
.collect()
.map{t => t.getString(0)}
.toList
}
/** Extract non-catalog table info
*
* reusable code for getTableInfo
*
* @param tableUri uri of table without version
* @param version optional specific version of table
* @return TableInfo(uri, version) based on detected version
*/
def getFsTable(tableUri: String, version: Option[String]): TableInfo = {
val detectedVersion = version.getOrElse(
Hadoop
.lsOrNil(tableUri)._1
.filter(isVersion) // versions present in hadoop
.padTo(1, "v1") // new table with no versions fall back to "v1" for writing
.maxBy(_.substring(1).toInt) // latest
)
TableInfo(Some(s"$tableUri/$detectedVersion"), Some(detectedVersion))
}
/** Locate information needed for accessing the given table
*
* @param tableName table to look up
* @param version optional specific version of table
* @param owner optional email that identifies non-global namespace
* @param uri optional non-standard location for this table
* @param spark spark session used to access data
* @param adHocTablesDir filesystem location of ad hoc tables
* @param globalTablesDir filesystem location of global tables
* @return TableInfo(uri, version, sqlTableName) =>
* uri: detected uri, may be None if sqlTableName was found,
* but does not represent an external table
* version: detected version, may be None if version was not
* specified and could not be detected
* sqlTableName: name that should be used to reference the
* table in sql, suffixed with _${version} if
* version was specified, only present for
* global tables (no owner or uri specified)
* that are found in the catalog (table found
* by running "SHOW TABLES '$sqlTableName'")
*/
def getTableInfo(tableName: String, version: Option[String],
owner: Option[String], uri: Option[String],
spark: SparkSession, adHocTablesDir: String,
globalTablesDir: String): TableInfo = (uri, owner) match {
// uri provided
case (Some(u), _) => TableInfo(uri, uriVersion(uri))
// owner provided
case (_, Some(o)) => getFsTable(s"$adHocTablesDir/$o/$tableName", version)
// global table
case _ =>
val sqlTableName = version match {
case Some(value) => s"${tableName}_$value"
case None => tableName
}
if (sparkListTables(spark, Some(sqlTableName)).length < 1) {
// table does not exist in catalog
getFsTable(s"$globalTablesDir/$tableName", version)
} else {
// table exists in catalog
try {
// sql throws NoSuchTableException if table is temporary or a view
val createTableStatement = spark
.sql(s"SHOW CREATE TABLE `$sqlTableName`")
.select("createtab_stmt")
.take(1)
.head
.getString(0)
createTableStatement match {
// found location
case LocationInCreateTableStatementRegex(location) => TableInfo(
Some(location),
uriVersion(Some(location)),
Some(sqlTableName)
)
// throw NoSuchTableException because table is not external
case _ => throw new NoSuchTableException(db="default", table=sqlTableName)
}
} catch {
// no external table by that name
case _: NoSuchTableException => TableInfo(None, version, Some(sqlTableName))
}
}
}
case class TableInfo(uri: Option[String] = None, version: Option[String] = None,
sqlTableName: Option[String] = None) {
// whether or not this table is in the catalog
val inCatalog: Boolean = sqlTableName.isDefined
}
}
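A worked sketch of the resolution logic above, assuming a fresh local environment (paths below are hypothetical); with no versioned directories on disk, `getFsTable` falls back to `"v1"`:
```scala
import com.mozilla.telemetry.mozdata.Utils
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]")
  .appName("utils-example").enableHiveSupport().getOrCreate()

// version strings and uri-derived versions
Utils.isVersion("v2")                                     // true
Utils.isVersion("2")                                      // false
Utils.uriVersion(Some("s3://bucket/special_dau/v2/k=v"))  // Some("v2")

// owner table that doesn't exist yet: no versions on disk, so it falls back to "v1"
Utils.getTableInfo(
  tableName="special_dau",
  version=None,
  owner=Some("nobody@mozilla.com"),
  uri=None,
  spark=spark,
  adHocTablesDir="/tmp/ad_hoc_tables",   // hypothetical local directory
  globalTablesDir="/tmp/global_tables"
)
// => TableInfo(Some("/tmp/ad_hoc_tables/nobody@mozilla.com/special_dau/v1"), Some("v1"), None)
```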

32
src/test/resources/log4j.properties Normal file

@@ -0,0 +1,32 @@
# Spark, Hadoop, and hive default log level
log4j.logger.org.apache=WARN
log4j.logger.hive=WARN
# Silence info logs from hive dependency
log4j.category.DataNucleus=WARN
# Silence info about table stats
log4j.category.hive.log=WARN
# Silence warnings about writing to non-catalog global tables
log4j.logger.com.mozilla.telemetry.mozdata.MozData=ERROR
# Silence hive storage warnings
log4j.logger.org.apache.hadoop.hive.metastore.ObjectStore=ERROR
log4j.logger.org.apache.hadoop.hive.metastore.HiveMetaStore=ERROR
# Silence logged error about default database already existing that is retried
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
# Silence hadoop native code warning
log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR
# Silence large-task warnings
log4j.logger.org.apache.spark.scheduler.TaskSetManager=ERROR
# Silence re-used spark context warnings
log4j.logger.org.apache.spark.SparkContext=ERROR
# Silence re-used spark session warnings
log4j.logger.org.apache.spark.sql.SparkSession$Builder=ERROR
log4j.logger.org.apache.spark.sql.internal.SharedState=ERROR
# Silence oversized string representation warnings
log4j.logger.org.apache.spark.util.Utils=ERROR
# Silence wiremock warnings
log4j.logger.wiremock=ERROR
# Silence s3mock INFO
log4j.logger.io.findify.s3mock=WARN
# Silence s3 warning about content length
log4j.logger.com.amazonaws.services.s3.AmazonS3Client=ERROR

105
src/test/scala/com/mozilla/telemetry/mozdata/DatasetTest.scala Normal file

@@ -0,0 +1,105 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package com.mozilla.telemetry.mozdata
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}
import com.amazonaws.services.s3.model.ObjectMetadata
import com.google.protobuf.ByteString.copyFromUtf8
import com.mozilla.telemetry.heka.{Header, RichMessage}
import com.mozilla.telemetry.utils.S3Store
object DatasetTest {
val message = RichMessage(
"1234",
Map(
"bytes" -> copyFromUtf8("foo"),
"string" -> "foo",
"bool" -> true,
"double" -> 4.2,
"integer" -> 42L,
"string-with-int-value" -> "42",
"submission" ->
"""
| {
| "partiallyExtracted" : {
| "alpha" : "1",
| "beta" : "2"
| },
| "gamma": "3"
| }
""".stripMargin,
"extracted.subfield" -> """{"delta": "4"}""",
"extracted.nested.subfield"-> """{"epsilon": "5"}""",
"partiallyExtracted.nested" -> """{"zeta": "6"}"""
),
None
)
val header = Header(message.toByteArray.length)
private val framedMessage = {
val baos = new ByteArrayOutputStream
val bHeader = header.toByteArray
val bMessage = message.toByteArray
// see https://hekad.readthedocs.org/en/latest/message/index.html
baos.write(0x1E)
baos.write(bHeader.length)
baos.write(bHeader, 0, bHeader.length)
baos.write(0x1F)
baos.write(bMessage, 0, bMessage.length)
baos.toByteArray
}
def hekaFile(numRecords: Integer = 42, framedMessage: Array[Byte] = framedMessage): Array[Byte] = {
val ba = new Array[Byte](numRecords*framedMessage.length)
for (i <- 0 until numRecords) {
System.arraycopy(framedMessage, 0, ba, i*framedMessage.length, framedMessage.length)
}
ba
}
private val client = S3Store.s3
private val rddBucket = "net-mozaws-prod-us-west-2-pipeline-metadata"
def beforeAll(): Unit = {
client.setEndpoint("http://127.0.0.1:8001")
client.createBucket(rddBucket)
client.putObject(
rddBucket,
"sources.json",
s"""
|{
| "test": {
| "prefix": "test",
| "metadata_prefix": "test",
| "bucket": "$rddBucket"
| }
|}
""".stripMargin
)
client.putObject(
rddBucket,
"test/schema.json",
s"""
|{
| "dimensions": [
| { "field_name": "key" }
| ]
|}
""".stripMargin
)
def hekaStream: InputStream = new ByteArrayInputStream(hekaFile())
client.putObject(rddBucket, "test/val1/x", hekaStream, new ObjectMetadata())
client.putObject(rddBucket, "test/val2/x", hekaStream, new ObjectMetadata())
}
def afterAll(): Unit = {
val bucket = client.bucket(rddBucket).get
client.keys(bucket).foreach(x => client.deleteObject(rddBucket, x))
client.deleteBucket(bucket)
}
}

685
src/test/scala/com/mozilla/telemetry/mozdata/MozDataTest.scala Normal file

@@ -0,0 +1,685 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package com.mozilla.telemetry.mozdata
import com.github.tomakehurst.wiremock.WireMockServer
import com.github.tomakehurst.wiremock.client.WireMock
import com.github.tomakehurst.wiremock.client.WireMock.{aResponse,equalToJson,post,postRequestedFor,stubFor,urlMatching,verify}
import com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig
import com.holdenkarau.spark.testing.DataFrameSuiteBase
import com.holdenkarau.spark.testing.Utils.createTempDir
import org.json4s.jackson.Serialization.writePretty
import org.json4s.{DefaultFormats,Formats,JArray,JField,JNothing,JObject,JValue}
import org.apache.spark.sql.Row
import org.scalatest.{FlatSpec, Matchers}
import io.findify.s3mock.S3Mock
class MozDataTest extends FlatSpec with Matchers with DataFrameSuiteBase {
private val s3 = S3Mock(port = 8001)
private val wireMockServer = new WireMockServer(wireMockConfig().port(9876))
private val tempDir = createTempDir().toPath.toString
private val adHocTablesDir = s"$tempDir/ad_hoc_tables"
private val globalTablesDir = s"$tempDir/global_tables"
lazy val api: MozData = MozData(
spark=spark,
adHocTablesDir=adHocTablesDir,
globalTablesDir=globalTablesDir,
readConfig={r=>r.format("json")},
writeConfig={w=>w.format("json")},
telemetryUrl=Some("http://localhost:9876")
)
override def beforeAll: Unit = {
// start wire mock for telemetry
wireMockServer.start()
WireMock.configureFor("localhost", 9876)
stubFor(post(urlMatching("^/submit/mozdata/event/1/([a-f0-9-]{36})$"))
.willReturn(aResponse().withStatus(200)))
// start s3Mock
s3.start
// send s3 requests to s3Mock for read_rdd and list_rdds
DatasetTest.beforeAll()
// set up spark
super.beforeAll()
}
override def afterAll(): Unit = {
super.afterAll()
DatasetTest.afterAll()
s3.shutdown
WireMock.reset()
wireMockServer.stop()
}
def verifyTelemetry(count: Int, action: String, event: Map[String,String] = Map()): Unit = {
implicit val formats: Formats = DefaultFormats
verify(count, postRequestedFor(
urlMatching(s"/submit/mozdata/event/1/([a-f0-9-]{36})")
).withRequestBody(equalToJson(writePretty(Map("apiVersion" -> api.apiVersion, "apiCall" -> action)++event))))
}
"telemetry" must "not send when disabled" in {
MozData(spark).sql("SELECT 1")
verify(0, postRequestedFor(urlMatching(".*")))
}
"listRDDs" must "list rdds" in {
api.listRDDs should be (Array(Map[String,String](
"name" -> "test",
"prefix" -> "test",
"metadata_prefix" -> "test",
"bucket" -> "net-mozaws-prod-us-west-2-pipeline-metadata"
)))
verifyTelemetry(1, "listRDDs", Map(
"sourcesJson" -> s"s3://net-mozaws-prod-us-west-2-pipeline-metadata/sources.json"
))
}
"listTables" must "list global tables" in {
// make sure tables don't exist
api.listTables().sorted should be (List())
// create temporary tables
import spark.implicits._
List(1).toDF.createOrReplaceTempView("table1")
List(2).toDF.createOrReplaceTempView("table2")
api.listTables().sorted should be (List("table1", "table2"))
verifyTelemetry(2, "listTables", Map())
}
it must "list ad hoc tables" in {
val owner = "listTables"
// make sure tables don't exist
Hadoop.rm(s"$adHocTablesDir/$owner")
api.listTables(owner=Some(owner)).sorted should be (Array())
// create tables
import spark.implicits._
api.writeTable(
df=List[String]().toDF,
tableName="table1",
owner=Some(owner)
)
api.writeTable(
df=List[String]().toDF,
tableName="table2",
owner=Some(owner)
)
api.listTables(owner=Some(owner))
.sorted should be (Array("table1", "table2"))
verifyTelemetry(2, "listTables", Map(
"owner" -> owner,
"adHocTablesDir" -> adHocTablesDir
))
}
"readRDD" must "read rdd" in {
def sortJValue(in: JValue): JValue = {
in match {
case JObject(fields) => JObject(fields
.map{v=>JField(v._1,sortJValue(v._2))}
.sortBy(_._1)
)
case JArray(elements) => JArray(elements.map{v=>sortJValue(v)})
case _ => in
}
}
val messages = api.readRDD(
name="test"
).map{ message =>
implicit val formats: Formats = DefaultFormats
writePretty(sortJValue(message.toJValue.getOrElse(JNothing)))
}
messages.collect.foreach{ message =>
message should be (
"""{
| "extracted" : {
| "nested" : {
| "subfield" : {
| "epsilon" : "5"
| }
| },
| "subfield" : {
| "delta" : "4"
| }
| },
| "gamma" : "3",
| "meta" : {
| "Timestamp" : 0,
| "bool" : true,
| "bytes" : "foo",
| "double" : 4.2,
| "integer" : 42,
| "string" : "foo",
| "string-with-int-value" : "42"
| },
| "partiallyExtracted" : {
| "alpha" : "1",
| "beta" : "2",
| "nested" : {
| "zeta" : "6"
| }
| }
|}""".stripMargin
)
}
messages.count should be (84)
api.readRDD("test", where={w=>w.where("key"){case "val1" => true}}).count should be (42)
api.readRDD("test", where={w=>w.where("key"){case "val2" => true}}).count should be (42)
api.readRDD("test", fileLimit=Some(1)).count should be (42)
api.readRDD("test", minPartitions=Some(2)).partitions.length should be (2)
verifyTelemetry(5, "readRDD", Map("name" -> "test"))
}
"readTable" must "read latest ad hoc table" in {
val (owner, tableName) = ("read_table", "read_table")
var version = "v0"
def uri: String = s"$adHocTablesDir/$owner/$tableName/$version"
Hadoop.write(uri, version)
version = "v1"
Hadoop.write(uri, version)
api.readTable(
tableName=tableName,
owner=Some(owner),
extraReadConfig={r=>r.format("csv")}
).collect() should be (List(Row(Hadoop.read(uri))))
verifyTelemetry(1, "readTable", Map(
"detectedUri" -> uri,
"detectedVersion" -> version,
"owner" -> owner,
"tableName" -> tableName
))
}
it must "read versioned ad hoc table" in {
val (owner, tableName, version) = ("read_table", "read_table", "v0")
val uri = s"$adHocTablesDir/$owner/$tableName/$version"
Hadoop.write(uri, version)
api.readTable(
tableName=tableName,
owner=Some(owner),
version=Some(version),
extraReadConfig={r=>r.format("csv")}
).collect() should be (List(Row(version)))
verifyTelemetry(1, "readTable", Map(
"detectedUri" -> uri,
"detectedVersion" -> version,
"owner" -> owner,
"tableName" -> tableName,
"version" -> version
))
}
it must "read latest undefined global table" in {
val tableName = "read_table"
var version = "v0"
def uri: String = s"$globalTablesDir/$tableName/$version"
Hadoop.write(uri, version)
version = "v1"
Hadoop.write(uri, version)
api.readTable(
tableName,
extraReadConfig={r=>r.format("csv")}
).collect() should be (List(Row(version)))
verifyTelemetry(1, "readTable", Map(
"detectedUri" -> uri,
"detectedVersion" -> version,
"tableName" -> tableName
))
}
it must "read defined global table" in {
val tableName = "read_table"
spark.sql(s"CREATE OR REPLACE TEMPORARY VIEW `$tableName` AS SELECT 1")
api.readTable(tableName).collect() should be (List(Row(1)))
verifyTelemetry(1, "readTable", Map(
"tableName" -> tableName,
"sqlTableName" -> tableName
))
}
"sql" must "run query" in {
val query = "SELECT 0"
api.sql(query).collect() should be (Array(Row(0)))
verifyTelemetry(1, "sql", Map("query" -> query))
}
"writeTable" must "write ad_hoc table partitions" in {
import spark.implicits._
val (owner, tableName) = ("write_table_partition", "write_table_partition")
var version = "v1"
def uri: String = s"$adHocTablesDir/$owner/$tableName/$version"
// make sure tables don't exist
Hadoop.rm(uri.dropRight(3))
// write new table
api.writeTable(
df=List("a").toDF("0"),
tableName=tableName,
partitionValues=List("1" -> "b", "2" -> "c"),
owner=Some(owner)
)
spark.read.json(uri).collect() should be (List(Row("a", "b", "c")))
verifyTelemetry(1, "writeTable", Map(
"detectedUri" -> s"$uri/1=b/2=c",
"detectedVersion" -> version,
"owner" -> owner,
"partition" -> "1=b/2=c",
"tableName" -> tableName
))
// write new version
version = "v2"
api.writeTable(
df=List(("d","f")).toDF("0", "2"),
tableName=tableName,
partitionValues=List("1" -> "e"),
version=Some(version),
owner=Some(owner),
extraWriteConfig={w=>w.partitionBy("2")}
)
spark.read.json(uri).collect() should be (List(Row("d", "e", "f")))
verifyTelemetry(1, "writeTable", Map(
"detectedUri" -> s"$uri/1=e",
"detectedVersion" -> version,
"owner" -> owner,
"partition" -> "1=e",
"tableName" -> tableName,
"version" -> version
))
// append to latest
api.writeTable(
df=List(("g","f")).toDF("0", "2"),
tableName=tableName,
partitionValues=List("1" -> "e"),
owner=Some(owner),
extraWriteConfig={w=>w.mode("append").partitionBy("2")}
)
// add new dynamic partition to latest
api.writeTable(
df=List(("h","j")).toDF("0", "2"),
tableName=tableName,
partitionValues=List("1" -> "i"),
owner=Some(owner),
extraWriteConfig={w=>w.partitionBy("2")}
)
// add new static partition to latest
api.writeTable(
df=List("k").toDF("0"),
tableName=tableName,
partitionValues=List("1" -> "l", "2" -> "m"),
owner=Some(owner)
)
spark.read.json(uri).orderBy("0").collect() should be (List(
Row("d","e","f"),
Row("g","e","f"),
Row("h","i","j"),
Row("k","l","m")
))
verifyTelemetry(1, "writeTable", Map(
"detectedUri" -> s"$uri/1=e",
"detectedVersion" -> version,
"owner" -> owner,
"partition" -> "1=e",
"tableName" -> tableName
))
verifyTelemetry(1, "writeTable", Map(
"detectedUri" -> s"$uri/1=i",
"detectedVersion" -> version,
"owner" -> owner,
"partition" -> "1=i",
"tableName" -> tableName
))
verifyTelemetry(1, "writeTable", Map(
"detectedUri" -> s"$uri/1=l/2=m",
"detectedVersion" -> version,
"owner" -> owner,
"partition" -> "1=l/2=m",
"tableName" -> tableName
))
}
it must "write partition uris" in {
import spark.implicits._
val tableName = "write_table_partition"
val uri = s"$adHocTablesDir/uri/$tableName"
// make sure table doesn't exist
Hadoop.rm(uri)
// write only uri
api.writeTable(
df=List("a").toDF("0"),
tableName=tableName,
uri=Some(s"$uri/1=b/2=c")
)
// write mixed uri & partition spec
api.writeTable(
df=List("d").toDF("0"),
tableName=tableName,
partitionValues=List("2" -> "f"),
uri=Some(s"$uri/1=e")
)
// write all partitions in partition spec
api.writeTable(
df=List("g").toDF("0"),
tableName=tableName,
partitionValues=List("1" -> "h", "2" -> "i"),
uri=Some(uri)
)
spark.read.json(uri).orderBy("0").collect() should be (List(
Row("a","b","c"),
Row("d","e","f"),
Row("g","h","i")))
verifyTelemetry(1, "writeTable", Map(
"detectedUri" -> s"$uri/1=b/2=c",
"tableName" -> tableName,
"uri" -> s"$uri/1=b/2=c"
))
verifyTelemetry(1, "writeTable", Map(
"detectedUri" -> s"$uri/1=e/2=f",
"partition" -> "2=f",
"tableName" -> tableName,
"uri" -> s"$uri/1=e"
))
verifyTelemetry(1, "writeTable", Map(
"detectedUri" -> s"$uri/1=h/2=i",
"partition" -> "1=h/2=i",
"tableName" -> tableName,
"uri" -> uri
))
}
it must "write undefined global table partition" in {
import spark.implicits._
val (tableName, version) = ("write_table_partition", "v1")
val uri = s"$globalTablesDir/$tableName/$version"
// make sure table doesn't exist
Hadoop.rm(uri)
// write
api.writeTable(
df=List("a").toDF,
tableName=tableName,
partitionValues=List("b" -> "b"),
version=Some(version)
)
spark.read.json(uri).collect() should be (List(Row("a", "b")))
verifyTelemetry(1, "writeTable", Map(
"detectedUri" -> s"$uri/b=b",
"detectedVersion" -> version,
"partition" -> "b=b",
"tableName" -> tableName,
"version" -> version
))
}
it must "write and overwrite defined global table partitions" in {
import spark.implicits._
val (tableName, version) = ("write_table_partition", "v3")
val uri = s"$globalTablesDir/$tableName/$version"
// create table in catalog
spark.sql(
s"""
|CREATE EXTERNAL TABLE `${tableName}_$version`(`0` int)
|PARTITIONED BY (`1` string, `2` string)
|STORED AS TEXTFILE
|LOCATION '$uri'
""".stripMargin)
// make sure table doesn't exist
Hadoop.rm(uri)
// write
api.writeTable(
df=List(0).toDF("0"),
tableName=tableName,
partitionValues=List("1" -> "a", "2" -> "b"),
version=Some(version),
extraWriteConfig={w=>w.format("csv")}
)
spark.sql(s"SELECT * FROM `${tableName}_$version`").collect() should be (List(Row(0, "a", "b")))
verifyTelemetry(1, "writeTable", Map(
"detectedUri" -> s"file:$uri/1=a/2=b",
"detectedVersion" -> version,
"partition" -> "1=a/2=b",
"sqlTableName" -> s"${tableName}_$version",
"tableName" -> tableName,
"version" -> version
))
// overwrite
api.writeTable(
df=List((1,"b")).toDF("0", "2"),
tableName=tableName,
partitionValues=List("1" -> "a"),
version=Some(version),
extraWriteConfig={w=>w.format("csv").mode("overwrite").partitionBy("2")}
)
spark.sql(s"SELECT * FROM `${tableName}_$version`").collect() should be (List(Row(1, "a", "b")))
verifyTelemetry(1, "writeTable", Map(
"detectedUri" -> s"file:$uri/1=a",
"detectedVersion" -> version,
"partition" -> "1=a",
"tableName" -> tableName,
"sqlTableName" -> s"${tableName}_$version",
"version" -> version
))
}
it must "write ad_hoc tables" in {
import spark.implicits._
val (owner, tableName) = ("write_table", "write_table")
var version = "v1"
def uri: String = s"$adHocTablesDir/$owner/$tableName/$version"
// make sure tables don't exist
Hadoop.rm(uri.dropRight(3))
// write new table
api.writeTable(
df=List(("a", "b")).toDF,
tableName=tableName,
owner=Some(owner)
)
spark.read.json(uri).collect() should be (Array(Row("a", "b")))
verifyTelemetry(1, "writeTable", Map(
"detectedUri" -> uri,
"detectedVersion" -> version,
"owner" -> owner,
"tableName" -> tableName
))
// write new version
version = "v2"
api.writeTable(
df=List(("c", "d")).toDF,
tableName=tableName,
owner=Some(owner),
version=Some(version)
)
spark.read.json(uri).collect() should be (Array(Row("c", "d")))
// append to latest
api.writeTable(
df=List(("e", "f")).toDF,
tableName=tableName,
owner=Some(owner),
extraWriteConfig={w=>w.mode("append")}
)
spark.read.json(uri).orderBy("_1").collect() should be (List(Row("c","d"),Row("e","f")))
verifyTelemetry(1, "writeTable", Map(
"detectedUri" -> uri,
"detectedVersion" -> version,
"owner" -> owner,
"tableName" -> tableName,
"version" -> version
))
verifyTelemetry(1, "writeTable", Map(
"detectedUri" -> uri,
"detectedVersion" -> version,
"owner" -> owner,
"tableName" -> tableName
))
}
it must "write table uri" in {
import spark.implicits._
val (tableName, version) = ("write_table", "v1")
val uri = s"$adHocTablesDir/uri/$tableName/$version"
// make sure table doesn't exist
Hadoop.rm(uri)
// write
api.writeTable(
df=List(("a", "b")).toDF,
tableName=tableName,
uri=Some(uri)
)
spark.read.json(uri).collect() should be (Array(Row("a", "b")))
verifyTelemetry(1, "writeTable", Map(
"detectedUri" -> uri,
"detectedVersion" -> version,
"tableName" -> tableName,
"uri" -> uri
))
}
it must "write undefined global table" in {
import spark.implicits._
val (tableName, version) = ("write_table", "v1")
val uri = s"$globalTablesDir/$tableName/$version"
// make sure table doesn't exist
Hadoop.rm(uri)
// write
api.writeTable(
df=List(("a", "b")).toDF,
tableName=tableName,
version=Some(version)
)
spark.read.json(uri).collect() should be (Array(Row("a", "b")))
verifyTelemetry(1, "writeTable", Map(
"detectedUri" -> uri,
"detectedVersion" -> version,
"tableName" -> tableName,
"version" -> version
))
}
it must "write defined global table" in {
import spark.implicits._
val (tableName, version) = ("write_table", "v2")
val uri = s"$globalTablesDir/$tableName/$version"
// create table in catalog
spark.sql(
s"""
|CREATE EXTERNAL TABLE `${tableName}_$version`(`0` int)
|STORED AS TEXTFILE
|LOCATION '$uri'
""".stripMargin)
// make sure table doesn't exist
Hadoop.rm(uri)
// write
api.writeTable(
df=List(0).toDF("0"),
tableName=tableName,
version=Some(version),
// must disable "sql_repair" update for unpartitioned table
metadataUpdateMethods=List("sql_refresh"),
extraWriteConfig={w=>w.format("csv")}
)
spark.sql(s"SELECT * FROM `${tableName}_$version`").collect() should be (List(Row(0)))
verifyTelemetry(1, "writeTable", Map(
"detectedUri" -> s"file:$uri",
"detectedVersion" -> version,
"sqlTableName" -> s"${tableName}_$version",
"tableName" -> tableName,
"version" -> version
))
}
it must "write and overwrite defined partitioned global table" in {
import spark.implicits._
val (tableName, version) = ("write_table", "v3")
val uri = s"$globalTablesDir/$tableName/$version"
// create table in catalog
spark.sql(
s"""
|CREATE EXTERNAL TABLE `${tableName}_$version`(`0` int)
|PARTITIONED BY (`1` string)
|STORED AS TEXTFILE
|LOCATION '$uri'
""".stripMargin)
// make sure table doesn't exist
Hadoop.rm(uri)
// write
api.writeTable(
df=List((0, "a")).toDF("0", "1"),
tableName=tableName,
version=Some(version),
extraWriteConfig={w=>w.format("csv").partitionBy("1")}
)
spark.sql(s"SELECT * FROM `${tableName}_$version`").collect() should be (List(Row(0, "a")))
// NOTE: metadata update is currently unable to drop partitions
// and partitions deleted by rewrite will cause read failures.
// overwrite
api.writeTable(
df=List((1, "a")).toDF("0", "1"),
tableName=tableName,
version=Some(version),
extraWriteConfig={w=>w.format("csv").mode("overwrite").partitionBy("1")}
)
spark.sql(s"SELECT * FROM `${tableName}_$version`").collect() should be (List(Row(1, "a")))
verifyTelemetry(2, "writeTable", Map(
"detectedUri" -> s"file:$uri",
"detectedVersion" -> version,
"sqlTableName" -> s"${tableName}_$version",
"tableName" -> tableName,
"version" -> version
))
}
it must "throw an exception on missing version" in {
val thrown = intercept[Exception] {
api.writeTable(
df=spark.sql("SELECT 1"),
tableName="view"
)
}
assert(thrown.getMessage === "requirement failed: version required to write global table")
}
it must "throw an exception on internal table" in {
spark.sql("CREATE TABLE table_v1 AS SELECT 0")
val thrown = intercept[IllegalArgumentException] {
api.writeTable(
df=spark.sql("SELECT 1"),
tableName="table",
version=Some("v1")
)
}
assert(thrown.getMessage === "requirement failed: table is not external: table_v1")
}
it must "throw an exception on view" in {
spark.sql("CREATE OR REPLACE TEMP VIEW view_v1 AS SELECT 0")
val thrown = intercept[IllegalArgumentException] {
api.writeTable(
df=spark.sql("SELECT 1"),
tableName="view",
version=Some("v1"),
partitionValues=List(("a","b"))
)
}
assert(thrown.getMessage === "requirement failed: table is not external: view_v1")
}
it must "throw an exception on invalid metadata update method" in {
spark.sql(
s"""CREATE EXTERNAL TABLE invalid_metadata_v1(`1` int)
|PARTITIONED BY (a string)
|STORED AS TEXTFILE
|LOCATION '$globalTablesDir/invalid_metadata/v1'
|""".stripMargin
)
val thrown = intercept[IllegalArgumentException] {
api.writeTable(
df=spark.sql("SELECT 1"),
tableName="invalid_metadata",
version=Some("v1"),
partitionValues=List(("a", "b")),
extraWriteConfig={w=>w.format("csv")},
metadataUpdateMethods=List("method")
)
}
assert(thrown.getMessage === "Unsupported metadata location: method")
}
}