# Using the Spark CDM Connector

***Limited preview release*** <br/>
Guide last updated, Oct 27, 2020

**NOTE: This latest version of the doc applies to the 0.18.1 Public Preview release of the Spark CDM Connector.**

**NOTE: From the 0.16 version onward, several of the connector options were simplified. Code written with earlier versions of the connector may need to be modified to use these revised options.**

## Overview

The Spark CDM Connector enables a Spark program to read and write CDM entities in a CDM folder via Spark dataframes. This preview release has been tested with, and is supported only with, Apache Spark in Azure Synapse and Azure Databricks.

**During this limited preview, use of the Spark CDM Connector in production applications is not recommended or supported.**

**The connector capabilities and API may be changed without notice.**

For information on defining CDM documents using CDM 1.0, see
[https://docs.microsoft.com/en-us/common-data-model/](https://docs.microsoft.com/en-us/common-data-model/).

## Installing the Spark CDM connector

**Apache Spark for Azure Synapse:** the Spark CDM Connector is pre-installed and requires no additional installation.

Note that there may be a delay before the latest version of the connector is available in Synapse. Use the API below to retrieve the current version of the Spark CDM Connector and compare with the [release notes](https://github.com/Azure/spark-cdm-connector/releases) in GitHub.

```scala
com.microsoft.cdm.BuildInfo.version
```

**NOTE: The Spark CDM Connector does not yet support Spark 3.0.**

**Samples:** Once installed, sample code and CDM models are available in [GitHub](https://github.com/Azure/spark-cdm-connector/tree/master/samples).

## Scenarios

### Supported scenarios

The following capabilities or limitations apply:
- Supports reading and writing to CDM folders in ADLS gen2 **with HNS enabled**.
- Supports reading from CDM folders described by either manifest or model.json files.
- Supports writing to CDM folders described by a manifest file. Write support for model.json files is not supported.
- Supports data in CSV format, with or without column headers, and with a user-selectable delimiter character.
- Supports data in Apache Parquet format, including nested parquet.
- Supports sub-manifests on read, and optional use of entity-scoped sub-manifests on write.
- Supports writing data using user-modifiable partition patterns.
- Supports use of managed identity (Synapse), user identity (Azure Databricks), and credentials.
- Supports resolving CDM alias locations used in imports using CDM adapter definitions described in a config.json file.

See also the _Known issues_ section at the end of this document.


When reading data, the connector uses metadata in the CDM folder to create the dataframe based on the resolved entity definition for the specified entity, as referenced in the manifest. Entity attribute names are used as dataframe column names and attribute datatypes are mapped to the column datatypes. When the dataframe is loaded, it is populated from the entity partitions identified in the manifest.

The connector looks in the specified manifest and any first-level sub-manifests for the specified entity. If the required entity is in a second-level or lower sub-manifest, or if there are multiple entities of the same name in different sub-manifests, you should specify the sub-manifest that contains the required entity rather than the root manifest.

Entity partitions can be in a mix of formats, for example, a mix of CSV and parquet files. All the entity data files identified in the manifest are combined into one dataset regardless of format and loaded to the dataframe.

When reading CSV data, the connector uses the Spark FAILFAST [option](https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrameReader.html#csv-scala.collection.Seq-). It will throw an exception if the number of columns in the data does not equal the number of attributes in the entity.

### Writing Data

When writing to a CDM folder, if the entity does not already exist in the CDM folder, a new entity and definition is created, added to the CDM folder, and referenced in the manifest. Two writing modes are supported:

**Explicit write**: the physical entity definition is based on a logical CDM entity definition that you specify.

- The specified logical entity definition is read and resolved to create the physical entity definition used in the CDM folder. If import statements in any directly or indirectly referenced CDM definition file include aliases, then a config.json file that maps these aliases to CDM adapters and storage locations must be provided. For more on the use of aliases, see _Aliases and adapter configuration_ below.
- If the dataframe schema does not match the referenced entity definition, an error is returned. Ensure that the column datatypes in the dataframe match the attribute datatypes in the entity, including, for decimal data, the precision and scale set via traits in CDM (see the sketch after this list).
- If the entity does not exist in the CDM folder, the resolved entity definition is written to the manifest in the CDM folder, data is written, and the partition information in the manifest is updated.
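
As a minimal sketch of the datatype-matching requirement above: if the logical entity definition declares a decimal attribute with, say, precision 18 and scale 4, the dataframe column must use the same precision and scale. The entity and column names below are hypothetical.

```scala
import org.apache.spark.sql.types._

// The decimal column must match the precision and scale declared on the
// CDM attribute (here assumed to be decimal(18, 4)), or the write fails.
val salesSchema = StructType(List(
  StructField("salesId", StringType, true),
  StructField("amount", DecimalType(18, 4), true)))
```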

**Implicit write**: the entity definition is derived from the dataframe structure.

- If the entity does not exist in the CDM folder, the implicit definition is used to create the resolved entity definition in the target CDM folder.
- If the entity exists in the CDM folder, the implicit definition is validated against the existing entity definition. If the definitions do not match, an error is returned; otherwise data is written and a derived logical entity definition is written into a sub-folder of the entity folder.

Data is written to data folder(s) within an entity sub-folder. A save mode determines whether the new data overwrites or is appended to existing data, or whether an error is returned if data already exists. The default is to return an error if data already exists.

### Aliases and adapter configuration

CDM definition files use aliases in import statements as a means to simplify the import statement and to allow the location of the imported content to be late bound at execution time. Using aliases also makes it easy to organize CDM files so that related CDM definitions can be grouped together at different locations, and allows CDM content to be accessed from different deployed locations at runtime. The snippet below shows the use of aliases in import statements in a CDM definition file.

```json
"imports": [
    {
        "corpusPath": "cdm:/foundations.cdm.json"
    }
]
```

In the example above, 'cdm' is used as an alias for the location of the CDM foundations file, and 'core' is used as an alias for the location of the TrackedEntity definition file.

Aliases are text labels that are matched to a namespace value in an adapter entry in a CDM config.json file. An adapter entry specifies the adapter type (for example, "adls", "CDN", "GitHub", or "local") and a URL that defines a location. Some adapters support other configuration options, such as a connection timeout. While aliases are arbitrary text labels, the 'cdm' alias is treated in a special manner, as described below.

The connector library name, options and save mode are formatted as follows:

- dataframe.write.format("com.microsoft.cdm") [.option("option", "value")]* .mode(SaveMode.\<saveMode\>)

Here's an example of how the connector is used for read, showing some of the options. More examples are provided later.

```scala
val readDf = spark.read.format("com.microsoft.cdm")
  .option("storage", "mystorageaccount.dfs.core.windows.net")
  .option("entity", "Customer")
  .load()
```

#### Token-based access control

In Synapse, the Spark CDM Connector supports use of [Managed identities for Azure resources](https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/overview) to mediate access to the Azure Data Lake Storage account containing the CDM folder. A managed identity is [automatically created for every Synapse workspace](https://docs.microsoft.com/en-us/azure/synapse-analytics/security/synapse-workspace-managed-identity). The connector uses the managed identity of the workspace that contains the notebook in which the connector is called to authenticate to the storage accounts being addressed.

In Azure Databricks, you can enable [Azure Active Directory credential passthrough](https://docs.microsoft.com/en-us/azure/databricks/security/credential-passthrough/adls-passthrough). With this enabled, the Spark CDM Connector will authenticate using the same Azure Active Directory identity that was used to log into Azure Databricks. Credential passthrough is enabled on the cluster and requires an Azure Databricks Premium plan.
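
With either mechanism, no credential options are passed to the connector; it authenticates with the ambient identity. A minimal sketch (the storage account, manifest path, and entity name are hypothetical placeholders, and the option names are assumed to match those used elsewhere in this guide):

```scala
// No appId/appKey/tenantId options: the Synapse managed identity or the
// Databricks passthrough identity is used to access the storage account.
val df = spark.read.format("com.microsoft.cdm")
  .option("storage", "mystorageaccount.dfs.core.windows.net")
  .option("manifestPath", "cdmdata/contacts/root.manifest.cdm.json")
  .option("entity", "Person")
  .load()
```
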
#### Credential-based access control options

As an alternative to using a managed identity or a user identity, explicit credentials can be provided to enable the Spark CDM Connector to access data. In Azure Active Directory, [create an App Registration](https://docs.microsoft.com/en-us/azure/active-directory/develop/quickstart-register-app) and then grant this App Registration access to the storage account using either of the following roles: **Storage Blob Data Contributor** to allow the library to write to CDM folders, or **Storage Blob Data Reader** to allow only read.

Once permissions are created, you can pass the app id, app key, and tenant id to the connector on each call to it using the options below. It is recommended to use Azure Key Vault to secure these values and ensure they are not stored in clear text in your notebook file. In Azure Databricks, [create a secret scope which can be backed by Azure Key Vault](https://docs.microsoft.com/en-us/azure/databricks/security/secrets/secret-scopes#create-an-azurekey-vault-backed-secret-scope).
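
A minimal sketch of this pattern in Azure Databricks, assuming the credential option names appId, appKey, and tenantId (matching the variables used in the examples later in this guide) and a hypothetical Key Vault-backed secret scope and secret names:

```scala
// Read the app registration credentials from a secret scope rather than
// hard-coding them in the notebook.
val appId = dbutils.secrets.get(scope = "cdm-secrets", key = "app-id")
val appKey = dbutils.secrets.get(scope = "cdm-secrets", key = "app-key")
val tenantId = dbutils.secrets.get(scope = "cdm-secrets", key = "tenant-id")

// Pass the credentials as options on each call to the connector.
// Storage account, manifest path, and entity name are placeholders.
val df = spark.read.format("com.microsoft.cdm")
  .option("storage", "mystorageaccount.dfs.core.windows.net")
  .option("manifestPath", "cdmdata/contacts/root.manifest.cdm.json")
  .option("entity", "Person")
  .option("appId", appId)
  .option("appKey", appKey)
  .option("tenantId", tenantId)
  .load()
```
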
#### Explicit write options

The following options identify the logical entity definition that defines the entity being written. The logical entity definition will be resolved to a physical definition that defines how the entity will be written.

|**Option** |**Description** |**Pattern / example usage** |
|---------|---------|:---------:|

#### Folder structure and data format options

Folder organization and file format can be changed with the following options.

|**Option** |**Description** |**Pattern / example usage** |
|---------|---------|:---------:|

The following examples all use appId, appKey and tenantId variables initialized earlier in the code based on an Azure app registration that has been given Storage Blob Data Contributor permissions on the storage for write and Storage Blob Data Reader permissions for read.

#### Read

This code reads the Person entity from the CDM folder with the manifest at mystorage.dfs.core.windows.net/cdmdata/contacts/root.manifest.cdm.json.

```scala
val df = spark.read.format("com.microsoft.cdm")
  .option("storage", "mystorage.dfs.core.windows.net")
  .option("manifestPath", "cdmdata/contacts/root.manifest.cdm.json")
  .option("entity", "Person")
  .load()
```

#### Implicit Write – using dataframe schema only

This code writes the dataframe _df_ to a CDM folder with a manifest at mystorage.dfs.core.windows.net/cdmdata/Contacts/default.manifest.cdm.json with an Event entity.

```scala
df.write.format("com.microsoft.cdm")
  .option("storage", "mystorage.dfs.core.windows.net")
  .option("manifestPath", "cdmdata/Contacts/default.manifest.cdm.json")
  .option("entity", "Event")
  .mode(SaveMode.Append)
  .save()
```

#### Explicit Write - using an entity definition stored in ADLS

This code writes the dataframe _df_ to a CDM folder with manifest at

```scala
df.write.format("com.microsoft.cdm")
  .mode(SaveMode.Overwrite)
  .save()
```

#### Explicit Write - using an entity defined in the CDM GitHub

This code writes the dataframe _df_ to a CDM folder with the manifest at [https://mystorage.dfs.core.windows.net/cdmdata/Teams/root.manifest.cdm.json](https://mystorage.dfs.core.windows.net/cdmdata/Teams/root.manifest.cdm.json) and a sub-manifest containing the TeamMembership entity, created in a TeamMembership subdirectory. TeamMembership data is written to CSV files (the default) that overwrite any existing data files. The TeamMembership entity definition is retrieved from the CDM CDN, at:

#### Handling CDM Time data

Spark does not support an explicit Time datatype. An attribute with the CDM _Time_ datatype is represented in a Spark dataframe as a column with a Timestamp datatype. When a time value is read, the timestamp in the dataframe is initialized with the Spark epoch date 01/01/1970 plus the time value as read from the source.

When using explicit write, a timestamp column can be mapped to either a DateTime or Time attribute. If a timestamp is mapped to a Time attribute, the date portion of the timestamp is stripped off.

When using implicit write, a Timestamp column is mapped by default to a DateTime attribute. To map a timestamp column to a Time attribute, you must add a metadata object to the column in the dataframe that indicates that the timestamp should be interpreted as a time value. The code below shows how this is done in Scala.

```scala
import org.apache.spark.sql.types._

// Mark the timestamp column so that it is interpreted as a CDM Time value.
val md = new MetadataBuilder().putString("dataType", "Time")
val schema = StructType(List(
  StructField("ATimeColumn", TimestampType, true, md)))
```

#### Time value accuracy

By default, data files are written into folders created for the current date.
Data file names are based on the following pattern: \<entity\>-\<jobid\>-*.\<fileformat\>.

The number of data partitions written can be controlled using the sparkContext.parallelize() method. The number of partitions is either determined by the number of executors in the Spark cluster or can be specified explicitly. The Scala example below creates a dataframe with two partitions.

```scala
val df = spark.createDataFrame(spark.sparkContext.parallelize(data, 2), schema)
```

**Explicit Write** (defined by a referenced entity definition):

```
+-- <CDMFolder>
    |-- default.manifest.cdm.json << with entity ref and partition info
    |-- <data folder>
    +-- ...
```

**Explicit Write with sub-manifest:**

```
+-- <CDMFolder>
    |-- default.manifest.cdm.json << contains reference to sub-manifest
    |-- <data folder>
    +-- ...
```

**Implicit (entity definition is derived from dataframe schema):**

```
+-- <CDMFolder>
    |-- default.manifest.cdm.json
    |-- <data folder>
    +-- ...
```

**Implicit Write with sub-manifest:**

```
+-- <CDMFolder>
    |-- default.manifest.cdm.json << contains reference to sub-manifest
    |    +-- <entity>.cdm.json << logical entity definition(s)
    |-- <data folder>
    +-- ...
```

## Troubleshooting and Known issues

- _cdm_ alias resolution
- _cdmSource_ option

## Not yet supported

The following features are not yet supported:

See https://github.com/Azure/spark-cdm-connector/tree/master/samples for sample code and CDM files.

## Changes to this doc

|**Date** |**Change**|
|------ |---------|
|5/4/20 | Clarified that Overwrite and Append save modes do not allow schema change <br/> Clarified in capabilities summary that partition patterns are supported on read but not write|
|9/10/20|Noted that the sub-manifest containing the source entity must be explicitly specified on read if the entity is in a second or lower level manifest or if the source entity exists in multiple sub-manifests|
|9/12/20|Noted that Spark 3.0 is not yet supported.|
|9/29/20|Noted that the default for the cdmSource option is referenced.<br/> Listed Spark to CDM datatype mappings|
|10/27/20|Updated the guide to reflect that release 0.18.1 is the public preview release; noted that the connector uses the Spark FAILFAST option on read.|