* initial working version

* update requirements.txt and commands to use low-pri

* cluster commands should use --id instead of --cluster-id

* make --size and --size-low-pri mutually exclusive

* update readme to include low-pri flag
This commit is contained in:
Pablo Selem 2017-06-19 21:58:56 -07:00 committed by GitHub
Parent 5abeaceb66
Commit 2f324fca9c
8 changed files with 53 additions and 18 deletions

View file

@ -29,6 +29,7 @@ First, create your cluster:
./bin/spark-cluster-create \
--id <my-cluster-id> \
--size <number of nodes> \
--size-low-pri <number of low priority nodes> \
--vm-size <vm-size> \
--custom-script <path to custom bash script to run on each node> \
--wait/--no-wait (optional)

View file

@ -15,7 +15,8 @@ CONFIG_PATH = os.path.join(os.path.dirname(__file__), '../configuration.cfg')
if __name__ == '__main__':
pool_id = None
vm_count = None
vm_count = 0
vm_low_pri_count = 0
vm_size = None
custom_script = None
wait = True
@ -25,8 +26,14 @@ if __name__ == '__main__':
parser.add_argument('--id', dest='cluster_id', required=True,
help='The unique id of your spark cluster')
parser.add_argument('--size', type=int, required=True,
# Make --size and --size-low-pri mutually exclusive until there is a fix for
# having clusters with mixed priority types
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('--size', type=int,
help='Number of vms in your cluster')
group.add_argument('--size-low-pri', type=int,
help='Number of low priority vms in your cluster')
parser.add_argument('--vm-size', required=True,
help='VM size for nodes in your cluster')
parser.add_argument('--custom-script',
@ -43,6 +50,9 @@ if __name__ == '__main__':
if args.size is not None:
vm_count = args.size
if args.size_low_pri is not None:
vm_low_pri_count = args.size_low_pri
if args.vm_size is not None:
vm_size = args.vm_size
@ -55,7 +65,9 @@ if __name__ == '__main__':
print('-------------------------------------------')
print('spark cluster id: {}'.format(pool_id))
print('spark cluster size: {}'.format(vm_count))
print('spark cluster size: {}'.format(vm_count + vm_low_pri_count))
print('> dedicated: {}'.format(vm_count))
print('> low priority: {}'.format(vm_low_pri_count))
print('spark cluster vm size: {}'.format(vm_size))
print('path to custom script: {}'.format(custom_script))
print('wait for cluster: {}'.format(wait))
@ -94,5 +106,6 @@ if __name__ == '__main__':
custom_script,
pool_id,
vm_count,
vm_low_pri_count,
vm_size,
wait)

View file

@ -19,7 +19,8 @@ if __name__ == '__main__':
# parse arguments
parser = argparse.ArgumentParser(prog='az_spark')
parser.add_argument(dest='cluster_id',
parser.add_argument('--id',
dest='cluster_id',
help='The unique id of your spark cluster')
args = parser.parse_args()

View file

@ -19,7 +19,8 @@ if __name__ == '__main__':
# parse arguments
parser = argparse.ArgumentParser(prog='az_spark')
parser.add_argument(dest='cluster_id',
parser.add_argument('--id',
dest='cluster_id',
help='The unique id of your spark cluster')
args = parser.parse_args()

View file

@ -109,6 +109,7 @@ def create_cluster(
custom_script,
pool_id,
vm_count,
vm_low_pri_count,
vm_size,
wait = True):
"""
@ -149,7 +150,8 @@ def create_cluster(
image_reference = image_ref_to_use,
node_agent_sku_id = sku_to_use),
vm_size = vm_size,
target_dedicated = vm_count,
target_dedicated_nodes = vm_count,
target_low_priority_nodes = vm_low_pri_count,
start_task = batch_models.StartTask(
command_line = util.wrap_commands_in_shell(start_task_commands),
resource_files = resource_files,
@ -188,7 +190,7 @@ def create_cluster(
command_line = util.wrap_commands_in_shell(application_cmd),
resource_files = [],
multi_instance_settings = batch_models.MultiInstanceSettings(
number_of_instances = vm_count,
number_of_instances = vm_count + vm_low_pri_count,
coordination_command_line = util.wrap_commands_in_shell(coordination_cmd),
common_resource_files = []))
@ -245,17 +247,21 @@ def get_cluster_details(
print
nodes = batch_client.compute_node.list(pool_id=pool_id)
visible_state = pool.allocation_state.value if pool.state.value is 'active' else pool.state.value
node_count = '{} -> {}'.format(pool.current_dedicated, pool.target_dedicated) if pool.state.value is 'resizing' or (pool.state.value is 'deleting' and pool.allocation_state.value is 'resizing') else '{}'.format(pool.current_dedicated)
node_count = '{} -> {}'.format(
pool.current_dedicated_nodes + pool.current_low_priority_nodes,
pool.target_dedicated_nodes + pool.target_low_priority_nodes) if pool.state.value is 'resizing' or (pool.state.value is 'deleting' and pool.allocation_state.value is 'resizing') else '{}'.format(pool.current_dedicated_nodes)
print()
print('State: {}'.format(visible_state))
print('Node Size: {}'.format(pool.vm_size))
print('Nodes: {}'.format(node_count))
print('State: {}'.format(visible_state))
print('Node Size: {}'.format(pool.vm_size))
print('Nodes: {}'.format(node_count))
print('| Dedicated: {}'.format(pool.current_dedicated_nodes))
print('| Low priority: {}'.format(pool.current_low_priority_nodes))
print()
node_label = 'Nodes'
print_format = '{:<34}| {:<15} | {:<21}| {:<8}'
print_format_underline = '{:-<34}|{:-<17}|{:-<22}|{:-<8}'
print_format = '{:<36}| {:<15} | {:<21}| {:<8}'
print_format_underline = '{:-<36}|{:-<17}|{:-<22}|{:-<8}'
print(print_format.format(node_label, 'State', 'IP:Port', 'Master'))
print(print_format_underline.format('', '', '', ''))
@ -281,9 +287,11 @@ def list_clusters(
for pool in pools:
pool_state = pool.allocation_state.value if pool.state.value is 'active' else pool.state.value
node_count = pool.current_dedicated
target_nodes = util.get_cluster_total_target_nodes(pool)
current_nodes = util.get_cluster_total_current_nodes(pool);
node_count = current_nodes
if pool_state is 'resizing' or (pool_state is 'deleting' and pool.allocation_state.value is 'resizing'):
node_count = '{} -> {}'.format(pool.current_dedicated, pool.target_dedicated)
node_count = '{} -> {}'.format(current_nodes, target_nodes)
print(print_format.format(pool.id,
pool_state,
@ -358,4 +366,3 @@ def ssh(
print('\n\t{}\n'.format(ssh_command))
else:
call(ssh_command_array)

View file

@ -129,7 +129,7 @@ def submit_app(
# Get pool size
pool = batch_client.pool.get(pool_id)
pool_size = pool.target_dedicated
pool_size = util.get_cluster_total_target_nodes(pool)
# Affinitize task to master node
master_node_affinity_id = util.get_master_node_id(batch_client, pool_id)

View file

@ -342,3 +342,15 @@ def print_batch_exception(batch_exception):
for mesg in batch_exception.error.values:
print('{}:\t{}'.format(mesg.key, mesg.value))
print('-------------------------------------------')
def get_cluster_total_target_nodes(pool):
"""
Get the total number of target nodes (dedicated + low pri) for the pool
"""
return pool.target_dedicated_nodes + pool.target_low_priority_nodes
def get_cluster_total_current_nodes(pool):
"""
Get the total number of current nodes (dedicated + low pri) in the pool
"""
return pool.current_dedicated_nodes + pool.current_low_priority_nodes

View file

@ -1,2 +1,2 @@
azure-batch==2.0.0
azure-batch==3.0.0
azure-storage==0.33.0