зеркало из https://github.com/Azure/aztk.git
3.2 KiB
3.2 KiB
SDK samples
Create the Spark client
You can get the values for the secrets below by either running the Getting Started script or using Batch Labs.
import os
import sys
import time
import aztk.spark
from aztk.error import AztkError
# set your secrets: fill in the service principal credentials first
service_principal_configuration = aztk.spark.models.ServicePrincipalConfiguration(
    tenant_id="<org>.onmicrosoft.com",
    client_id="",
    credential="",
    batch_account_resource_id="",
    storage_account_resource_id="",
)
secrets_configuration = aztk.spark.models.SecretsConfiguration(
    service_principal=service_principal_configuration,
    ssh_pub_key="",
)

# create a client from the secrets configuration
client = aztk.spark.Client(secrets_configuration)
List available clusters
# list the clusters available to this client
clusters = client.cluster.list()
Create a new cluster
# directory holding the Spark configuration files and a `jars` subdirectory
configuration_file_path = "/path/to/spark/configuration/files"

# every entry found in the `jars` subdirectory is passed along with the configuration
jars_directory = os.path.join(configuration_file_path, 'jars')
jar_paths = [os.path.join(jars_directory, jar_name) for jar_name in os.listdir(jars_directory)]

spark_configuration = aztk.spark.models.SparkConfiguration(
    spark_defaults_conf=os.path.join(configuration_file_path, 'spark-defaults.conf'),
    spark_env_sh=os.path.join(configuration_file_path, 'spark-env.sh'),
    core_site_xml=os.path.join(configuration_file_path, 'core-site.xml'),
    jars=jar_paths,
)
# choose the Spark toolkit version the cluster should use
spark_toolkit = aztk.spark.models.SparkToolkit(version="2.3.0")

# configure my cluster
cluster_configuration = aztk.spark.models.ClusterConfiguration(
    cluster_id="sdk-test",
    toolkit=spark_toolkit,
    size=2,
    vm_size="standard_f2",
    spark_configuration=spark_configuration,
)
# create a cluster, and wait until it is ready
try:
    cluster = client.cluster.create(cluster_configuration, wait=True)
except AztkError:
    # add your own handling/logging here; the bare `raise` re-raises the
    # active exception unchanged instead of raising it again by name
    raise
Get an existing cluster
# get details of the cluster by its id; this rebinds `cluster` to the returned object
cluster = client.cluster.get(cluster.id)
Run an application on the cluster
# directory that contains examples/src/main/python/pi.py; replace with your own path
ROOT_PATH = "/path/to/aztk"

# define a Spark application to run
app1 = aztk.spark.models.ApplicationConfiguration(
    name="pipy1",
    application=os.path.join(ROOT_PATH, 'examples', 'src', 'main', 'python', 'pi.py'),
    application_args="10")

# submit the application to the cluster
# NOTE(review): no `wait` argument is passed here, so confirm against the SDK
# reference whether this call blocks until the application has finished
client.cluster.submit(cluster.id, app1)
Get the logs of an application
# get logs for app, print to console
# (uses `cluster_configuration`, the name the cluster configuration is bound to,
# and the same `client.cluster` call form as the other operations in this file)
app1_logs = client.cluster.get_application_log(
    id=cluster_configuration.cluster_id, application_name=app1.name)
print(app1_logs.log)
Get status of app
# get status of application, looked up by cluster id and application name
status = client.cluster.get_application_status(cluster_configuration.cluster_id, app1.name)
Stream logs of app, print to console as it runs
# stream logs of app, print to console as it runs
# `current_bytes` is passed back on each request and updated from `total_bytes`
# NOTE(review): the loop only ends on the 'completed' state; confirm which state
# a failed application reports, otherwise this may poll forever
current_bytes = 0
finished = False
while not finished:
    app1_logs = client.cluster.get_application_log(
        id=cluster_configuration.cluster_id,
        application_name=app1.name,
        tail=True,
        current_bytes=current_bytes,
    )
    print(app1_logs.log, end="")
    finished = app1_logs.application_state == 'completed'
    if not finished:
        # not done yet: remember the new offset and poll again in a second
        current_bytes = app1_logs.total_bytes
        time.sleep(1)
Delete the cluster
# delete the cluster, identified by its id
client.cluster.delete(cluster.id)