Mirror of https://github.com/Azure/aztk.git

Feature/wasb (#47)

* update configuration.cfg to secrets.cfg
* fix node count pretty print
* add more unit tests
* configure pytest to only run tests from the 'tests' directory
* add missing space
* remove static call to loading secrets file
* initial commit for automatic wasb support
* wasb native support
* fix for wasb
* bug fix in azure_api
* clean up setup_wasb
* README for wasb
* README updates for wasb
* configure wasb creds in start task environment variables
* add missing merged files
* revert back to working state
* added storage account suffix
* removed zip to secrets
This commit is contained in:

Parent: 4822172b6a
Commit: 577ff296b5

README.md: 16 lines changed
@@ -129,6 +129,22 @@ azb spark cluster ssh \
     --jupyter <local-port>
 ```
 
+### Connect your cluster to Azure Blob Storage (WASB connection)
+
+Pre-built into this package is native support for connecting your Spark cluster to Azure Blob Storage. To do so, make sure that the storage fields in your **secrets.cfg** file are properly filled out.
+
+Even if you are just testing and have no need to connect to Azure Blob Storage, you still need to correctly fill out the storage fields in your **secrets.cfg** file, as they are required by this package.
+
+Once you have correctly filled out **secrets.cfg** with your storage credentials, you will be able to access that storage account from your Spark job.
+
+Please note: if you want to access a different Azure Blob Storage account, you will need to recreate your cluster with an updated **secrets.cfg** file containing the appropriate storage credentials.
+
+Here's an example of how you may access your data in Blob Storage:
+
+``` python
+df = spark.read.csv("wasbs://<STORAGE_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/<BLOB_NAME>")
+```
+
 ### Manage your Spark cluster
 
 You can also see your clusters from the CLI:
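Note: the README text above never shows what the storage fields look like. A minimal sketch of the `[Storage]` section, assuming the option names read by `__load_blob_config` later in this commit (`storageaccountkey`, `storageaccountsuffix`; the account-name key and the exact file layout are assumptions):

```
[Storage]
storageaccountname=<STORAGE_ACCOUNT_NAME>
storageaccountkey=<STORAGE_ACCOUNT_KEY>
storageaccountsuffix=core.windows.net
```

`core.windows.net` is the suffix for the Azure public cloud; sovereign clouds use different suffixes, which is why it is configurable here.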
@@ -9,6 +9,7 @@ global_config = None
 
 batch_client = None
 batch_config = None
+blob_config = None
 blob_client = None
 
 
@@ -23,6 +24,13 @@ class BatchConfig:
         self.account_url = account_url
 
 
+class BlobConfig:
+    def __init__(self, account_key: str, account_name: str, account_suffix: str):
+        self.account_key = account_key
+        self.account_name = account_name
+        self.account_suffix = account_suffix
+
+
 def get_batch_client():
     """
     :returns: the batch client singleton
@@ -34,7 +42,7 @@ def get_batch_client():
 
 
 def get_blob_client():
     """
-    :returns: the batch client singleton
+    :returns: the blob client singleton
     """
     if not blob_client:
         __load_blob_client()
@@ -129,9 +137,18 @@ def __load_batch_client():
         config.account_url)
 
 
-def __load_blob_client():
+def get_blob_config() -> BlobConfig:
+    if not blob_config:
+        __load_blob_config()
+
+    return blob_config
+
+
+def __load_blob_config():
     global_config = config.get()
-    global blob_client
+    global blob_config
 
     if not global_config.has_option('Storage', 'storageaccountkey'):
         raise AzureApiInitError("Storage account key is not set in config")
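Note: `get_blob_config` follows the same lazy module-level singleton pattern already used by `get_batch_client`: the config is loaded on first access and then reused. A minimal standalone sketch of the pattern (names mirror the diff; the config values are placeholders, since the real loader reads secrets.cfg):

```python
blob_config = None


class BlobConfig:
    def __init__(self, account_key: str, account_name: str, account_suffix: str):
        self.account_key = account_key
        self.account_name = account_name
        self.account_suffix = account_suffix


def __load_blob_config():
    # In the real module this parses secrets.cfg; placeholder values here.
    global blob_config
    blob_config = BlobConfig("<key>", "<name>", "core.windows.net")


def get_blob_config() -> BlobConfig:
    if not blob_config:
        __load_blob_config()
    return blob_config


assert get_blob_config() is get_blob_config()  # loaded once, then cached
```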
@@ -148,8 +165,21 @@ def __load_blob_client():
     storage_account_suffix = global_config.get(
         'Storage', 'storageaccountsuffix')
 
+    blob_config = BlobConfig(
+        account_key=storage_account_key,
+        account_name=storage_account_name,
+        account_suffix=storage_account_suffix)
+
+
+def __load_blob_client():
+    global blob_client
+
+    blob_config = get_blob_config()
+
     # create storage client
     blob_client = create_blob_client(
-        storage_account_key,
-        storage_account_name,
-        storage_account_suffix)
+        blob_config.account_key,
+        blob_config.account_name,
+        blob_config.account_suffix)
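Note: `create_blob_client` itself is not shown in this diff. Under the legacy `azure-storage` SDK in use at this point in the project, it plausibly wraps something like the following (a sketch under that assumption, not the actual helper):

```python
from azure.storage.blob import BlockBlobService


def create_blob_client(account_key: str, account_name: str, account_suffix: str):
    # endpoint_suffix turns "core.windows.net" into the full endpoint
    # https://<account_name>.blob.core.windows.net
    return BlockBlobService(
        account_name=account_name,
        account_key=account_key,
        endpoint_suffix=account_suffix)
```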
@@ -48,6 +48,9 @@ def docker_run_cmd() -> str:
     cmd.add_option('-e', 'AZ_BATCH_ACCOUNT_NAME=$AZ_BATCH_ACCOUNT_NAME')
     cmd.add_option('-e', 'BATCH_ACCOUNT_KEY=$BATCH_ACCOUNT_KEY')
     cmd.add_option('-e', 'BATCH_ACCOUNT_URL=$BATCH_ACCOUNT_URL')
+    cmd.add_option('-e', 'STORAGE_ACCOUNT_NAME=$STORAGE_ACCOUNT_NAME')
+    cmd.add_option('-e', 'STORAGE_ACCOUNT_KEY=$STORAGE_ACCOUNT_KEY')
+    cmd.add_option('-e', 'STORAGE_ACCOUNT_SUFFIX=$STORAGE_ACCOUNT_SUFFIX')
    cmd.add_option('-e', 'AZ_BATCH_POOL_ID=$AZ_BATCH_POOL_ID')
     cmd.add_option('-e', 'AZ_BATCH_NODE_ID=$AZ_BATCH_NODE_ID')
     cmd.add_option(
@@ -93,11 +96,18 @@ def generate_cluster_start_task(
 
     # TODO use certificate
     batch_config = azure_api.get_batch_config()
+    blob_config = azure_api.get_blob_config()
     environment_settings = [
         batch_models.EnvironmentSetting(
             name="BATCH_ACCOUNT_KEY", value=batch_config.account_key),
         batch_models.EnvironmentSetting(
             name="BATCH_ACCOUNT_URL", value=batch_config.account_url),
+        batch_models.EnvironmentSetting(
+            name="STORAGE_ACCOUNT_NAME", value=blob_config.account_name),
+        batch_models.EnvironmentSetting(
+            name="STORAGE_ACCOUNT_KEY", value=blob_config.account_key),
+        batch_models.EnvironmentSetting(
+            name="STORAGE_ACCOUNT_SUFFIX", value=blob_config.account_suffix),
         batch_models.EnvironmentSetting(
             name="SPARK_MASTER_UI_PORT", value=spark_master_ui_port),
         batch_models.EnvironmentSetting(
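Note: Azure Batch injects each `EnvironmentSetting` into the start task's environment on every node, which is how the shell snippets further down can read them. For example (variable names from this diff, values hypothetical):

```python
import os

# On a cluster node, the settings registered above surface as
# ordinary environment variables.
storage_account_name = os.environ.get("STORAGE_ACCOUNT_NAME")      # e.g. "mystorageaccount"
storage_account_key = os.environ.get("STORAGE_ACCOUNT_KEY")        # secret; never log this
storage_account_suffix = os.environ.get("STORAGE_ACCOUNT_SUFFIX")  # e.g. "core.windows.net"
```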
@@ -37,6 +37,8 @@ def __create_zip():
     ensure_dir(local_tmp_zipfile)
     zipf = zipfile.ZipFile(local_tmp_zipfile, 'w', zipfile.ZIP_DEFLATED)
     zipdir(os.path.join(root, "node_scripts"), zipf)
+
+    # zipf.write()
     zipf.close()
     logging.debug("Ziped file")
 
@@ -17,6 +17,18 @@ if [ -d "$custom_script_dir" ]; then
 else
     echo "Custom script dir '$custom_script_dir' doesn't exists. Will not run any custom scripts."
 fi
 
+storage_account_name=$STORAGE_ACCOUNT_NAME
+storage_account_key=$STORAGE_ACCOUNT_KEY
+storage_account_suffix=$STORAGE_ACCOUNT_SUFFIX
+
+if [ -n "$STORAGE_ACCOUNT_NAME" ] && [ -n "$STORAGE_ACCOUNT_KEY" ] && [ -n "$STORAGE_ACCOUNT_SUFFIX" ]; then
+    echo "Setting up WASB connection"
+    bash $(dirname $0)/setup_wasb.sh $storage_account_name $storage_account_key $storage_account_suffix
+else
+    echo "Storage credentials not set"
+fi
+
 echo "Starting setup using Docker"
 pip3 install -r $(dirname $0)/requirements.txt
@@ -170,3 +170,5 @@ def start_spark_worker():
         "--webui-port", str(config.spark_worker_ui_port)]
     print("Connecting to master with '{0}'".format(" ".join(cmd)))
     call(cmd)
+
+
@@ -0,0 +1,40 @@
+#!/bin/bash
+
+# script to install blob storage connection on each node
+# Usage:
+# setup_wasb.sh [storage_account_name] [storage_account_key] [storage_account_suffix]
+
+storage_account_name=$1
+storage_account_key=$2
+storage_account_suffix=$3
+
+spark_home=/home/spark-2.2.0-bin-hadoop2.7
+cd $spark_home/conf
+
+cp spark-defaults.conf.template spark-defaults.conf
+
+cat >> spark-defaults.conf <<EOF
+spark.jars $spark_home/jars/azure-storage-2.0.0.jar,$spark_home/jars/hadoop-azure-2.7.3.jar
+EOF
+
+cat >> core-site.xml <<EOF
+<?xml version="1.0" encoding="UTF-8"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<configuration>
+  <property>
+    <name>fs.AbstractFileSystem.wasb.Impl</name>
+    <value>org.apache.hadoop.fs.azure.Wasb</value>
+  </property>
+  <property>
+    <name>fs.azure.account.key.$storage_account_name.blob.$storage_account_suffix</name>
+    <value>$storage_account_key</value>
+  </property>
+</configuration>
+EOF
+
+cd $spark_home/jars
+
+# install the appropriate jars
+apt install wget
+wget http://repo1.maven.org/maven2/com/microsoft/azure/azure-storage/2.0.0/azure-storage-2.0.0.jar
+wget http://repo1.maven.org/maven2/org/apache/hadoop/hadoop-azure/2.7.3/hadoop-azure-2.7.3.jar