Merge branch 'master' of https://github.com/tjprescott/azure-cli into AddResourceShowAndSetCommands

# Conflicts:
#	azure-cli.pyproj
This commit is contained in:
Travis Prescott 2016-04-01 16:43:58 -07:00
Родитель b7aeb053ec 86dd9fd0b7
Коммит 55e5496a49
35 изменённых файлов: 1862 добавлений и 2167 удалений

Просмотреть файл

@ -8,6 +8,6 @@ install:
- pip install -r requirements.txt
script:
- export PYTHONPATH=$PATHONPATH:./src
- python -m azure.cli
- python -m azure.cli -h
- pylint src/azure
- python -m unittest discover -s src/azure/cli/tests --buffer

Просмотреть файл

@ -25,6 +25,7 @@
<PtvsTargetsFile>$(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion)\Python Tools\Microsoft.PythonTools.targets</PtvsTargetsFile>
</PropertyGroup>
<ItemGroup>
<Compile Include="azure\cli\application.py" />
<Compile Include="azure\cli\commands\account.py" />
<Compile Include="azure\cli\commands\login.py" />
<Compile Include="azure\cli\commands\logout.py" />
@ -36,7 +37,6 @@
<Compile Include="azure\cli\commands\_auto_command.py" />
<Compile Include="azure\cli\commands\_command_creation.py" />
<Compile Include="azure\cli\commands\__init__.py" />
<Compile Include="azure\cli\extensions\experimental.py" />
<Compile Include="azure\cli\extensions\query.py">
<SubType>Code</SubType>
</Compile>
@ -45,7 +45,6 @@
<Compile Include="azure\cli\main.py" />
<Compile Include="azure\cli\tests\command_specs\test_spec_network.py" />
<Compile Include="azure\cli\tests\command_specs\test_spec_resource.py" />
<Compile Include="azure\cli\tests\command_specs\test_spec_storage.py" />
<Compile Include="azure\cli\tests\command_specs\test_spec_vm.py" />
<Compile Include="azure\cli\tests\command_specs\__init__.py" />
<Compile Include="azure\cli\tests\test_api_check.py">
@ -54,19 +53,21 @@
<Compile Include="azure\cli\tests\test_argparse.py">
<SubType>Code</SubType>
</Compile>
<Compile Include="azure\cli\parser.py" />
<Compile Include="azure\cli\tests\test_parser.py" />
<Compile Include="azure\cli\tests\test_autocommand.py" />
<Compile Include="azure\cli\tests\test_commands.py" />
<Compile Include="azure\cli\tests\test_connection_verify.py" />
<Compile Include="azure\cli\tests\test_help.py" />
<Compile Include="azure\cli\tests\test_application.py" />
<Compile Include="azure\cli\tests\test_output.py" />
<Compile Include="azure\cli\_debug.py" />
<Compile Include="azure\cli\tests\test_help.py" />
<Compile Include="azure\cli\tests\test_output.py" />
<Compile Include="azure\cli\_help.py" />
<Compile Include="azure\cli\_helpdocgen.py" />
<Compile Include="azure\cli\_help_files.py" />
<Compile Include="azure\cli\extensions\_event_dispatcher.py" />
<Compile Include="azure\cli\_locale.py" />
<Compile Include="azure\cli\tests\test_profile.py" />
<Compile Include="azure\cli\_argparse.py" />
<Compile Include="azure\cli\_logging.py" />
<Compile Include="azure\cli\_output.py" />
<Compile Include="azure\cli\_profile.py" />
@ -104,14 +105,6 @@
</Interpreter>
</ItemGroup>
<ItemGroup>
<Content Include="azure\cli\tests\recordings\command_specs.test_spec_network.network_usage_list.yaml" />
<Content Include="azure\cli\tests\recordings\command_specs.test_spec_resource.resource_group_list.yaml" />
<Content Include="azure\cli\tests\recordings\command_specs.test_spec_storage.storage_account_check_name.yaml" />
<Content Include="azure\cli\tests\recordings\command_specs.test_spec_storage.storage_account_list.yaml" />
<Content Include="azure\cli\tests\recordings\command_specs.test_spec_storage.storage_account_list_keys.yaml" />
<Content Include="azure\cli\tests\recordings\command_specs.test_spec_storage.storage_account_usage.yaml" />
<Content Include="azure\cli\tests\recordings\command_specs.test_spec_vm.vm_usage_list_westus.yaml" />
<Content Include="azure\cli\tests\recordings\expected_results.res" />
<Content Include="azure\cli\tests\recordings\README.rst" />
</ItemGroup>
<Import Project="$(PtvsTargetsFile)" Condition="Exists($(PtvsTargetsFile))" />

Просмотреть файл

@ -1,4 +1,5 @@
applicationinsights==0.10.0
argcomplete==1.1.0
azure==2.0.0rc1
jmespath
mock==1.3.0

Просмотреть файл

@ -57,6 +57,7 @@ CLASSIFIERS = [
DEPENDENCIES = [
'adal==0.2.0', #from internal index server.
'applicationinsights',
'argcomplete',
'azure==2.0.0rc1',
'jmespath',
'pyyaml',

Просмотреть файл

@ -1,4 +1,5 @@
import sys
import os
import azure.cli.main
@ -11,6 +12,15 @@ try:
except Exception: #pylint: disable=broad-except
pass
sys.exit(azure.cli.main.main(sys.argv[1:]))
args = sys.argv[1:]
# Check if we are in argcomplete mode - if so, we
# need to pick up our args from environment variables
if os.environ.get('_ARGCOMPLETE'):
comp_line = os.environ.get('COMP_LINE')
if comp_line:
args = comp_line.split()[1:]
sys.exit(azure.cli.main.main(args))
finally:
telemetry_flush()

Просмотреть файл

@ -1,428 +0,0 @@
from __future__ import print_function
import sys
from ._help import (GroupHelpFile, CommandHelpFile, HelpFile, print_detailed_help,
print_welcome_message, print_description_list)
from ._helpdocgen import generate_help
from ._locale import L
from ._logging import logger
from ._output import OutputProducer
from azure.cli.extensions import event_dispatcher
# Named arguments are prefixed with one of these strings
ARG_PREFIXES = sorted(('-', '--', '/'), key=len, reverse=True)
# Values are separated from argument name with one or more of these characters
# or a space
ARG_SEPARATORS = ':='
class IncorrectUsageError(Exception):
'''Raised when a command is incorrectly used and the usage should be
displayed to the user.
'''
pass
class Arguments(dict):
def __init__(self, source=None): #pylint: disable=super-init-not-called
self.positional = []
if source:
self.update(source)
def add_from_dotted(self, key, value):
d = self
bits = key.split('.')
for p in bits[:-1]:
d = d.setdefault(p, Arguments())
if not isinstance(d, Arguments):
raise RuntimeError('incompatible arguments for "{}"'.format(p))
d[bits[-1]] = value
def __getattr__(self, key):
try:
return self[key]
except LookupError:
pass
logger.debug('Argument %s is required', key)
raise IncorrectUsageError(L('Argument {0} is required').format(key))
def _read_arg(string):
for prefix in ARG_PREFIXES:
if string.startswith(prefix):
a1, a2 = string, None
indices = sorted((a1.find(sep), sep) for sep in ARG_SEPARATORS)
sep = next((i[1] for i in indices if i[0] > len(prefix)), None)
if sep:
a1, _, a2 = a1.partition(sep)
return a1[len(prefix):].lower(), a2
return None, None
def _index(string, char, default=sys.maxsize):
try:
return string.index(char)
except ValueError:
return default
class ArgumentParserResult(object): #pylint: disable=too-few-public-methods
def __init__(self, result, output_format=None):
self.result = result
self.output_format = output_format
class ArgumentParser(object):
def __init__(self, prog):
self.prog = prog
self.noun_map = {
'$full_name': 'azure-cli',
}
self.help_args = {'--help', '-h'}
self.complete_args = {'--complete'}
self.global_args = {'--verbose', '--debug'}
self.genhelp_args = {'--genhelp'}
def add_global_param(self, spec, desc):
# TODO: Keep track of all global args to allow help
# and statement completion to pick them up
pass
def add_command(self, #pylint: disable=too-many-arguments
handler,
name=None,
description=None,
args=None,
accepts_unexpected_args=False):
'''Registers a command that may be parsed by this parser.
`handler` is the function to call with two `Arguments` objects.
All recognized arguments will appear on the first; all others on the
second. Accessing a missing argument as an attribute will raise
`IncorrectUsageError` that typically displays the command help.
Accessing a missing argument as an index or using `get` behaves like a
dictionary.
`name` is a space-separated list of names identifying the command.
`description` is a short piece of help text to display in usage info.
`args` is a list of (spec, description) tuples. Each spec is either the
name of a positional argument, or an ``'--argument -a <variable>'``
string listing one or more argument names and an optional variable name.
When multiple names are specified, the first is always used as the
name on `Arguments`.
'''
nouns = (name or handler.__name__).split()
full_name = ''
m = self.noun_map
for n in nouns:
full_name += n
m = m.setdefault(n.lower(), {
'$full_name': full_name
})
full_name += '.'
m['$description'] = description
m['$handler'] = handler
m['$doctext'] = handler.__doc__
m['$accepts_unexpected_args'] = accepts_unexpected_args
m['$args'] = []
m['$kwargs'] = kw = {}
m['$argdoc'] = ad = []
for spec, desc, req, target in args or []:
if not any(spec.startswith(p) for p in ARG_PREFIXES):
m['$args'].append(spec.strip('<> '))
ad.append((spec, desc, req))
continue
aliases = spec.split()
if any(aliases[-1].startswith(p) for p in ARG_PREFIXES):
v = True
else:
v = aliases.pop().strip('<> ')
if not target:
target, _ = _read_arg(aliases[0])
kw.update({_read_arg(a)[0]: (target, v, req, aliases) for a in aliases})
ad.append(('/'.join(aliases), desc, req))
def execute(self,
args,
show_usage=False,
show_completions=False,
out=None):
'''Parses `args` and invokes the associated handler.
The handler is passed two `Arguments` objects with all arguments other
than those in `self.help_args`, `self.complete_args` and
`self.global_args`. The first contains arguments that were defined by
the handler spec, while the second contains all other arguments.
If `show_usage` is ``True`` or any of `self.help_args` has been provided
then usage information will be displayed instead of executing the
command.
If `show_completions` is ``True`` or any of `self.complete_args` has
been provided then raw information about the likely arguments will be
provided.
If `out` is not specified, we default to sys.stdout.
'''
out = sys.stdout if out is None else out
if not show_usage:
show_usage = any(a in self.help_args for a in args)
if not show_completions:
show_completions = any(a in self.complete_args for a in args)
generate_help_files = any(a in self.genhelp_args for a in args)
if generate_help_files:
for a in self.genhelp_args:
args.remove(a)
try:
it = self._get_args_itr(args)
m, n = self._get_noun_map(args, it, out)
if generate_help_files:
generate_help(m)
return ArgumentParserResult(None)
if show_usage:
return ArgumentParserResult(self._display_usage(m, out))
if show_completions:
return ArgumentParserResult(
self._display_completions(m, args, out))
if len(args) == 0:
print_welcome_message(out)
self._display_children(m, args, out)
return ArgumentParserResult(None)
expected_kwargs, handler = self._get_noun_data(m, n, args, out)
others, parsed = self._parse_nouns(expected_kwargs, it, m, n, out)
output_format = self._get_output_format(m, others, out)
bad_args_passed = self._handle_bad_args(m, others, out)
self._handle_required_args(expected_kwargs, parsed, out)
if bad_args_passed:
raise ArgParseError()
except (ArgParseFinished, ArgParseError):
return ArgumentParserResult(None)
return self._execute(m, out, handler, parsed, others, output_format)
def _get_args_itr(self, args):
all_global_args = set(
a.lstrip('-/') for a in self.help_args | self.complete_args | self.global_args)
def not_global(a):
return a.lstrip('-/') not in all_global_args
it = filter(not_global, args).__iter__() #pylint: disable=bad-builtin
return it
def _get_noun_map(self, args, it, out):
m = self.noun_map
n = next(it, '')
while n:
try:
m = m[n.lower()]
except LookupError:
if '$args' not in m:
print(L('\nCommand "{0}" not found, names starting with "{0}":\n'.format(n)),
file=out)
self._display_completions(m, args, out=out)
raise ArgParseFinished()
break
n = next(it, '')
return m, n
def _parse_nouns(self, expected_kwargs, it, m, n, out): #pylint: disable=too-many-arguments
parsed = Arguments()
others = Arguments()
while n:
next_n = next(it, '')
key_n, value = _read_arg(n)
if key_n:
target_value = expected_kwargs.get(key_n)
if target_value is None:
# Unknown arg always takes an argument.
if value is None:
value, next_n = next_n, next(it, '')
others.add_from_dotted(key_n, value)
elif target_value[1] is True:
# Arg with no value
if value is not None:
print(L("argument '{0}' does not take a value").format(key_n),
file=out)
self._display_usage(m)
raise ArgParseFinished()
parsed.add_from_dotted(target_value[0], True)
else:
# Arg with a value
if value is None:
value, next_n = next_n, next(it, '')
parsed.add_from_dotted(target_value[0], value)
else:
# Positional arg
parsed.positional.append(n)
n = next_n
return others, parsed
def _get_noun_data(self, m, n, args, out):
try:
expected_kwargs = m['$kwargs']
handler = m['$handler']
except LookupError:
logger.debug('Missing data for noun %s', n)
self._display_children(m, args, out=out)
raise ArgParseFinished()
return expected_kwargs, handler
@staticmethod
def _handle_required_args(expected_kwargs, parsed, out):
required_args = [a for a, _, req, _ in expected_kwargs.values() if req]
required_args = sorted(list(set(required_args)))
missing_arg = False
for a in required_args:
try:
parsed[a]
except KeyError:
missing_arg = True
full_name = ['{0}/{1}'.format(names[0], names[1])
for arg, _, req, names in expected_kwargs.values() if arg == a][0]
print(L('Missing required argument: {}'.format(full_name)),
file=out)
if missing_arg:
raise ArgParseError(L('Missing required argument(s)'))
@staticmethod
def _handle_bad_args(m, others, out):
bad_args_passed = False
if not m['$accepts_unexpected_args'] and len(others) > 0:
print(L('\nUnexpected parameter(s): {0}\n').format(', '.join(others)), file=out)
bad_args_passed = True
return bad_args_passed
def _get_output_format(self, m, others, out):
try:
output_format = others.pop('output') if others else None
if output_format is not None and output_format not in OutputProducer.format_dict:
print(L("Invalid output format '{}'.".format(output_format)), file=out)
self._display_usage(m)
raise ArgParseFinished()
except KeyError:
output_format = None
return output_format
@staticmethod
def _execute(m, out, handler, parsed, others, output_format): #pylint: disable=too-many-arguments
old_stdout = sys.stdout
try:
sys.stdout = out
event_data = {
'handler': handler,
'command_metadata': m,
'args': parsed,
'unexpected': others
}
# Let any event handlers that want to modify/munge the parameters do so...
event_dispatcher.raise_event(event_dispatcher.PARSING_PARAMETERS, event_data)
# Let any event handlers that want to know that we are about to execute do their
# thing...
event_dispatcher.raise_event(event_dispatcher.EXECUTING_COMMAND, event_data)
return ArgumentParserResult(event_data['handler'](parsed, others), output_format)
except IncorrectUsageError as ex:
print(str(ex), file=out)
return ArgumentParserResult(None)
finally:
sys.stdout = old_stdout
def _display_usage(self, noun_map, out=sys.stdout):
subnouns = sorted(k for k in noun_map if not k.startswith('$'))
argdoc = noun_map.get('$argdoc')
delimiters = noun_map['$full_name']
doc = GroupHelpFile(delimiters, subnouns) \
if len(subnouns) > 0 \
else CommandHelpFile(delimiters, argdoc)
doc.load(noun_map)
if isinstance(doc, GroupHelpFile):
for child in doc.children:
args = delimiters.split('.') if delimiters != 'azure-cli' else []
args.append(child.name)
child.command = ' '.join(args)
child.delimiters = '.'.join(args)
it = self._get_args_itr(args)
m, _ = self._get_noun_map(args, it, out)
child.load(m)
print_detailed_help(doc, out)
def _display_completions(self, noun_map, arguments, out=sys.stdout):
nouns = self._get_noun_matches(arguments, noun_map, out)
last_arg = arguments[-1] if len(arguments) > 0 else ''
matches = [k for k in nouns if k.startswith(last_arg)]
print('\n'.join(sorted(set(matches))), file=out)
out.flush()
def _display_children(self, noun_map, arguments, out=sys.stdout):
nouns = self._get_noun_matches(arguments, noun_map, out)
group_help = HelpFile(noun_map['$full_name'])
group_help.load(noun_map)
print(L('Group'), file=out)
print_description_list([group_help], out)
print(L('\nSub-Commands'), file=out)
help_files = []
for noun in sorted(nouns):
args = '{0} {1}'.format(' '.join(arguments), noun).split(' ') \
if arguments \
else [noun]
it = self._get_args_itr(args)
m, _ = self._get_noun_map(args, it, out)
help_file = HelpFile(m['$full_name'])
help_file.load(m)
help_files.append(help_file)
print_description_list(help_files, out)
out.flush()
def _get_noun_matches(self, arguments, noun_map, out):
for a in self.complete_args:
try:
arguments.remove(a)
except ValueError:
pass
kwargs = noun_map.get('$kwargs') or []
last_arg = arguments[-1] if len(arguments) > 0 else ''
args_candidates = []
arguments_set = set(arguments)
for a in kwargs:
alias = kwargs[a][3]
#check whether the arg has been used already
if not [x for x in alias if x in arguments_set]:
args_candidates.extend(alias)
if last_arg.startswith('-') and (last_arg in args_candidates):
print('\n', file=out) # TODO: parameter value completion is N.Y.I
return []
else:
subcommand_candidates = [k for k in noun_map if not k.startswith('$')]
candidates = subcommand_candidates + args_candidates
return candidates
class ArgParseError(Exception):
pass
class ArgParseFinished(Exception):
pass

Просмотреть файл

@ -4,12 +4,8 @@ import sys
import json
import re
from collections import OrderedDict
from datetime import datetime
from enum import Enum
from six import StringIO
from azure.cli.extensions import event_dispatcher
class OutputFormatException(Exception):
pass
@ -59,40 +55,13 @@ class OutputProducer(object): #pylint: disable=too-few-public-methods
'tsv': format_tsv,
}
KEYS_CAMELCASE_PATTERN = re.compile('(?!^)_([a-zA-Z])')
def __init__(self, formatter=format_list, file=sys.stdout): #pylint: disable=redefined-builtin
self.formatter = formatter
self.file = file
def out(self, obj):
obj = OutputProducer.todict(obj)
event_data = {'result': obj}
event_dispatcher.raise_event(event_dispatcher.TRANSFORM_RESULT, event_data)
event_dispatcher.raise_event(event_dispatcher.FILTER_RESULT, event_data)
print(self.formatter(event_data['result']), file=self.file)
print(self.formatter(obj), file=self.file)
@staticmethod
def todict(obj): #pylint: disable=too-many-return-statements
def to_camelcase(s):
return re.sub(OutputProducer.KEYS_CAMELCASE_PATTERN, lambda x: x.group(1).upper(), s)
if isinstance(obj, dict):
return {k: OutputProducer.todict(v) for (k, v) in obj.items()}
elif isinstance(obj, list):
return [OutputProducer.todict(a) for a in obj]
elif isinstance(obj, Enum):
return obj.value
elif isinstance(obj, datetime):
return obj.isoformat()
elif hasattr(obj, '_asdict'):
return OutputProducer.todict(obj._asdict())
elif hasattr(obj, '__dict__'):
return dict([(to_camelcase(k), OutputProducer.todict(v))
for k, v in obj.__dict__.items()
if not callable(v) and not k.startswith('_')])
else:
return obj
@staticmethod
def get_formatter(format_type):
@ -135,7 +104,7 @@ class ListOutput(object): #pylint: disable=too-few-public-methods
@staticmethod
def _dump_line(io, line, indent):
io.write(' ' * indent)
io.write(line)
io.write(str(line))
io.write('\n')
def _dump_object(self, io, obj, indent):
@ -248,7 +217,7 @@ class TsvOutput(object): #pylint: disable=too-few-public-methods
# and a dictionary value in other...
stream.write('{object}')
else:
stream.write(data)
stream.write(str(data))
@staticmethod
def _dump_row(data, stream):

Просмотреть файл

@ -0,0 +1,144 @@
from collections import defaultdict
from datetime import datetime
import sys
import re
import argparse
import logging
from enum import Enum
from .parser import AzCliCommandParser
import azure.cli.extensions
class Configuration(object): # pylint: disable=too-few-public-methods
"""The configuration object tracks session specific data such
as output formats, available commands etc.
"""
def __init__(self, argv):
self.argv = argv or sys.argv[1:]
self.log = logging.getLogger('az')
self.output_format = 'list'
def get_command_table(self):
import azure.cli.commands as commands
# Find the first noun on the command line and only load commands from that
# module to improve startup time.
for a in self.argv:
if not a.startswith('-'):
return commands.get_command_table(a)
# No noun found, so load all commands.
return commands.get_command_table()
class Application(object):
TRANSFORM_RESULT = 'Application.TransformResults'
FILTER_RESULT = 'Application.FilterResults'
GLOBAL_PARSER_CREATED = 'GlobalParser.Created'
COMMAND_PARSER_CREATED = 'CommandParser.Created'
COMMAND_PARSER_LOADED = 'CommandParser.Loaded'
COMMAND_PARSER_PARSED = 'CommandParser.Parsed'
def __init__(self, configuration):
self._event_handlers = defaultdict(lambda: [])
self.configuration = configuration
# Register presence of and handlers for global parameters
self.register(self.GLOBAL_PARSER_CREATED, Application._register_builtin_arguments)
self.register(self.COMMAND_PARSER_LOADED, Application._enable_autocomplete)
self.register(self.COMMAND_PARSER_PARSED, self._handle_builtin_arguments)
# Let other extensions make their presence known
azure.cli.extensions.register_extensions(self)
self.global_parser = AzCliCommandParser(prog='az', add_help=False)
self.raise_event(self.GLOBAL_PARSER_CREATED, self.global_parser)
self.parser = AzCliCommandParser(prog='az', parents=[self.global_parser])
self.raise_event(self.COMMAND_PARSER_CREATED, self.parser)
def load_commands(self):
self.parser.load_command_table(self.configuration.get_command_table())
self.raise_event(self.COMMAND_PARSER_LOADED, self.parser)
def execute(self, argv):
args = self.parser.parse_args(argv)
self.raise_event(self.COMMAND_PARSER_PARSED, args)
# Consider - we are using any args that start with an underscore (_) as 'private'
# arguments and remove them from the arguments that we pass to the actual function.
# This does not feel quite right.
params = dict([(key, value)
for key, value in args.__dict__.items() if not key.startswith('_')])
params.pop('subcommand', None)
params.pop('func', None)
# TODO: Remove this conversion code once internal key references are updated (#116797761)
converted_params = {}
for key in params.keys():
converted_key = key.replace('_', '-')
converted_params[converted_key] = params[key]
result = args.func(converted_params)
result = self.todict(result)
event_data = {'result': result}
self.raise_event(self.TRANSFORM_RESULT, event_data)
self.raise_event(self.FILTER_RESULT, event_data)
return event_data['result']
def raise_event(self, name, event_data):
'''Raise the event `name`.
'''
for func in self._event_handlers[name]:
func(event_data)
def register(self, name, handler):
'''Register a callable that will be called when the
event `name` is raised.
param: name: The name of the event
param: handler: Function that takes two parameters;
name: name of the event raised
event_data: `dict` with event specific data.
'''
self._event_handlers[name].append(handler)
KEYS_CAMELCASE_PATTERN = re.compile('(?!^)_([a-zA-Z])')
@classmethod
def todict(cls, obj): #pylint: disable=too-many-return-statements
def to_camelcase(s):
return re.sub(cls.KEYS_CAMELCASE_PATTERN, lambda x: x.group(1).upper(), s)
if isinstance(obj, dict):
return {k: cls.todict(v) for (k, v) in obj.items()}
elif isinstance(obj, list):
return [cls.todict(a) for a in obj]
elif isinstance(obj, Enum):
return obj.value
elif isinstance(obj, datetime):
return obj.isoformat()
elif hasattr(obj, '_asdict'):
return cls.todict(obj._asdict())
elif hasattr(obj, '__dict__'):
return dict([(to_camelcase(k), cls.todict(v))
for k, v in obj.__dict__.items()
if not callable(v) and not k.startswith('_')])
else:
return obj
@staticmethod
def _enable_autocomplete(parser):
import argcomplete
argcomplete.autocomplete(parser)
@staticmethod
def _register_builtin_arguments(parser):
parser.add_argument('--subscription', dest='_subscription_id', help=argparse.SUPPRESS)
parser.add_argument('--output', '-o', dest='_output_format',
choices=['list', 'json', 'tsv'],
help=argparse.SUPPRESS)
def _handle_builtin_arguments(self, args):
self.configuration.output_format = args._output_format #pylint: disable=protected-access
del args._output_format

Просмотреть файл

@ -1,5 +1,7 @@
from .._argparse import IncorrectUsageError
from .._logging import logger
from __future__ import print_function
import sys
import time
from collections import defaultdict
# TODO: Alternatively, simply scan the directory for all modules
COMMAND_MODULES = [
@ -13,56 +15,111 @@ COMMAND_MODULES = [
'vm',
]
_COMMANDS = {}
COMMON_PARAMETERS = {
'resource_group_name': {
'name': '--resourcegroup -g',
'metavar': 'RESOURCE GROUP',
'help': 'Name of resource group',
'required': True
},
'location': {
'name': '--location -l',
'metavar': 'LOCATION',
'help': 'Location',
'required': True
}
}
def command(name, accepts_unexpected_args=False):
def add_command(handler):
cmd = _COMMANDS.setdefault(handler, {})
cmd['name'] = name
cmd['accepts_unexpected_args'] = accepts_unexpected_args
logger.debug('Added %s as command "%s"', handler, name)
return handler
return add_command
class LongRunningOperation(object): #pylint: disable=too-few-public-methods
def description(description_text):
def add_description(handler):
_COMMANDS.setdefault(handler, {})['description'] = description_text
logger.debug('Added description "%s" to %s', description_text, handler)
return handler
return add_description
progress_file = sys.stderr
def option(spec, description_text=None, required=False, target=None):
def add_option(handler):
_COMMANDS.setdefault(handler, {}).setdefault('args', []) \
.append((spec, description_text, required, target))
logger.debug('Added option "%s" to %s', spec, handler)
return handler
return add_option
def __init__(self, start_msg='', finish_msg='', poll_interval_ms=1000.0):
self.start_msg = start_msg
self.finish_msg = finish_msg
self.poll_interval_ms = poll_interval_ms
def add_to_parser(parser, command_name=None):
'''Loads commands into the parser
def __call__(self, poller):
print(self.start_msg, file=self.progress_file)
succeeded = False
try:
while not poller.done():
if self.progress_file:
print('.', end='', file=self.progress_file)
self.progress_file.flush()
time.sleep(self.poll_interval_ms / 1000.0)
result = poller.result()
succeeded = True
return result
finally:
# Ensure that we get a newline after the dots...
if self.progress_file:
print(file=self.progress_file)
print(self.finish_msg if succeeded else '', file=self.progress_file)
When `command` is specified, only commands from that module will be loaded.
class CommandTable(defaultdict):
"""A command table is a dictionary of func -> {name,
func,
**kwargs}
objects.
The `name` is the space separated name - i.e. 'az vm list'
`func` represents the handler for the method, and will be called with the parsed
args from argparse.ArgumentParser. The remaining keyword arguments will be passed to
ArgumentParser.add_parser.
"""
def __init__(self):
super(CommandTable, self).__init__(lambda: {'arguments': []})
def command(self, name, **kwargs):
def wrapper(func):
self[func]['name'] = name
self[func].update(kwargs)
return func
return wrapper
def description(self, description):
def wrapper(func):
self[func]['description'] = description
return func
return wrapper
def option(self, name, **kwargs):
def wrapper(func):
try:
opt = dict(kwargs['kwargs'])
except KeyError:
opt = dict(kwargs)
opt['name'] = name
self[func]['arguments'].append(opt)
return func
return wrapper
def _get_command_table(module_name):
module = __import__('azure.cli.commands.' + module_name)
for part in ('cli.commands.' + module_name).split('.'):
module = getattr(module, part)
return module.command_table
def get_command_table(module_name=None):
'''Loads command table(s)
When `module_name` is specified, only commands from that module will be loaded.
If the module is not found, all commands are loaded.
'''
# Importing the modules is sufficient to invoke the decorators. Then we can
# get all of the commands from the _COMMANDS variable.
loaded = False
if command_name:
if module_name:
try:
__import__('azure.cli.commands.' + command_name)
command_table = _get_command_table(module_name)
loaded = True
except ImportError:
# Unknown command - we'll load all below
pass
if not loaded:
command_table = {}
for mod in COMMAND_MODULES:
__import__('azure.cli.commands.' + mod)
command_table.update(_get_command_table(mod))
loaded = True
for handler, info in _COMMANDS.items():
# args have probably been added in reverse order
info.setdefault('args', []).reverse()
parser.add_command(handler, **info)
return command_table

Просмотреть файл

@ -1,56 +1,22 @@
from __future__ import print_function
import inspect
import sys
import time
from msrest.paging import Paged
from msrest.exceptions import ClientException
from azure.cli._argparse import IncorrectUsageError
from ..commands import command, description, option
from azure.cli.parser import IncorrectUsageError
from ..commands import COMMON_PARAMETERS
EXCLUDED_PARAMS = frozenset(['self', 'raw', 'custom_headers', 'operation_config'])
GLOBALPARAMALIASES = {
'resource_group_name': '--resourcegroup --rg <resourcegroupname>'
}
class AutoCommandDefinition(object): #pylint: disable=too-few-public-methods
def __init__(self, operation, return_type, command_alias=None):
self.operation = operation
self.return_type = return_type
self.opname = command_alias if command_alias else operation.__name__.replace('_', '-')
class LongRunningOperation(object): #pylint: disable=too-few-public-methods
progress_file = sys.stderr
def __init__(self, start_msg='', finish_msg='', poll_interval_ms=1000.0):
self.start_msg = start_msg
self.finish_msg = finish_msg
self.poll_interval_ms = poll_interval_ms
def __call__(self, poller):
print(self.start_msg, file=self.progress_file)
succeeded = False
try:
while not poller.done():
if self.progress_file:
print('.', end='', file=self.progress_file)
self.progress_file.flush()
time.sleep(self.poll_interval_ms / 1000.0)
result = poller.result()
succeeded = True
return result
finally:
# Ensure that we get a newline after the dots...
if self.progress_file:
print(file=self.progress_file)
print(self.finish_msg if succeeded else '', file=self.progress_file)
def _decorate_command(name, func):
return command(name)(func)
def _decorate_description(desc, func):
return description(desc)(func)
def _decorate_option(spec, descr, target, func):
return option(spec, descr, target=target)(func)
def _decorate_option(command_table, func, name, **kwargs):
return command_table.option(name, kwargs=kwargs['kwargs'])(func)
def _get_member(obj, path):
"""Recursively walk down the dot-separated path
@ -59,16 +25,27 @@ def _get_member(obj, path):
Ex. a.b.c would get the property 'c' of property 'b' of the
object a
"""
path = path or ''
for segment in path.split('.'):
obj = getattr(obj, segment)
try:
obj = getattr(obj, segment)
except AttributeError:
pass
return obj
def _make_func(client_factory, member_path, return_type_or_func, unbound_func):
def call_client(args, unexpected): #pylint: disable=unused-argument
client = client_factory()
def call_client(args):
client = client_factory(args)
ops_instance = _get_member(client, member_path)
# TODO: Remove this conversion code once internal key references are updated (#116797761)
converted_params = {}
for key in args.keys():
converted_key = key.replace('-', '_')
converted_params[converted_key] = args[key]
try:
result = unbound_func(ops_instance, **args)
result = unbound_func(ops_instance, **converted_params)
if not return_type_or_func:
return {}
if callable(return_type_or_func):
@ -94,23 +71,61 @@ def _option_description(operation, arg):
return ' '.join(l.split(':')[-1] for l in inspect.getdoc(operation).splitlines()
if l.startswith(':param') and arg + ':' in l)
def build_operation(command_name, member_path, client_type, operations, #pylint: disable=dangerous-default-value
paramaliases=GLOBALPARAMALIASES):
for operation, return_type_name in operations:
opname = operation.__name__.replace('_', '-')
func = _make_func(client_type, member_path, return_type_name, operation)
func = _decorate_command(' '.join([command_name, opname]), func)
#pylint: disable=too-many-arguments
def build_operation(command_name,
member_path,
client_type,
operations,
command_table,
common_parameters=None,
extra_parameters=None):
merged_common_parameters = COMMON_PARAMETERS.copy()
merged_common_parameters.update(common_parameters or {})
for op in operations:
func = _make_func(client_type, member_path, op.return_type, op.operation)
args = []
try:
# only supported in python3 - falling back to argspec if not available
sig = inspect.signature(operation)
sig = inspect.signature(op.operation)
args = sig.parameters
except AttributeError:
sig = inspect.getargspec(operation) #pylint: disable=deprecated-method
sig = inspect.getargspec(op.operation) #pylint: disable=deprecated-method
args = sig.args
options = []
for arg in [a for a in args if not a in EXCLUDED_PARAMS]:
spec = paramaliases.get(arg, '--%s <%s>' % (arg, arg))
func = _decorate_option(spec, _option_description(operation, arg),
target=arg, func=func)
try:
# this works in python3
default = args[arg].default
required = default == inspect.Parameter.empty # pylint: disable=no-member
except TypeError:
arg_defaults = dict(zip(sig.args[-len(sig.defaults):], sig.defaults))
default = arg_defaults[arg] if arg in arg_defaults else None
required = False if default else True
# TODO: Add action here if a boolean default value exists to create a flag
common_param = merged_common_parameters.get(arg, {
'name': '--' + arg.replace('_', '-'),
'required': required,
'default': default,
'help': _option_description(op.operation, arg)
}).copy() # We need to make a copy to allow consumers to mutate the value
# retrieved from the common parameters without polluting future
# use...
common_param['dest'] = common_param.get('dest', arg)
options.append(common_param)
command_table[func] = {
'name': ' '.join([command_name, op.opname]),
'handler': func,
'arguments': options
}
if extra_parameters:
for item in extra_parameters.values() or []:
func = _decorate_option(command_table, func, item['name'], kwargs=item)

Просмотреть файл

@ -1,9 +1,10 @@
from .._profile import Profile
from ..commands import command, description, option
from ..commands import CommandTable
from .._locale import L
@command('account list')
@description(L('List the imported subscriptions.'))
command_table = CommandTable()
@command_table.command('account list', description=L('List the imported subscriptions.'))
def list_subscriptions(args, unexpected): #pylint: disable=unused-argument
"""
type: command
@ -20,12 +21,11 @@ def list_subscriptions(args, unexpected): #pylint: disable=unused-argument
return subscriptions
@command('account set')
@description(L('Set the current subscription'))
@option('--subscription-id -n <subscription-id>',
L('Subscription Id, unique name also works.'),
required=True)
def set_active_subscription(args, unexpected): #pylint: disable=unused-argument
@command_table.command('account set')
@command_table.description(L('Set the current subscription'))
@command_table.option('--subscription-id -n', metavar='SUBSCRIPTION_ID', dest='subscription_id',
help=L('Subscription Id, unique name also works.'))
def set_active_subscription(args):
"""
type: command
short-summary: this module does xyz one-line or so

Просмотреть файл

@ -8,19 +8,23 @@ from azure.mgmt.resource.subscriptions import (SubscriptionClient,
SubscriptionClientConfiguration)
from .._profile import Profile
from ..commands import command, description, option
from ..commands import CommandTable
#TODO: update adal-python to support it
#from .._debug import should_disable_connection_verify
from .._locale import L
@command('login')
@description(L('log in to an Azure subscription using Active Directory Organization Id'))
@option('--username -u <username>',
L('organization Id or service principal. Microsoft Account is not yet supported.'))
@option('--password -p <password>', L('user password or client secret, will prompt if not given.'))
@option('--service-principal', L('the credential represents a service principal.'))
@option('--tenant -t <tenant>', L('the tenant associated with the service principal.'))
def login(args, unexpected): #pylint: disable=unused-argument
command_table = CommandTable()
@command_table.command('login')
@command_table.description(L('log in to an Azure subscription using Active Directory Organization Id')) # pylint: disable=line-too-long
@command_table.option('--username -u',
help=L('organization Id or service principal. Microsoft Account is not yet supported.')) # pylint: disable=line-too-long
@command_table.option('--password -p',
help=L('user password or client secret, will prompt if not given.'))
@command_table.option('--service-principal',
help=L('the credential represents a service principal.'))
@command_table.option('--tenant -t', help=L('the tenant associated with the service principal.'))
def login(args):
interactive = False
username = args.get('username')

Просмотреть файл

@ -1,14 +1,14 @@
from .._profile import Profile
from ..commands import command, description, option
from ..commands import CommandTable
from .._locale import L
@command('logout')
@description(L('Log out from Azure subscription using Active Directory.'))
@option('--username -u <username>', L('User name used to log out from Azure Active Directory.'))
def logout(args, unexpected): #pylint: disable=unused-argument
username = args.get('username')
if not username:
raise ValueError(L('Please provide a valid username to logout.'))
command_table = CommandTable()
@command_table.command('logout',
description=L('Log out from Azure subscription using Active Directory.'))
@command_table.option('--username -u',
help=L('User name used to log out from Azure Active Directory.'),
required=True)
def logout(args):
profile = Profile()
profile.logout(username)
profile.logout(args['username'])

Просмотреть файл

@ -20,17 +20,14 @@ from azure.mgmt.network.operations import (ApplicationGatewaysOperations,
VirtualNetworksOperations)
from ._command_creation import get_mgmt_service_client
from ..commands._auto_command import build_operation, LongRunningOperation, GLOBALPARAMALIASES
from ..commands import command, description, option
from ..commands._auto_command import build_operation, AutoCommandDefinition
from ..commands import CommandTable, LongRunningOperation
def _network_client_factory():
command_table = CommandTable()
def _network_client_factory(*args): # pylint: disable=unused-argument
return get_mgmt_service_client(NetworkManagementClient, NetworkManagementClientConfiguration)
PARAMALIASES = GLOBALPARAMALIASES.copy()
PARAMALIASES.update({
'virtual_network_name': '--name <virtualNetworkName>',
'load_balancer_name': '--name <loadBalancerName>'
})
# pylint: disable=line-too-long
# Application gateways
@ -38,83 +35,83 @@ build_operation("network appgateway",
"application_gateways",
_network_client_factory,
[
(ApplicationGatewaysOperations.delete, LongRunningOperation(L('Deleting application gateway'), L('Application gateway deleted'))),
(ApplicationGatewaysOperations.get, 'ApplicationGateway'),
(ApplicationGatewaysOperations.list, '[ApplicationGateway]'),
(ApplicationGatewaysOperations.list_all, '[ApplicationGateway]'),
(ApplicationGatewaysOperations.start, LongRunningOperation(L('Starting application gateway'), L('Application gateway started'))),
(ApplicationGatewaysOperations.stop, LongRunningOperation(L('Stopping application gateway'), L('Application gateway stopped'))),
AutoCommandDefinition(ApplicationGatewaysOperations.delete, LongRunningOperation(L('Deleting application gateway'), L('Application gateway deleted'))),
AutoCommandDefinition(ApplicationGatewaysOperations.get, 'ApplicationGateway'),
AutoCommandDefinition(ApplicationGatewaysOperations.list, '[ApplicationGateway]'),
AutoCommandDefinition(ApplicationGatewaysOperations.list_all, '[ApplicationGateway]'),
AutoCommandDefinition(ApplicationGatewaysOperations.start, LongRunningOperation(L('Starting application gateway'), L('Application gateway started'))),
AutoCommandDefinition(ApplicationGatewaysOperations.stop, LongRunningOperation(L('Stopping application gateway'), L('Application gateway stopped'))),
],
PARAMALIASES)
command_table)
# ExpressRouteCircuitAuthorizationsOperations
build_operation("network expressroutecircuitauth",
"express_route_circuit_authorizations",
_network_client_factory,
[
(ExpressRouteCircuitAuthorizationsOperations.delete, LongRunningOperation(L('Deleting express route authorization'), L('Express route authorization deleted'))),
(ExpressRouteCircuitAuthorizationsOperations.get, 'ExpressRouteCircuitAuthorization'),
(ExpressRouteCircuitAuthorizationsOperations.list, '[ExpressRouteCircuitAuthorization]'),
AutoCommandDefinition(ExpressRouteCircuitAuthorizationsOperations.delete, LongRunningOperation(L('Deleting express route authorization'), L('Express route authorization deleted'))),
AutoCommandDefinition(ExpressRouteCircuitAuthorizationsOperations.get, 'ExpressRouteCircuitAuthorization'),
AutoCommandDefinition(ExpressRouteCircuitAuthorizationsOperations.list, '[ExpressRouteCircuitAuthorization]'),
],
PARAMALIASES)
command_table)
# ExpressRouteCircuitPeeringsOperations
build_operation("network expressroutecircuitpeering",
"express_route_circuit_peerings",
_network_client_factory,
[
(ExpressRouteCircuitPeeringsOperations.delete, LongRunningOperation(L('Deleting express route circuit peering'), L('Express route circuit peering deleted'))),
(ExpressRouteCircuitPeeringsOperations.get, 'ExpressRouteCircuitPeering'),
(ExpressRouteCircuitPeeringsOperations.list, '[ExpressRouteCircuitPeering]'),
AutoCommandDefinition(ExpressRouteCircuitPeeringsOperations.delete, LongRunningOperation(L('Deleting express route circuit peering'), L('Express route circuit peering deleted'))),
AutoCommandDefinition(ExpressRouteCircuitPeeringsOperations.get, 'ExpressRouteCircuitPeering'),
AutoCommandDefinition(ExpressRouteCircuitPeeringsOperations.list, '[ExpressRouteCircuitPeering]'),
],
PARAMALIASES)
command_table)
# ExpressRouteCircuitsOperations
build_operation("network expressroutecircuit",
"express_route_circuits",
_network_client_factory,
[
(ExpressRouteCircuitsOperations.delete, LongRunningOperation(L('Deleting express route circuit'), L('Express route circuit deleted'))),
(ExpressRouteCircuitsOperations.get, 'ExpressRouteCircuit'),
(ExpressRouteCircuitsOperations.list_arp_table, '[ExpressRouteCircuitArpTable]'),
(ExpressRouteCircuitsOperations.list_routes_table, '[ExpressRouteCircuitRoutesTable]'),
(ExpressRouteCircuitsOperations.list_stats, '[ExpressRouteCircuitStats]'),
(ExpressRouteCircuitsOperations.list, '[ExpressRouteCircuit]'),
(ExpressRouteCircuitsOperations.list_all, '[ExpressRouteCircuit]'),
AutoCommandDefinition(ExpressRouteCircuitsOperations.delete, LongRunningOperation(L('Deleting express route circuit'), L('Express route circuit deleted'))),
AutoCommandDefinition(ExpressRouteCircuitsOperations.get, 'ExpressRouteCircuit'),
AutoCommandDefinition(ExpressRouteCircuitsOperations.list_arp_table, '[ExpressRouteCircuitArpTable]', 'list-arp'),
AutoCommandDefinition(ExpressRouteCircuitsOperations.list_routes_table, '[ExpressRouteCircuitRoutesTable]', 'list-routes'),
AutoCommandDefinition(ExpressRouteCircuitsOperations.list_stats, '[ExpressRouteCircuitStats]'),
AutoCommandDefinition(ExpressRouteCircuitsOperations.list, '[ExpressRouteCircuit]'),
AutoCommandDefinition(ExpressRouteCircuitsOperations.list_all, '[ExpressRouteCircuit]'),
],
PARAMALIASES)
command_table)
# ExpressRouteServiceProvidersOperations
build_operation("network expressroutesp",
"express_route_service_providers",
_network_client_factory,
[
(ExpressRouteServiceProvidersOperations.list, '[ExpressRouteServiceProvider]'),
AutoCommandDefinition(ExpressRouteServiceProvidersOperations.list, '[ExpressRouteServiceProvider]'),
],
PARAMALIASES)
command_table)
# LoadBalancersOperations
build_operation("network lb",
"load_balancers",
_network_client_factory,
[
(LoadBalancersOperations.delete, LongRunningOperation(L('Deleting load balancer'), L('Load balancer deleted'))),
(LoadBalancersOperations.get, 'LoadBalancer'),
(LoadBalancersOperations.list_all, '[LoadBalancer]'),
(LoadBalancersOperations.list, '[LoadBalancer]'),
AutoCommandDefinition(LoadBalancersOperations.delete, LongRunningOperation(L('Deleting load balancer'), L('Load balancer deleted'))),
AutoCommandDefinition(LoadBalancersOperations.get, 'LoadBalancer'),
AutoCommandDefinition(LoadBalancersOperations.list_all, '[LoadBalancer]'),
AutoCommandDefinition(LoadBalancersOperations.list, '[LoadBalancer]'),
],
PARAMALIASES)
command_table)
# LocalNetworkGatewaysOperations
build_operation("network localgateways",
"local_network_gateways",
_network_client_factory,
[
(LocalNetworkGatewaysOperations.get, 'LocalNetworkGateway'),
(LocalNetworkGatewaysOperations.delete, LongRunningOperation(L('Deleting local network gateway'), L('Local network gateway deleted'))),
(LocalNetworkGatewaysOperations.list, '[LocalNetworkGateway]'),
AutoCommandDefinition(LocalNetworkGatewaysOperations.get, 'LocalNetworkGateway'),
AutoCommandDefinition(LocalNetworkGatewaysOperations.delete, LongRunningOperation(L('Deleting local network gateway'), L('Local network gateway deleted'))),
AutoCommandDefinition(LocalNetworkGatewaysOperations.list, '[LocalNetworkGateway]'),
],
PARAMALIASES)
command_table)
# NetworkInterfacesOperations
@ -122,140 +119,140 @@ build_operation("network nic",
"network_interfaces",
_network_client_factory,
[
(NetworkInterfacesOperations.delete, LongRunningOperation(L('Deleting network interface'), L('Network interface deleted'))),
(NetworkInterfacesOperations.get, 'NetworkInterface'),
(NetworkInterfacesOperations.list_virtual_machine_scale_set_vm_network_interfaces, '[NetworkInterface]'),
(NetworkInterfacesOperations.list_virtual_machine_scale_set_network_interfaces, '[NetworkInterface]'),
(NetworkInterfacesOperations.get_virtual_machine_scale_set_network_interface, 'NetworkInterface'),
(NetworkInterfacesOperations.list_all, '[NetworkInterface]'),
(NetworkInterfacesOperations.list, '[NetworkInterface]'),
AutoCommandDefinition(NetworkInterfacesOperations.delete, LongRunningOperation(L('Deleting network interface'), L('Network interface deleted'))),
AutoCommandDefinition(NetworkInterfacesOperations.get, 'NetworkInterface'),
AutoCommandDefinition(NetworkInterfacesOperations.list_virtual_machine_scale_set_vm_network_interfaces, '[NetworkInterface]', 'list-scaleset-vm-network-interfaces'),
AutoCommandDefinition(NetworkInterfacesOperations.list_virtual_machine_scale_set_network_interfaces, '[NetworkInterface]', 'list-scaleset-network-interfaces'),
AutoCommandDefinition(NetworkInterfacesOperations.get_virtual_machine_scale_set_network_interface, 'NetworkInterface', 'get-scaleset-network-interface'),
AutoCommandDefinition(NetworkInterfacesOperations.list_all, '[NetworkInterface]'),
AutoCommandDefinition(NetworkInterfacesOperations.list, '[NetworkInterface]'),
],
PARAMALIASES)
command_table)
# NetworkSecurityGroupsOperations
build_operation("network securitygroup",
"network_security_groups",
_network_client_factory,
[
(NetworkSecurityGroupsOperations.delete, LongRunningOperation(L('Deleting network security group'), L('Network security group deleted'))),
(NetworkSecurityGroupsOperations.delete, 'NetworkSecurityGroup'),
(NetworkSecurityGroupsOperations.list_all, '[NetworkSecurityGroup]'),
(NetworkSecurityGroupsOperations.list, '[NetworkSecurityGroup]'),
AutoCommandDefinition(NetworkSecurityGroupsOperations.delete, LongRunningOperation(L('Deleting network security group'), L('Network security group deleted'))),
AutoCommandDefinition(NetworkSecurityGroupsOperations.delete, 'NetworkSecurityGroup'),
AutoCommandDefinition(NetworkSecurityGroupsOperations.list_all, '[NetworkSecurityGroup]'),
AutoCommandDefinition(NetworkSecurityGroupsOperations.list, '[NetworkSecurityGroup]'),
],
PARAMALIASES)
command_table)
# PublicIPAddressesOperations
build_operation("network publicipaddress",
"public_ip_addresses",
_network_client_factory,
[
(PublicIPAddressesOperations.delete, LongRunningOperation(L('Deleting public IP address'), L('Public IP address deleted'))),
(PublicIPAddressesOperations.get, 'PublicIPAddress'),
(PublicIPAddressesOperations.list_all, '[PublicIPAddress]'),
(PublicIPAddressesOperations.list, '[PublicIPAddress]'),
AutoCommandDefinition(PublicIPAddressesOperations.delete, LongRunningOperation(L('Deleting public IP address'), L('Public IP address deleted'))),
AutoCommandDefinition(PublicIPAddressesOperations.get, 'PublicIPAddress'),
AutoCommandDefinition(PublicIPAddressesOperations.list_all, '[PublicIPAddress]'),
AutoCommandDefinition(PublicIPAddressesOperations.list, '[PublicIPAddress]'),
],
PARAMALIASES)
command_table)
# RouteTablesOperations
build_operation("network routetable",
"route_tables",
_network_client_factory,
[
(RouteTablesOperations.delete, LongRunningOperation(L('Deleting route table'), L('Route table deleted'))),
(RouteTablesOperations.get, 'RouteTable'),
(RouteTablesOperations.list, '[RouteTable]'),
(RouteTablesOperations.list_all, '[RouteTable]'),
AutoCommandDefinition(RouteTablesOperations.delete, LongRunningOperation(L('Deleting route table'), L('Route table deleted'))),
AutoCommandDefinition(RouteTablesOperations.get, 'RouteTable'),
AutoCommandDefinition(RouteTablesOperations.list, '[RouteTable]'),
AutoCommandDefinition(RouteTablesOperations.list_all, '[RouteTable]'),
],
PARAMALIASES)
command_table)
# RoutesOperations
build_operation("network routeoperation",
"routes",
_network_client_factory,
[
(RoutesOperations.delete, LongRunningOperation(L('Deleting route'), L('Route deleted'))),
(RoutesOperations.get, 'Route'),
(RoutesOperations.list, '[Route]'),
AutoCommandDefinition(RoutesOperations.delete, LongRunningOperation(L('Deleting route'), L('Route deleted'))),
AutoCommandDefinition(RoutesOperations.get, 'Route'),
AutoCommandDefinition(RoutesOperations.list, '[Route]'),
],
PARAMALIASES)
command_table)
# SecurityRulesOperations
build_operation("network securityrules",
"security_rules",
_network_client_factory,
[
(SecurityRulesOperations.delete, LongRunningOperation(L('Deleting security rule'), L('Security rule deleted'))),
(SecurityRulesOperations.get, 'SecurityRule'),
(SecurityRulesOperations.list, '[SecurityRule]'),
AutoCommandDefinition(SecurityRulesOperations.delete, LongRunningOperation(L('Deleting security rule'), L('Security rule deleted'))),
AutoCommandDefinition(SecurityRulesOperations.get, 'SecurityRule'),
AutoCommandDefinition(SecurityRulesOperations.list, '[SecurityRule]'),
],
PARAMALIASES)
command_table)
# SubnetsOperations
build_operation("network subnet",
"subnets",
_network_client_factory,
[
(SubnetsOperations.delete, LongRunningOperation(L('Deleting subnet'), L('Subnet deleted'))),
(SubnetsOperations.get, 'Subnet'),
(SubnetsOperations.list, '[Subnet]'),
AutoCommandDefinition(SubnetsOperations.delete, LongRunningOperation(L('Deleting subnet'), L('Subnet deleted'))),
AutoCommandDefinition(SubnetsOperations.get, 'Subnet'),
AutoCommandDefinition(SubnetsOperations.list, '[Subnet]'),
],
PARAMALIASES)
command_table)
# UsagesOperations
build_operation("network usage",
"usages",
_network_client_factory,
[
(UsagesOperations.list, '[Usage]'),
AutoCommandDefinition(UsagesOperations.list, '[Usage]'),
],
PARAMALIASES)
command_table)
# VirtualNetworkGatewayConnectionsOperations
build_operation("network vnetgatewayconnection",
"virtual_network_gateway_connections",
_network_client_factory,
[
(VirtualNetworkGatewayConnectionsOperations.delete, LongRunningOperation(L('Deleting virtual network gateway connection'), L('Virtual network gateway connection deleted'))),
(VirtualNetworkGatewayConnectionsOperations.get, 'VirtualNetworkGatewayConnection'),
(VirtualNetworkGatewayConnectionsOperations.get_shared_key, 'ConnectionSharedKeyResult'),
(VirtualNetworkGatewayConnectionsOperations.list, '[VirtualNetworkGatewayConnection]'),
(VirtualNetworkGatewayConnectionsOperations.reset_shared_key, 'ConnectionResetSharedKey'),
(VirtualNetworkGatewayConnectionsOperations.set_shared_key, 'ConnectionSharedKey'),
AutoCommandDefinition(VirtualNetworkGatewayConnectionsOperations.delete, LongRunningOperation(L('Deleting virtual network gateway connection'), L('Virtual network gateway connection deleted'))),
AutoCommandDefinition(VirtualNetworkGatewayConnectionsOperations.get, 'VirtualNetworkGatewayConnection'),
AutoCommandDefinition(VirtualNetworkGatewayConnectionsOperations.get_shared_key, 'ConnectionSharedKeyResult'),
AutoCommandDefinition(VirtualNetworkGatewayConnectionsOperations.list, '[VirtualNetworkGatewayConnection]'),
AutoCommandDefinition(VirtualNetworkGatewayConnectionsOperations.reset_shared_key, 'ConnectionResetSharedKey'),
AutoCommandDefinition(VirtualNetworkGatewayConnectionsOperations.set_shared_key, 'ConnectionSharedKey'),
],
PARAMALIASES)
command_table)
# VirtualNetworkGatewaysOperations
build_operation("network vnetgateway",
"virtual_network_gateways",
_network_client_factory,
[
(VirtualNetworkGatewaysOperations.delete, LongRunningOperation(L('Deleting virtual network gateway'), L('Virtual network gateway deleted'))),
(VirtualNetworkGatewaysOperations.get, 'VirtualNetworkGateway'),
(VirtualNetworkGatewaysOperations.list, '[VirtualNetworkGateway]'),
(VirtualNetworkGatewaysOperations.reset, 'VirtualNetworkGateway'),
AutoCommandDefinition(VirtualNetworkGatewaysOperations.delete, LongRunningOperation(L('Deleting virtual network gateway'), L('Virtual network gateway deleted'))),
AutoCommandDefinition(VirtualNetworkGatewaysOperations.get, 'VirtualNetworkGateway'),
AutoCommandDefinition(VirtualNetworkGatewaysOperations.list, '[VirtualNetworkGateway]'),
AutoCommandDefinition(VirtualNetworkGatewaysOperations.reset, 'VirtualNetworkGateway'),
],
PARAMALIASES)
command_table)
# VirtualNetworksOperations
build_operation("network vnet",
"virtual_networks",
_network_client_factory,
[
(VirtualNetworksOperations.delete, LongRunningOperation(L('Deleting virtual network'), L('Virtual network deleted'))),
(VirtualNetworksOperations.get, 'VirtualNetwork'),
(VirtualNetworksOperations.list, '[VirtualNetwork]'),
(VirtualNetworksOperations.list_all, '[VirtualNetwork]'),
AutoCommandDefinition(VirtualNetworksOperations.delete, LongRunningOperation(L('Deleting virtual network'), L('Virtual network deleted'))),
AutoCommandDefinition(VirtualNetworksOperations.get, 'VirtualNetwork'),
AutoCommandDefinition(VirtualNetworksOperations.list, '[VirtualNetwork]'),
AutoCommandDefinition(VirtualNetworksOperations.list_all, '[VirtualNetwork]'),
],
PARAMALIASES)
command_table)
@command('network vnet create')
@description(L('Create or update a virtual network (VNet)'))
@option('--resource-group -g <resourceGroup>', L('the resource group name'), required=True)
@option('--name -n <vnetName>', L('the VNet name'), required=True)
@option('--location -l <location>', L('the VNet location'), required=True)
@option('--address-space -a <vnetAddressSpace>', L('the VNet address-space in CIDR notation or multiple address-spaces, quoted and space-separated'), required=True)
@option('--dns-servers -d <dnsServers>', L('the VNet DNS servers, quoted and space-separated'))
def create_update_vnet(args, unexpected): #pylint: disable=unused-argument
@command_table.command('network vnet create')
@command_table.description(L('Create or update a virtual network (VNet)'))
@command_table.option('--resource-group -g', help=L('the resource group name'), required=True)
@command_table.option('--name -n', help=L('the VNet name'), required=True)
@command_table.option('--location -l', help=L('the VNet location'), required=True)
@command_table.option('--address-space -a', metavar='ADDRESS SPACE', help=L('the VNet address-space in CIDR notation or multiple address-spaces, quoted and space-separated'), required=True)
@command_table.option('--dns-servers -d', metavar='DNS SERVERS', help=L('the VNet DNS servers, quoted and space-separated'))
def create_update_vnet(args):
from azure.mgmt.network.models import AddressSpace, DhcpOptions, VirtualNetwork
resource_group = args.get('resource-group')
@ -273,13 +270,13 @@ def create_update_vnet(args, unexpected): #pylint: disable=unused-argument
poller = smc.virtual_networks.create_or_update(resource_group, name, vnet_settings)
return op(poller)
@command('network subnet create')
@description(L('Create or update a virtual network (VNet) subnet'))
@option('--resource-group -g <resourceGroup>', L('the the resource group name'), required=True)
@option('--name -n <subnetName>', L('the the subnet name'), required=True)
@option('--vnet -v <vnetName>', L('the name of the subnet vnet'), required=True)
@option('--address-prefix -a <addressPrefix>', L('the the address prefix in CIDR format'), required=True)
def create_update_subnet(args, unexpected): #pylint: disable=unused-argument
@command_table.command('network subnet create')
@command_table.description(L('Create or update a virtual network (VNet) subnet'))
@command_table.option('--resource-group -g', help=L('the the resource group name'), required=True)
@command_table.option('--name -n', help=L('the the subnet name'), required=True)
@command_table.option('--vnet -v', help=L('the name of the subnet vnet'), required=True)
@command_table.option('--address-prefix -a', help=L('the the address prefix in CIDR format'), required=True)
def create_update_subnet(args):
from azure.mgmt.network.models import Subnet
resource_group = args.get('resource-group')

Просмотреть файл

@ -1,16 +1,17 @@
from .._argparse import IncorrectUsageError
from ..commands import command, description, option
from ..parser import IncorrectUsageError
from ..commands import CommandTable, COMMON_PARAMETERS
from ._command_creation import get_mgmt_service_client
from .._locale import L
from azure.mgmt.resource.resources import (ResourceManagementClient,
ResourceManagementClientConfiguration)
@command('resource group list')
@description('List resource groups')
@option('--tag-name -tn <tagName>', L("the resource group's tag name"))
@option('--tag-value -tv <tagValue>', L("the resource group's tag value"))
def list_groups(args, unexpected): #pylint: disable=unused-argument
command_table = CommandTable()
@command_table.command('resource group list', description=L('List resource groups'))
@command_table.option('--tag-name -tn', help=L("the resource group's tag name"))
@command_table.option('--tag-value -tv', help=L("the resource group's tag value"))
def list_groups(args):
from azure.mgmt.resource.resources.models import ResourceGroup, ResourceGroupFilter
rmc = get_mgmt_service_client(ResourceManagementClient, ResourceManagementClientConfiguration)
@ -26,16 +27,19 @@ def list_groups(args, unexpected): #pylint: disable=unused-argument
groups = rmc.resource_groups.list(filter=filter_text)
return list(groups)
@command('resource show')
@description(L('Show details of a specific resource in a resource group or subscription'))
@option('--resource-group -g <resourceGroup>', L('the resource group name'), required=True)
@option('--name -n <name>', L('the resource name'), required=True)
@option('--resource-type -r <resourceType>',
L('the resource type in format: <provider-namespace>/<type>'), required=True)
@option('--api-version -o <apiVersion>', L('the API version of the resource provider'))
@option('--parent <parent>',
L('the name of the parent resource (if needed), in <parent-type>/<parent-name> format'))
def show_resource(args, unexpected): #pylint: disable=unused-argument
@command_table.command('resource show')
@command_table.description(
L('Show details of a specific resource in a resource group or subscription'))
@command_table.option(**COMMON_PARAMETERS['resource_group_name'])
@command_table.option('--name -n', help=L('the resource name'), required=True)
@command_table.option('--resource-type -r',
help=L('the resource type in format: <provider-namespace>/<type>'),
required=True)
@command_table.option('--api-version -o', help=L('the API version of the resource provider'))
@command_table.option('--parent',
help=L('the name of the parent resource (if needed), ' + \
'in <parent-type>/<parent-name> format'))
def show_resource(args):
rmc = get_mgmt_service_client(ResourceManagementClient, ResourceManagementClientConfiguration)
full_type = args.get('resource-type').split('/')
try:

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -1,9 +1,11 @@
from __future__ import print_function
from ..commands import command, description
from ..commands import CommandTable
from .._locale import L
@command('taskhelp deploy-arm-template')
@description(L('How to deploy and ARM template using Azure CLI.'))
command_table = CommandTable()
@command_table.command('taskhelp deploy-arm-template')
@command_table.description(L('How to deploy and ARM template using Azure CLI.'))
def deploy_template_help(args, unexpected): #pylint: disable=unused-argument
print(L("""
***********************

Просмотреть файл

@ -11,109 +11,121 @@ from azure.mgmt.compute.operations import (AvailabilitySetsOperations,
VirtualMachineScaleSetVMsOperations)
from ._command_creation import get_mgmt_service_client
from ..commands._auto_command import build_operation, LongRunningOperation
from ..commands._auto_command import build_operation, AutoCommandDefinition
from ..commands import CommandTable, LongRunningOperation
def _compute_client_factory():
def _compute_client_factory(*args): # pylint: disable=unused-argument
return get_mgmt_service_client(ComputeManagementClient, ComputeManagementClientConfiguration)
command_table = CommandTable()
# pylint: disable=line-too-long
build_operation("vm availabilityset",
"availability_sets",
_compute_client_factory,
[
(AvailabilitySetsOperations.delete, None),
(AvailabilitySetsOperations.get, 'AvailabilitySet'),
(AvailabilitySetsOperations.list, '[AvailabilitySet]'),
(AvailabilitySetsOperations.list_available_sizes, '[VirtualMachineSize]')
])
AutoCommandDefinition(AvailabilitySetsOperations.delete, None),
AutoCommandDefinition(AvailabilitySetsOperations.get, 'AvailabilitySet'),
AutoCommandDefinition(AvailabilitySetsOperations.list, '[AvailabilitySet]'),
AutoCommandDefinition(AvailabilitySetsOperations.list_available_sizes, '[VirtualMachineSize]', 'list-sizes')
],
command_table)
build_operation("vm machineextensionimage",
"virtual_machine_extension_images",
_compute_client_factory,
[
(VirtualMachineExtensionImagesOperations.get, 'VirtualMachineExtensionImage'),
(VirtualMachineExtensionImagesOperations.list_types, '[VirtualMachineImageResource]'),
(VirtualMachineExtensionImagesOperations.list_versions, '[VirtualMachineImageResource]'),
])
AutoCommandDefinition(VirtualMachineExtensionImagesOperations.get, 'VirtualMachineExtensionImage'),
AutoCommandDefinition(VirtualMachineExtensionImagesOperations.list_types, '[VirtualMachineImageResource]'),
AutoCommandDefinition(VirtualMachineExtensionImagesOperations.list_versions, '[VirtualMachineImageResource]'),
],
command_table)
build_operation("vm extension",
"virtual_machine_extensions",
_compute_client_factory,
[
(VirtualMachineExtensionsOperations.delete, LongRunningOperation(L('Deleting VM extension'), L('VM extension deleted'))),
(VirtualMachineExtensionsOperations.get, 'VirtualMachineExtension'),
])
AutoCommandDefinition(VirtualMachineExtensionsOperations.delete, LongRunningOperation(L('Deleting VM extension'), L('VM extension deleted'))),
AutoCommandDefinition(VirtualMachineExtensionsOperations.get, 'VirtualMachineExtension'),
],
command_table)
build_operation("vm image",
"virtual_machine_images",
_compute_client_factory,
[
(VirtualMachineImagesOperations.get, 'VirtualMachineImage'),
(VirtualMachineImagesOperations.list, '[VirtualMachineImageResource]'),
(VirtualMachineImagesOperations.list_offers, '[VirtualMachineImageResource]'),
(VirtualMachineImagesOperations.list_publishers, '[VirtualMachineImageResource]'),
(VirtualMachineImagesOperations.list_skus, '[VirtualMachineImageResource]'),
])
AutoCommandDefinition(VirtualMachineImagesOperations.get, 'VirtualMachineImage'),
AutoCommandDefinition(VirtualMachineImagesOperations.list, '[VirtualMachineImageResource]'),
AutoCommandDefinition(VirtualMachineImagesOperations.list_offers, '[VirtualMachineImageResource]'),
AutoCommandDefinition(VirtualMachineImagesOperations.list_publishers, '[VirtualMachineImageResource]'),
AutoCommandDefinition(VirtualMachineImagesOperations.list_skus, '[VirtualMachineImageResource]'),
],
command_table)
build_operation("vm usage",
"usage",
_compute_client_factory,
[
(UsageOperations.list, '[Usage]'),
])
AutoCommandDefinition(UsageOperations.list, '[Usage]'),
],
command_table)
build_operation("vm size",
"virtual_machine_sizes",
_compute_client_factory,
[
(VirtualMachineSizesOperations.list, '[VirtualMachineSize]'),
])
AutoCommandDefinition(VirtualMachineSizesOperations.list, '[VirtualMachineSize]'),
],
command_table)
build_operation("vm",
"virtual_machines",
_compute_client_factory,
[
(VirtualMachinesOperations.delete, LongRunningOperation(L('Deleting VM'), L('VM Deleted'))),
(VirtualMachinesOperations.deallocate, LongRunningOperation(L('Deallocating VM'), L('VM Deallocated'))),
(VirtualMachinesOperations.generalize, None),
(VirtualMachinesOperations.get, 'VirtualMachine'),
(VirtualMachinesOperations.list, '[VirtualMachine]'),
(VirtualMachinesOperations.list_all, '[VirtualMachine]'),
(VirtualMachinesOperations.list_available_sizes, '[VirtualMachineSize]'),
(VirtualMachinesOperations.power_off, LongRunningOperation(L('Powering off VM'), L('VM powered off'))),
(VirtualMachinesOperations.restart, LongRunningOperation(L('Restarting VM'), L('VM Restarted'))),
(VirtualMachinesOperations.start, LongRunningOperation(L('Starting VM'), L('VM Started'))),
])
AutoCommandDefinition(VirtualMachinesOperations.delete, LongRunningOperation(L('Deleting VM'), L('VM Deleted'))),
AutoCommandDefinition(VirtualMachinesOperations.deallocate, LongRunningOperation(L('Deallocating VM'), L('VM Deallocated'))),
AutoCommandDefinition(VirtualMachinesOperations.generalize, None),
AutoCommandDefinition(VirtualMachinesOperations.get, 'VirtualMachine'),
AutoCommandDefinition(VirtualMachinesOperations.list, '[VirtualMachine]'),
AutoCommandDefinition(VirtualMachinesOperations.list_all, '[VirtualMachine]'),
AutoCommandDefinition(VirtualMachinesOperations.list_available_sizes, '[VirtualMachineSize]', 'list-sizes'),
AutoCommandDefinition(VirtualMachinesOperations.power_off, LongRunningOperation(L('Powering off VM'), L('VM powered off'))),
AutoCommandDefinition(VirtualMachinesOperations.restart, LongRunningOperation(L('Restarting VM'), L('VM Restarted'))),
AutoCommandDefinition(VirtualMachinesOperations.start, LongRunningOperation(L('Starting VM'), L('VM Started'))),
],
command_table)
build_operation("vm scaleset",
"virtual_machine_scale_sets",
_compute_client_factory,
[
(VirtualMachineScaleSetsOperations.deallocate, LongRunningOperation(L('Deallocating VM scale set'), L('VM scale set deallocated'))),
(VirtualMachineScaleSetsOperations.delete, LongRunningOperation(L('Deleting VM scale set'), L('VM scale set deleted'))),
(VirtualMachineScaleSetsOperations.get, 'VirtualMachineScaleSet'),
(VirtualMachineScaleSetsOperations.delete_instances, LongRunningOperation(L('Deleting VM scale set instances'), L('VM scale set instances deleted'))),
(VirtualMachineScaleSetsOperations.get_instance_view, 'VirtualMachineScaleSetInstanceView'),
(VirtualMachineScaleSetsOperations.list, '[VirtualMachineScaleSet]'),
(VirtualMachineScaleSetsOperations.list_all, '[VirtualMachineScaleSet]'),
(VirtualMachineScaleSetsOperations.list_skus, '[VirtualMachineScaleSet]'),
(VirtualMachineScaleSetsOperations.power_off, LongRunningOperation(L('Powering off VM scale set'), L('VM scale set powered off'))),
(VirtualMachineScaleSetsOperations.restart, LongRunningOperation(L('Restarting VM scale set'), L('VM scale set restarted'))),
(VirtualMachineScaleSetsOperations.start, LongRunningOperation(L('Starting VM scale set'), L('VM scale set started'))),
(VirtualMachineScaleSetsOperations.update_instances, LongRunningOperation(L('Updating VM scale set instances'), L('VM scale set instances updated'))),
])
AutoCommandDefinition(VirtualMachineScaleSetsOperations.deallocate, LongRunningOperation(L('Deallocating VM scale set'), L('VM scale set deallocated'))),
AutoCommandDefinition(VirtualMachineScaleSetsOperations.delete, LongRunningOperation(L('Deleting VM scale set'), L('VM scale set deleted'))),
AutoCommandDefinition(VirtualMachineScaleSetsOperations.get, 'VirtualMachineScaleSet'),
AutoCommandDefinition(VirtualMachineScaleSetsOperations.delete_instances, LongRunningOperation(L('Deleting VM scale set instances'), L('VM scale set instances deleted'))),
AutoCommandDefinition(VirtualMachineScaleSetsOperations.get_instance_view, 'VirtualMachineScaleSetInstanceView'),
AutoCommandDefinition(VirtualMachineScaleSetsOperations.list, '[VirtualMachineScaleSet]'),
AutoCommandDefinition(VirtualMachineScaleSetsOperations.list_all, '[VirtualMachineScaleSet]'),
AutoCommandDefinition(VirtualMachineScaleSetsOperations.list_skus, '[VirtualMachineScaleSet]'),
AutoCommandDefinition(VirtualMachineScaleSetsOperations.power_off, LongRunningOperation(L('Powering off VM scale set'), L('VM scale set powered off'))),
AutoCommandDefinition(VirtualMachineScaleSetsOperations.restart, LongRunningOperation(L('Restarting VM scale set'), L('VM scale set restarted'))),
AutoCommandDefinition(VirtualMachineScaleSetsOperations.start, LongRunningOperation(L('Starting VM scale set'), L('VM scale set started'))),
AutoCommandDefinition(VirtualMachineScaleSetsOperations.update_instances, LongRunningOperation(L('Updating VM scale set instances'), L('VM scale set instances updated'))),
],
command_table)
build_operation("vm scalesetvm",
"virtual_machine_scale_set_vms",
_compute_client_factory,
[
(VirtualMachineScaleSetVMsOperations.deallocate, LongRunningOperation(L('Deallocating VM scale set VMs'), L('VM scale set VMs deallocated'))),
(VirtualMachineScaleSetVMsOperations.delete, LongRunningOperation(L('Deleting VM scale set VMs'), L('VM scale set VMs deleted'))),
(VirtualMachineScaleSetVMsOperations.get, 'VirtualMachineScaleSetVM'),
(VirtualMachineScaleSetVMsOperations.get_instance_view, 'VirtualMachineScaleSetVMInstanceView'),
(VirtualMachineScaleSetVMsOperations.list, '[VirtualMachineScaleSetVM]'),
(VirtualMachineScaleSetVMsOperations.power_off, LongRunningOperation(L('Powering off VM scale set VMs'), L('VM scale set VMs powered off'))),
(VirtualMachineScaleSetVMsOperations.restart, LongRunningOperation(L('Restarting VM scale set VMs'), L('VM scale set VMs restarted'))),
(VirtualMachineScaleSetVMsOperations.start, LongRunningOperation(L('Starting VM scale set VMs'), L('VM scale set VMs started'))),
])
AutoCommandDefinition(VirtualMachineScaleSetVMsOperations.deallocate, LongRunningOperation(L('Deallocating VM scale set VMs'), L('VM scale set VMs deallocated'))),
AutoCommandDefinition(VirtualMachineScaleSetVMsOperations.delete, LongRunningOperation(L('Deleting VM scale set VMs'), L('VM scale set VMs deleted'))),
AutoCommandDefinition(VirtualMachineScaleSetVMsOperations.get, 'VirtualMachineScaleSetVM'),
AutoCommandDefinition(VirtualMachineScaleSetVMsOperations.get_instance_view, 'VirtualMachineScaleSetVMInstanceView'),
AutoCommandDefinition(VirtualMachineScaleSetVMsOperations.list, '[VirtualMachineScaleSetVM]'),
AutoCommandDefinition(VirtualMachineScaleSetVMsOperations.power_off, LongRunningOperation(L('Powering off VM scale set VMs'), L('VM scale set VMs powered off'))),
AutoCommandDefinition(VirtualMachineScaleSetVMsOperations.restart, LongRunningOperation(L('Restarting VM scale set VMs'), L('VM scale set VMs restarted'))),
AutoCommandDefinition(VirtualMachineScaleSetVMsOperations.start, LongRunningOperation(L('Starting VM scale set VMs'), L('VM scale set VMs started'))),
],
command_table)

Просмотреть файл

@ -1,10 +1,6 @@
from ._event_dispatcher import EventDispatcher
from .query import register as register_query
from .query import register as register_query
from .transform import register as register_transform
from .experimental import register as register_experimental
event_dispatcher = EventDispatcher()
register_query(event_dispatcher)
register_transform(event_dispatcher)
register_experimental(event_dispatcher)
def register_extensions(application):
register_query(application)
register_transform(application)

Просмотреть файл

@ -1,46 +0,0 @@
from collections import defaultdict
class EventDispatcher(object):
"""Register for and raise events.
During the execution of a command, a set of events are raised
that allow extensions to change the flow of actions.
Clients can register handlers by calling the `EventDispatcher.register`
method passing in the event handler function.
"""
REGISTER_GLOBAL_PARAMETERS = 'RegisterGlobalParameters'
PARSING_PARAMETERS = 'ParsingParameters'
VALIDATING_PARAMETERS = 'ValidatingParameters'
EXECUTING_COMMAND = 'ExecutingCommand'
TRANSFORM_RESULT = 'TransformResult'
FILTER_RESULT = 'FilterResult'
def __init__(self):
self._handlers = defaultdict(lambda: [])
def raise_event(self, name, event_data):
for func in self._handlers[name]:
func(name, event_data)
def register(self, name, handler):
'''Register a callable that will be called when the
event `name` is raised.
param: name: The name of the event
param: handler: Function that takes two parameters;
name: name of the event raised
event_data: `dict` with event specific data.
'''
self._handlers[name].append(handler)
def event_handler(self, name):
'''Any function decorated by @event_handler will
be registered as a handler for the given event name
'''
def wrapper(func):
self.register(name, func)
return func
return wrapper

Просмотреть файл

@ -1,18 +1,26 @@
import collections
def register(event_dispatcher):
def handle_query_parameter(_, event_data):
def _register_global_parameter(parser):
# Let the program know that we are adding a parameter --query
parser.add_argument('--query', dest='_jmespath_query', metavar='JMESPATH',
help='JMESPath query string. See http://jmespath.org/ for more information and examples.') # pylint: disable=line-too-long
def register(application):
def handle_query_parameter(args):
try:
args = event_data['args']
query_index = args.index('--query')
query_value = args[query_index + 1]
del args[query_index:query_index + 1]
except ValueError:
query_value = args._jmespath_query # pylint: disable=protected-access
del args._jmespath_query
if query_value:
def filter_output(event_data):
from jmespath import search, Options
event_data['result'] = search(query_value, event_data['result'],
Options(collections.OrderedDict))
application.register(application.FILTER_RESULT, filter_output)
except AttributeError:
pass
else:
def filter_output(_, event_data):
import jmespath
event_data['result'] = jmespath.search(query_value, event_data['result'],
jmespath.Options(collections.OrderedDict))
event_dispatcher.register(event_dispatcher.FILTER_RESULT, filter_output)
event_dispatcher.register(event_dispatcher.REGISTER_GLOBAL_PARAMETERS, handle_query_parameter)
application.register(application.GLOBAL_PARSER_CREATED, _register_global_parameter)
application.register(application.COMMAND_PARSER_PARSED, handle_query_parameter)

Просмотреть файл

@ -1,27 +1,32 @@
import re
def register(event_dispatcher):
@event_dispatcher.event_handler(event_dispatcher.TRANSFORM_RESULT)
def resource_group_transform(_, event_data): # pylint: disable=unused-variable
def parse_id(strid):
parsed = {}
parts = re.split('/', strid)
parsed['resource-group'] = parts[4]
parsed['name'] = parts[8]
return parsed
def register(application):
application.register(application.TRANSFORM_RESULT, _resource_group_transform)
def add_resource_group(obj):
if isinstance(obj, list):
for array_item in obj:
add_resource_group(array_item)
elif isinstance(obj, dict):
try:
if 'resourceGroup' not in obj:
if obj['id']:
obj['resourceGroup'] = parse_id(obj['id'])['resource-group']
except (KeyError, IndexError):
pass
for item_key in obj:
add_resource_group(obj[item_key])
def _parse_id(strid):
parsed = {}
parts = re.split('/', strid)
if parts[3] != 'resourceGroups':
raise KeyError()
parsed['resource-group'] = parts[4]
parsed['name'] = parts[8]
return parsed
def _add_resource_group(obj):
if isinstance(obj, list):
for array_item in obj:
_add_resource_group(array_item)
elif isinstance(obj, dict):
try:
if 'resourceGroup' not in obj:
if obj['id']:
obj['resourceGroup'] = _parse_id(obj['id'])['resource-group']
except (KeyError, IndexError):
pass
for item_key in obj:
_add_resource_group(obj[item_key])
def _resource_group_transform(event_data):
_add_resource_group(event_data['result'])
add_resource_group(event_data['result'])

Просмотреть файл

@ -1,11 +1,11 @@
import os
import sys
from ._argparse import ArgumentParser
from .application import Application, Configuration
from ._logging import configure_logging, logger
from ._session import Session
from ._output import OutputProducer
from azure.cli.extensions import event_dispatcher
# CONFIG provides external configuration options
CONFIG = Session()
@ -24,29 +24,16 @@ def main(args, file=sys.stdout): #pylint: disable=redefined-builtin
'locale',
CONFIG.get('locale', 'en-US')))
event_dispatcher.raise_event(event_dispatcher.REGISTER_GLOBAL_PARAMETERS,
event_data={'args': args})
parser = ArgumentParser("az")
import azure.cli.commands as commands
# Find the first noun on the command line and only load commands from that
# module to improve startup time.
for a in args:
if not a.startswith('-'):
commands.add_to_parser(parser, a)
break
else:
# No noun found, so load all commands.
commands.add_to_parser(parser)
config = Configuration(args)
app = Application(config)
app.load_commands()
try:
cmd_result = parser.execute(args)
cmd_result = app.execute(args)
# Commands can return a dictionary/list of results
# If they do, we print the results.
if cmd_result.result:
formatter = OutputProducer.get_formatter(cmd_result.output_format)
OutputProducer(formatter=formatter, file=file).out(cmd_result.result)
if cmd_result:
formatter = OutputProducer.get_formatter(app.configuration.output_format)
OutputProducer(formatter=formatter, file=file).out(cmd_result)
except RuntimeError as ex:
logger.error(ex.args[0])
return ex.args[1] if len(ex.args) >= 2 else -1

65
src/azure/cli/parser.py Normal file
Просмотреть файл

@ -0,0 +1,65 @@
import argparse
class IncorrectUsageError(Exception):
'''Raised when a command is incorrectly used and the usage should be
displayed to the user.
'''
pass
class AzCliCommandParser(argparse.ArgumentParser):
"""ArgumentParser implementation specialized for the
Azure CLI utility.
"""
def __init__(self, **kwargs):
super(AzCliCommandParser, self).__init__(**kwargs)
self.subparsers = {}
self.parents = kwargs.get('parents', [])
def load_command_table(self, command_table):
"""Load a command table into our parser.
"""
# If we haven't already added a subparser, we
# better do it.
if not self.subparsers:
sp = self.add_subparsers(dest='_command_package')
sp.required = True
self.subparsers = {(): sp}
for handler, metadata in command_table.items():
subparser = self._get_subparser(metadata['name'].split())
command_name = metadata['name'].split()[-1]
# To work around http://bugs.python.org/issue9253, we artificially add any new
# parsers we add to the "choices" section of the subparser.
subparser.choices[command_name] = command_name
command_parser = subparser.add_parser(command_name,
description=metadata.get('description'),
parents=self.parents, conflict_handler='resolve')
for arg in metadata['arguments']:
names = arg.pop('name').split()
command_parser.add_argument(*names, **arg)
command_parser.set_defaults(func=handler)
def _get_subparser(self, path):
"""For each part of the path, walk down the tree of
subparsers, creating new ones if one doesn't already exist.
"""
for length in range(0, len(path)):
parent_subparser = self.subparsers.get(tuple(path[0:length]), None)
if not parent_subparser:
# No subparser exists for the given subpath - create and register
# a new subparser.
# Since we know that we always have a root subparser (we created)
# one when we started loading the command table, and we walk the
# path from left to right (i.e. for "cmd subcmd1 subcmd2", we start
# with ensuring that a subparser for cmd exists, then for subcmd1,
# subcmd2 and so on), we know we can always back up one step and
# add a subparser if one doesn't exist
grandparent_subparser = self.subparsers[tuple(path[0:length - 1])]
new_parser = grandparent_subparser.add_parser(path[length - 1])
# Due to http://bugs.python.org/issue9253, we have to give the subparser
# a destination and set it to required in order to get a meaningful error
parent_subparser = new_parser.add_subparsers(dest='subcommand')
parent_subparser.required = True
self.subparsers[tuple(path[0:length])] = parent_subparser
return parent_subparser

Просмотреть файл

@ -232,6 +232,29 @@ Primary Endpoints :
Tags :
None
Account Type : Standard_LRS
Creation Time : 2016-03-25T20:48:28.341265+00:00
Custom Domain : None
Id : /subscriptions/0b1f6471-1bf0-4dda-aec3-cb9272f09590/resourceGroups/ecvm1458938841925rg/providers/Microsoft.Storage/storageAccounts/ecvm1458938841925sa
Last Geo Failover Time : None
Location : southeastasia
Name : ecvm1458938841925sa
Primary Location : southeastasia
Provisioning State : Succeeded
Resource Group : ecvm1458938841925rg
Secondary Endpoints : None
Secondary Location : None
Status Of Primary : Available
Status Of Secondary : None
Type : Microsoft.Storage/storageAccounts
Primary Endpoints :
Blob : https://ecvm1458938841925sa.blob.core.windows.net/
File : https://ecvm1458938841925sa.file.core.windows.net/
Queue : https://ecvm1458938841925sa.queue.core.windows.net/
Table : https://ecvm1458938841925sa.table.core.windows.net/
Tags :
None
Account Type : Standard_RAGRS
Creation Time : 2016-03-05T00:25:04.704573+00:00
Custom Domain : None
@ -376,29 +399,6 @@ Tags :
None
"""
},
{
'test_name': 'test_storage_account_usage',
'command': 'storage account usage',
'expected_result': """Current Value : 16
Limit : 100
Unit : Count
Name :
Localized Value : Storage Accounts
Value : StorageAccounts
"""
},
{
'test_name': 'test_storage_account_check',
'command': 'storage account check-name-availability --name travisteststorageaccount',
'expected_result': """Message : None
Name Available : True
Reason : None
"""
}
]

Просмотреть файл

@ -37,24 +37,26 @@ interactions:
sfcefj2N/eDhg50D+ooGejNju7bU0YcyNmvsnU8f3aec3gHlPh4+fD/GVmTel7HNa/g2+hpz7vB7
/HX0xZYZYvBF/jryIvGNDtrjTTsBROifr4w9Ixpky7fVdku/7L4Xb5tXd/eQ1NjA2GFD6uJmrt6Q
/SCufvBm9+Ej+h9lPw4oVbv/6f4tudrD5NYs7b+Dr6LvMMMOvMTfRd9qmQnib/F3kbeIUXSgHjNa
ihNxN3MyfQZ2jDErQf45Zsb6/RiQ/r7IwBQ0MI/9/LUl5UC/JfVwM/+9Ot6gVyn/dv/Nzs6jvfuP
dvbHDyj/9uDe3i050GJya/5zb+CL6BvMYdFX+JvoOy3Peewd/ibyDnGFDtDjPJ/YRNnNzGd04K0I
tG2bD5OKh3fDy9wm+naLgd7wNreJvE3EsI08cohV2CPgmyTNfftzbxbikkhzSlMJm2C+eP+8eQzG
vZ3dfRq+J61WdamobniLOr9ZcjdYjnvbu/fZH6L/7Y0fPDzY3/n0tnI7hNYwb0JabwMA7aIAmHFv
A4EbRkEw994GBDeMgCA2V+J4TG6njCaEwNGHQwJPn4HLYzJAkNXafLTILoopfUkd0EtXVdXSHzP8
QCOZ3xvZjdp+c4bo3arMWpCJFoxm1fRtXn/6YOf+e3E/g2haefsBLajT0Dy2F0VBnynbx5pTdx/E
73vbew/f7O492qPA9uGYBrtz/8HOLfm9h8+tGb3/JhpE32TG3fgqt4i+2zIXbXqXW0TeJaZSAng8
beeDiP4N8DR+k4m6kVPorZ9Nxt17eP/9vCoGYQh5cH9nlwZ9W8bV5tTdBzLu7sGb3U8f7dx/dJ9y
6J/ef7D76W0D1x4+X49x+U00iL7JbLnxVW4RfbdlVtn0LreIvEtMpQT4/zPj5u/afIlx3H+4/37r
PgxDP3xHyZo9KNGbWTdsTx1+IO+S0t05AO/u7Y13Dx7QkG/rZPQRel/m9V9Fi+irzJub3+Um0Zdb
ZpiNL3OTyMvEW0qEnxf8+3Dn4Gu5DPzhu3b/03tIl3j8Sx22c9Ara4qMvoqysb5G/X4QG1OWZRe+
8r2H8JX3sSKy8/XYGAh9TTbmV9Ei+irz6OZ3uUn05Zb5ZuPL3CTyMrGYEsFj4+7cEP1/2Nz8/V+S
/D/tStZTzS0AAA==
ihNxN3MyfQZ2jDErQf45ZsZ8ermgPM7Bw3sHB/u7D/fu1xfvxZG995uMxuxxJvXWziG5WVPgK2XQ
+HvU8wcx6r3tvftv9sipOHi0dzC+t7+79+n926bpIhjdmmFj76JJ9F1mzhte5jbRt1tmnM1vc5vI
28RkSgiPkbsTRHPw/2F+rt9PodLfFxl4hwbmMa2/VqoM67ekHm5m01fHQ34CM+rO/Tc7O4/27j/a
2R8/oHzyg3t7t2RUi8mt2dO9gS+ibzDDRV/hb6LvtDznsXf4m8g7xBU6QI8BfWITZTczn7HptyLQ
tm0+TCoe3g0vc5vo2y0GesPb3CbyNhHDNvLIAVFcN3sEfJOkuW//3+rm0JzSVMLHMV+8/zpQDMa9
nd19Gr4nrdYUq6hueIs6v1lyNxqYXTIw5N/T//bGDx4e7O98elu5HUJrmDchrbcBgHZRAMy4t4HA
DaMgmHtvA4IbRkAQ/ylxPCa3U0YTQuDowyGBp8/A5TEZIMjK4x8tsotiSl9SB/TSVVW19McMP9BI
5vdGdqO23xz7v1uVWQsy0QLorJq+zetPH+zcfy/uZxBNK28/uPfgIQ3NY3tRFPSZsn2sOXX3Qfy+
t7338M3u3iP4VA/HNNid+w92bsnvPXxuzej9N9Eg+iYz7sZXuUX03Za5aNO73CLyLjGVEsDjaTsf
RPRvgKfxm0zUjZxCb/1sMu7ew/vv51UxCEPIg/s7uzTo2zKuNqfuPpBxdw/e7H76aOf+o/u0JvTp
/Qe7n942EdPD5+sxLr+JBtE3mS03vsotou+2zCqb3uUWkXeJqZQA/39m3Pxdmy8xjvsP999vHZNh
6IfvKPm4ByV6M+uG7anDD+RdUro7B+Ddvb3x7sEDGvJtnYw+Qu/LvP6raBF9lXlz87vcJPpyywyz
8WVuEnmZeEuJ8POCfx/uHHwtl4E/fNfuf3oP6T+Pf6nDINiPsrG+Rv1+EBtT1nAXvvK9h/CV97HC
t/P12BgIfU025lfRIvoq8+jmd7lJ9OWW+Wbjy9wk8jKxmBLBY+Pu3BD9f9jc/P1fkvw/03tlHp0w
AAA=
headers:
Cache-Control: [no-cache]
Content-Encoding: [gzip]
Content-Type: [application/json]
Date: ['Thu, 24 Mar 2016 21:57:15 GMT']
Date: ['Tue, 29 Mar 2016 18:49:12 GMT']
Expires: ['-1']
Pragma: [no-cache]
Server: [Microsoft-Azure-Storage-Resource-Provider/1.0, Microsoft-HTTPAPI/2.0]

Просмотреть файл

@ -1,33 +0,0 @@
interactions:
- request:
body: null
headers:
Accept: ['*/*']
Accept-Encoding: ['gzip, deflate']
Connection: [keep-alive]
Content-Type: [application/json; charset=utf-8]
User-Agent: [python/3.5.1 requests/2.9.1 msrest/0.1.3 msrest_azure/0.1.2 storagemanagementclient/2015-06-15
Azure-SDK-For-Python AZURECLI_0.0.1]
accept-language: [en-US]
method: GET
uri: https://management.azure.com/subscriptions/00000000-0000-0000-0000-000000000000/providers/Microsoft.Storage/usages?api-version=2015-06-15
response:
body:
string: !!binary |
H4sIAAAAAAAEAO29B2AcSZYlJi9tynt/SvVK1+B0oQiAYBMk2JBAEOzBiM3mkuwdaUcjKasqgcpl
VmVdZhZAzO2dvPfee++999577733ujudTif33/8/XGZkAWz2zkrayZ4hgKrIHz9+fB8/In7xb5yk
6UeXWbnOP3qUfg9/pSl/iOej9bJo6fOPTqr1sv1oZD+frus6X7Y/qe/tfuq+KosFv7O7s+M+XGYL
tLOA6SPT50ev26rOLvLj6RSdNK4XalRW06wsfpDPTE+mdWqbm9a/RH7hH9//jZNf8v8A3pREK9oA
AAA=
headers:
Cache-Control: [no-cache]
Content-Encoding: [gzip]
Content-Type: [application/json]
Date: ['Thu, 24 Mar 2016 21:57:20 GMT']
Expires: ['-1']
Pragma: [no-cache]
Server: [Microsoft-Azure-Storage-Resource-Provider/1.0, Microsoft-HTTPAPI/2.0]
Strict-Transport-Security: [max-age=31536000; includeSubDomains]
Vary: [Accept-Encoding]
status: {code: 200, message: OK}
version: 1

Просмотреть файл

@ -0,0 +1,80 @@
import unittest
from six import StringIO
from collections import namedtuple
from azure.cli.parser import AzCliCommandParser
from azure.cli.extensions.transform import _parse_id, _add_resource_group, _resource_group_transform
class TestResourceGroupTransform(unittest.TestCase):
@classmethod
def setUpClass(cls):
pass
@classmethod
def tearDownClass(cls):
pass
def setUp(self):
self.io = StringIO()
def tearDown(self):
self.io.close()
CORRECT_ID = "/subscriptions/00000000-0000-0000-0000-0000000000000/resourceGroups/REsourceGROUPname/providers/Microsoft.Compute/virtualMachines/vMName" # pylint: disable=line-too-long
NON_RG_ID = "/subscriptions/00000000-0000-0000-0000-0000000000000/somethingElse/REsourceGROUPname/providers/Microsoft.Compute/virtualMachines/vMName" # pylint: disable=line-too-long
BOGUS_ID = "|completely-bogus-id|"
def test_split_correct_id(self):
result = _parse_id(TestResourceGroupTransform.CORRECT_ID)
self.assertDictEqual(result, {
'resource-group': 'REsourceGROUPname',
'name': 'vMName'
})
def test_split_non_resourcegroup_id(self):
with self.assertRaises(KeyError):
_parse_id(TestResourceGroupTransform.NON_RG_ID)
def test_split_bogus_resourcegroup_id(self):
with self.assertRaises(IndexError):
_parse_id(TestResourceGroupTransform.BOGUS_ID)
def test_add_valid_resourcegroup_id(self):
instance = {
'id': TestResourceGroupTransform.CORRECT_ID,
'name': 'A name'
}
_add_resource_group(instance)
self.assertDictEqual(instance, {
'id': TestResourceGroupTransform.CORRECT_ID,
'resourceGroup': 'REsourceGROUPname',
'name': 'A name'
})
def test_dont_add_invalid_resourcegroup_id(self):
instance = {
'id': TestResourceGroupTransform.BOGUS_ID,
'name': 'A name'
}
_add_resource_group(instance)
self.assertDictEqual(instance, {
'id': TestResourceGroupTransform.BOGUS_ID,
'name': 'A name'
})
def test_dont_stomp_on_existing_resourcegroup_id(self):
instance = {
'id': TestResourceGroupTransform.CORRECT_ID,
'resourceGroup': 'SomethingElse',
'name': 'A name'
}
_add_resource_group(instance)
self.assertDictEqual(instance, {
'id': TestResourceGroupTransform.CORRECT_ID,
'resourceGroup': 'SomethingElse',
'name': 'A name'
})
if __name__ == '__main__':
unittest.main()

Просмотреть файл

@ -0,0 +1,96 @@
import unittest
from six import StringIO
from collections import namedtuple
from azure.cli.application import Application, Configuration
class TestApplication(unittest.TestCase):
@classmethod
def setUpClass(cls):
pass
@classmethod
def tearDownClass(cls):
pass
def setUp(self):
self.io = StringIO()
def tearDown(self):
self.io.close()
def test_application_todict_none(self):
input = None
actual = Application.todict(input)
expected = None
self.assertEqual(actual, expected)
def test_application_todict_dict_empty(self):
input = {}
actual = Application.todict(input)
expected = {}
self.assertEqual(actual, expected)
def test_application_todict_dict(self):
input = {'a': 'b'}
actual = Application.todict(input)
expected = {'a': 'b'}
self.assertEqual(actual, expected)
def test_application_todict_list(self):
input = [{'a': 'b'}]
actual = Application.todict(input)
expected = [{'a': 'b'}]
self.assertEqual(actual, expected)
def test_application_todict_list(self):
input = [{'a': 'b'}]
actual = Application.todict(input)
expected = [{'a': 'b'}]
self.assertEqual(actual, expected)
def test_application_todict_obj(self):
MyObject = namedtuple('MyObject', 'a b')
input = MyObject('x', 'y')
actual = Application.todict(input)
expected = {'a': 'x', 'b': 'y'}
self.assertEqual(actual, expected)
def test_application_todict_dict_with_obj(self):
MyObject = namedtuple('MyObject', 'a b')
mo = MyObject('x', 'y')
input = {'a': mo}
actual = Application.todict(input)
expected = {'a': {'a': 'x', 'b': 'y'}}
self.assertEqual(actual, expected)
def test_application_register_and_call_handlers(self):
handler_called = [False]
def handler(args):
args[0] = True
def other_handler(args):
self.assertEqual(args, 'secret sauce')
config = Configuration([])
app = Application(config)
app.raise_event('was_handler_called', handler_called)
self.assertFalse(handler_called[0], "Raising event with no handlers registered somehow failed...")
app.register('was_handler_called', handler)
self.assertFalse(handler_called[0])
# Registered handler won't get called if event with different name
# is raised...
app.raise_event('other_handler_called', handler_called)
self.assertFalse(handler_called[0], 'Wrong handler called!')
app.raise_event('was_handler_called', handler_called)
self.assertTrue(handler_called[0], "Handler didn't get called")
app.raise_event('other_handler_called', 'secret sauce')
if __name__ == '__main__':
unittest.main()

Просмотреть файл

@ -1,217 +0,0 @@
import unittest
from six import StringIO
from azure.cli._argparse import ArgumentParser, IncorrectUsageError
from azure.cli._logging import logger
import logging
import azure.cli._util as util
class Test_argparse(unittest.TestCase):
@classmethod
def setUpClass(cls):
# Ensure initialization has occurred correctly
import azure.cli.main
logging.basicConfig(level=logging.DEBUG)
@classmethod
def tearDownClass(cls):
logging.shutdown()
def test_nouns(self):
p = ArgumentParser('test')
res = [False, False, False]
def set_n1(a, b): res[0] = True
def set_n2(a, b): res[1] = True
def set_n3(a, b): res[2] = True
p.add_command(set_n1, 'n1')
p.add_command(set_n2, 'n1 n2')
p.add_command(set_n3, 'n1 n2 n3')
p.execute('n1 n2 n3'.split())
self.assertSequenceEqual(res, (False, False, True))
p.execute('n1'.split())
self.assertSequenceEqual(res, (True, False, True))
res[0] = False
p.execute('n1 n2'.split())
self.assertSequenceEqual(res, (False, True, True))
def test_args(self):
p = ArgumentParser('test')
p.add_command(lambda a, b: (a, b),
'n1',
accepts_unexpected_args=True,
args=[('--arg -a', '', False, None), ('-b <v>', '', False, None)])
cmd_result = p.execute('n1 -a x'.split())
res, other = cmd_result.result
self.assertTrue(res.arg)
self.assertSequenceEqual(res.positional, ['x'])
# Should recognize args with alternate prefix
cmd_result = p.execute('n1 /a'.split())
res, other = cmd_result.result
self.assertTrue(res.arg)
cmd_result = p.execute('n1 /arg'.split())
res, other = cmd_result.result
self.assertTrue(res.arg)
# Should not recognize "------a"
cmd_result = p.execute('n1 ------a'.split())
res, other = cmd_result.result
self.assertNotIn('arg', res)
# First two '--' match, so '----a' is added to dict
self.assertIn('----a', other)
cmd_result = p.execute('n1 -a:x'.split())
res = cmd_result.result
self.assertIsNone(res)
cmd_result = p.execute('n1 -b -a x'.split())
res, other = cmd_result.result
self.assertEqual(res.b, '-a')
self.assertSequenceEqual(res.positional, ['x'])
self.assertRaises(IncorrectUsageError, lambda: res.arg)
cmd_result = p.execute('n1 -b:-a x'.split())
res, other = cmd_result.result
self.assertEqual(res.b, '-a')
self.assertSequenceEqual(res.positional, ['x'])
self.assertRaises(IncorrectUsageError, lambda: res.arg)
def test_unexpected_args(self):
p = ArgumentParser('test')
p.add_command(lambda a, b: (a, b),
'n1',
accepts_unexpected_args=True,
args=[('-a', '', False, None)])
cmd_result = p.execute('n1 -b=2'.split())
res, other = cmd_result.result
self.assertFalse(res)
self.assertEqual('2', other.b)
cmd_result = p.execute('n1 -b.c.d=2'.split())
res, other = cmd_result.result
self.assertFalse(res)
self.assertEqual('2', other.b.c.d)
cmd_result = p.execute('n1 -b.c.d 2 -b.c.e:3'.split())
res, other = cmd_result.result
self.assertFalse(res)
self.assertEqual('2', other.b.c.d)
self.assertEqual('3', other.b.c.e)
def test_required_args(self):
p = ArgumentParser('test')
p.add_command(lambda a, b: (a, b),
'n1',
accepts_unexpected_args=True,
args=[('--arg -a', '', True, None), ('-b <v>', '', False, None)])
cmd_result = p.execute('n1 -a x'.split())
res, other = cmd_result.result
self.assertTrue(res.arg)
self.assertSequenceEqual(res.positional, ['x'])
io = StringIO()
cmd_result = p.execute('n1 -b x'.split(), out=io)
self.assertIsNone(cmd_result.result)
self.assertTrue(io.getvalue().startswith("Missing required argument: --arg/-a"))
io.close()
def test_specify_output_format(self):
p = ArgumentParser('test')
p.add_command(lambda a, b: (a, b),
'n1',
accepts_unexpected_args=True,
args=[('--arg -a', '', True, None), ('-b <v>', '', False, None)])
cmd_result = p.execute('n1 -a x'.split())
self.assertEqual(cmd_result.output_format, None)
cmd_result = p.execute('n1 -a x --output json'.split())
self.assertEqual(cmd_result.output_format, 'json')
cmd_result = p.execute('n1 -a x --output table'.split())
self.assertEqual(cmd_result.output_format, 'table')
cmd_result = p.execute('n1 -a x --output text'.split())
self.assertEqual(cmd_result.output_format, 'text')
# Invalid format
io = StringIO()
cmd_res = p.execute('n1 -a x --output unknown'.split(), out=io)
self.assertIsNone(cmd_res.output_format)
self.assertTrue(io.getvalue().startswith("Invalid output format 'unknown'"))
io.close()
# Invalid format
cmd_result = p.execute('n1 -a x --output'.split())
self.assertEqual(cmd_result.output_format, None)
def test_args_completion(self):
p = ArgumentParser('test')
p.add_command(lambda a, b: (a, b), 'n1', args=[('--arg -a', '', True, None), ('-b <v>', '', False, None)])
# Can't use "with StringIO() as ...", as Python2/StringIO doesn't have __exit__.
io = StringIO()
p.execute('n1 - --complete'.split(),
show_usage=False,
show_completions=True,
out=io)
candidates = util.normalize_newlines(io.getvalue())
io.close()
self.assertEqual(candidates, '--arg\n-a\n-b\n')
#matching '--arg for '--a'
io=StringIO()
p.execute('n1 --a --complete'.split(),
show_usage=False,
out=io)
candidates = util.normalize_newlines(io.getvalue())
io.close()
self.assertEqual(candidates, '--arg\n')
#matching 'n1' for 'n'
io = StringIO()
p.execute('n --complete'.split(),
show_usage=False,
show_completions=True,
out=io)
candidates = util.normalize_newlines(io.getvalue())
io.close()
self.assertEqual(True, candidates.endswith('n1\n'))
#if --arg is used, then both '-a' and "--arg" should not be in the
#candidate list
io = StringIO()
p.execute('n1 --arg hello - --complete'.split(),
show_usage=False,
show_completions=True,
out=io)
candidates = util.normalize_newlines(io.getvalue())
io.close()
self.assertEqual(candidates, '-b\n')
#if all argument are used, candidate list is empty
io = StringIO()
p.execute('n1 -a -b --complete'.split(),
show_usage=False,
show_completions=True,
out=io)
candidates = util.normalize_newlines(io.getvalue())
io.close()
self.assertEqual(candidates, '\n')
#if at parameter value level, get nothing for N.Y.I.
io = StringIO()
p.execute('n1 -a --complete'.split(),
show_usage=False,
show_completions=True,
out=io)
candidates = util.normalize_newlines(io.getvalue())
io.close()
self.assertEqual(candidates, '\n')
if __name__ == '__main__':
unittest.main()

Просмотреть файл

@ -1,10 +1,9 @@
import logging
import unittest
from azure.cli.commands._auto_command import (_decorate_command,
_decorate_option)
from azure.cli.commands import _COMMANDS
from azure.cli.commands._auto_command import build_operation
from azure.cli.commands import CommandTable
from azure.cli.commands._auto_command import AutoCommandDefinition
class Test_autocommand(unittest.TestCase):
@classmethod
@ -17,63 +16,127 @@ class Test_autocommand(unittest.TestCase):
def tearDownClass(cls):
logging.shutdown()
def test_raw_register_command(self):
command_name = 'da command'
def testfunc():
return testfunc
def sample_vm_get(self, resource_group_name, vm_name, expand=None, custom_headers={}, raw=False, **operation_config):
"""
The operation to get a virtual machine.
# Run test code
_decorate_command(command_name, testfunc)
:param resource_group_name: The name of the resource group.
:type resource_group_name: str
:param vm_name: The name of the virtual machine.
:type vm_name: str
:param expand: The expand expression to apply on the operation.
:type expand: str
:param dict custom_headers: headers that will be added to the request
:param boolean raw: returns the direct response alongside the
deserialized response
:rtype: VirtualMachine
:rtype: msrest.pipeline.ClientRawResponse if raw=True
"""
# Verify
registered_command = _COMMANDS.get(testfunc, None)
self.assertIsNotNone(registered_command)
self.assertFalse('args' in registered_command.keys())
self.assertEqual(registered_command['name'], command_name)
def test_autocommand_basic(self):
command_table = CommandTable()
build_operation("test autocommand",
"",
None,
[
AutoCommandDefinition(Test_autocommand.sample_vm_get, None)
],
command_table)
def test_raw_register_command_with_one_option(self):
command_name = 'da command with one arg'
def testfunc():
return testfunc
self.assertEqual(len(command_table), 1, 'We expect exactly one command in the command table')
command_metadata = list(command_table.values())[0]
self.assertEqual(command_metadata['name'], 'test autocommand sample-vm-get', 'Unexpected command name...')
self.assertEqual(len(command_metadata['arguments']), 3, 'We expected exactly 3 arguments')
some_expected_arguments = [
{'name': '--resourcegroup -g', 'dest': 'resource_group_name', 'required': True},
{'name': '--vm-name', 'dest': 'vm_name', 'required': True},
]
# Run test code
func = _decorate_command(command_name, testfunc)
spec = '--tre <tre>'
desc = 'Kronor'
func = _decorate_option(spec, desc, None, func)
for probe in some_expected_arguments:
existing = [arg for arg in command_metadata['arguments'] if arg['name'] == probe['name']][0]
self.assertDictContainsSubset(probe, existing)
# Verify
registered_command = _COMMANDS.get(testfunc, None)
self.assertIsNotNone(registered_command)
self.assertEqual(registered_command['name'], command_name)
self.assertEqual(len(registered_command['args']), 1)
self.assertEqual(registered_command['args'][0], (spec, desc, False, None))
def test_autocommand_with_parameter_alias(self):
command_table = CommandTable()
VM_SPECIFIC_PARAMS= {
'vm_name': {
'name': '--wonky-name -n',
'metavar': 'VMNAME',
'help': 'Completely WONKY name...',
'required': False
}
}
build_operation("test autocommand",
"",
None,
[
AutoCommandDefinition(Test_autocommand.sample_vm_get, None)
],
command_table,
VM_SPECIFIC_PARAMS
)
def test_load_test_commands(self):
import sys
from azure.cli._argparse import ArgumentParser
from azure.cli.commands import add_to_parser
self.assertEqual(len(command_table), 1, 'We expect exactly one command in the command table')
command_metadata = list(command_table.values())[0]
self.assertEqual(command_metadata['name'], 'test autocommand sample-vm-get', 'Unexpected command name...')
self.assertEqual(len(command_metadata['arguments']), 3, 'We expected exactly 3 arguments')
some_expected_arguments = [
{'name': '--resourcegroup -g', 'dest': 'resource_group_name', 'required': True},
{'name': '--wonky-name -n', 'dest': 'vm_name', 'required': False},
]
# sneaky trick to avoid loading any command modules...
sys.modules['azure.cli.commands.test'] = sys
for probe in some_expected_arguments:
existing = [arg for arg in command_metadata['arguments'] if arg['name'] == probe['name']][0]
self.assertDictContainsSubset(probe, existing)
command_name = 'da command with one arg and unexpected with target'
def testfunc(args, _):
# Check that the argument passing actually works...
self.assertEqual(args['alternatetarget'], 'wombat')
return testfunc
def test_autocommand_with_extra_parameters(self):
command_table = CommandTable()
NEW_PARAMETERS= {
'new-param': {
'name': '--added-param',
'metavar': 'ADDED',
'help': 'Just added this right now!',
'required': True
}
}
build_operation("test autocommand",
"",
None,
[
AutoCommandDefinition(Test_autocommand.sample_vm_get, None)
],
command_table,
None, NEW_PARAMETERS
)
# Run test code
func = _decorate_command(command_name, testfunc)
spec = '--tre <tre>'
desc = 'Kronor'
func = _decorate_option(spec, desc, 'alternatetarget', func)
self.assertEqual(len(command_table), 1, 'We expect exactly one command in the command table')
command_metadata = list(command_table.values())[0]
self.assertEqual(command_metadata['name'], 'test autocommand sample-vm-get', 'Unexpected command name...')
self.assertEqual(len(command_metadata['arguments']), 4, 'We expected exactly 4 arguments')
some_expected_arguments = [
{'name': '--resourcegroup -g', 'dest': 'resource_group_name', 'required': True},
{'name': '--vm-name', 'dest': 'vm_name', 'required': True},
{'name': '--added-param', 'required': True},
]
p = ArgumentParser('automcommandtest')
add_to_parser(p, 'test')
for probe in some_expected_arguments:
existing = [arg for arg in command_metadata['arguments'] if arg['name'] == probe['name']][0]
self.assertDictContainsSubset(probe, existing)
cmd_result = p.execute(command_name.split(' ') + '--tre wombat'.split(' '))
self.assertEqual(cmd_result.result, func)
def test_autocommand_with_command_alias(self):
command_table = CommandTable()
build_operation("test autocommand",
"",
None,
[
AutoCommandDefinition(Test_autocommand.sample_vm_get, None, 'woot')
],
command_table
)
self.assertEqual(len(command_table), 1, 'We expect exactly one command in the command table')
command_metadata = list(command_table.values())[0]
self.assertEqual(command_metadata['name'], 'test autocommand woot', 'Unexpected command name...')
if __name__ == '__main__':
unittest.main()

Просмотреть файл

@ -5,7 +5,7 @@ try:
except ImportError:
from mock import MagicMock
from azure.cli._argparse import ArgumentParser, IncorrectUsageError
from azure.cli.parser import IncorrectUsageError
from azure.cli._logging import logger
import logging
import azure.cli._debug as _debug

Просмотреть файл

@ -1,344 +1,345 @@
import unittest
from six import StringIO
if False:
import unittest
from six import StringIO
from azure.cli._argparse import ArgumentParser, IncorrectUsageError
from azure.cli._logging import logger
from azure.cli.commands import command, description, option
import azure.cli._help_files
import logging
import mock
import azure.cli._util as util
from azure.cli._help import HelpAuthoringException
from azure.cli.parser import ArgumentParser, IncorrectUsageError
from azure.cli._logging import logger
from azure.cli.commands import command, description, option
import azure.cli._help_files
import logging
import mock
import azure.cli._util as util
from azure.cli._help import HelpAuthoringException
class Test_argparse(unittest.TestCase):
@classmethod
def setUpClass(cls):
# Ensure initialization has occurred correctly
import azure.cli.main
logging.basicConfig(level=logging.DEBUG)
class Test_argparse(unittest.TestCase):
@classmethod
def setUpClass(cls):
# Ensure initialization has occurred correctly
import azure.cli.main
logging.basicConfig(level=logging.DEBUG)
@classmethod
def tearDownClass(cls):
logging.shutdown()
@classmethod
def tearDownClass(cls):
logging.shutdown()
def test_help_param(self):
p = ArgumentParser('test')
p.add_command(lambda a, b: (a, b),
'n1',
args=[('--arg -a', '', False, None),
('-b <v>', '', False, None)])
def test_help_param(self):
p = ArgumentParser('test')
p.add_command(lambda a, b: (a, b),
'n1',
args=[('--arg -a', '', False, None),
('-b <v>', '', False, None)])
cmd_result = p.execute('n1 -h'.split())
self.assertIsNone(cmd_result.result)
cmd_result = p.execute('n1 -h'.split())
self.assertIsNone(cmd_result.result)
cmd_result = p.execute('n1 --help'.split())
self.assertIsNone(cmd_result.result)
cmd_result = p.execute('n1 --help'.split())
self.assertIsNone(cmd_result.result)
def test_help_plain_short_description(self):
p = ArgumentParser('test')
p.add_command(lambda a, b: (a, b),
'n1',
'the description',
args=[('--arg -a', '', False, None), ('-b <v>', '', False, None)])
def test_help_plain_short_description(self):
p = ArgumentParser('test')
p.add_command(lambda a, b: (a, b),
'n1',
'the description',
args=[('--arg -a', '', False, None), ('-b <v>', '', False, None)])
io = StringIO()
cmd_result = p.execute('n1 -h'.split(), out=io)
self.assertIsNone(cmd_result.result)
self.assertEqual(True, 'n1: the description' in io.getvalue())
io.close()
io = StringIO()
cmd_result = p.execute('n1 -h'.split(), out=io)
self.assertIsNone(cmd_result.result)
self.assertEqual(True, 'n1: the description' in io.getvalue())
io.close()
def test_help_plain_long_description(self):
p = ArgumentParser('test')
def fn(a, b):
'''
long description
'''
p.add_command(fn,
'n1',
args=[('--arg -a', '', False, None), ('-b <v>', '', False, None)])
def test_help_plain_long_description(self):
p = ArgumentParser('test')
def fn(a, b):
'''
long description
'''
p.add_command(fn,
'n1',
args=[('--arg -a', '', False, None), ('-b <v>', '', False, None)])
io = StringIO()
cmd_result = p.execute('n1 -h'.split(), out=io)
self.assertIsNone(cmd_result.result)
self.assertEqual(True, io.getvalue().startswith('\nCommand\nn1\n long description'))
io.close()
io = StringIO()
cmd_result = p.execute('n1 -h'.split(), out=io)
self.assertIsNone(cmd_result.result)
self.assertEqual(True, io.getvalue().startswith('\nCommand\nn1\n long description'))
io.close()
def test_help_long_description_and_short_description(self):
p = ArgumentParser('test')
def fn(a, b):
'''
long description
'''
p.add_command(fn,
'n1',
'short description',
args=[('--arg -a', '', False, None), ('-b <v>', '', False, None)])
def test_help_long_description_and_short_description(self):
p = ArgumentParser('test')
def fn(a, b):
'''
long description
'''
p.add_command(fn,
'n1',
'short description',
args=[('--arg -a', '', False, None), ('-b <v>', '', False, None)])
io = StringIO()
cmd_result = p.execute('n1 -h'.split(), out=io)
self.assertIsNone(cmd_result.result)
self.assertEqual(True, io.getvalue().startswith('\nCommand\nn1: short description\n long description'))
io.close()
io = StringIO()
cmd_result = p.execute('n1 -h'.split(), out=io)
self.assertIsNone(cmd_result.result)
self.assertEqual(True, io.getvalue().startswith('\nCommand\nn1: short description\n long description'))
io.close()
def test_help_docstring_description_overrides_short_description(self):
p = ArgumentParser('test')
def fn(a, b):
'''
short-summary: docstring summary
'''
p.add_command(fn,
'n1',
'short description',
args=[('--arg -a', '', False, None), ('-b <v>', '', False, None)])
def test_help_docstring_description_overrides_short_description(self):
p = ArgumentParser('test')
def fn(a, b):
'''
short-summary: docstring summary
'''
p.add_command(fn,
'n1',
'short description',
args=[('--arg -a', '', False, None), ('-b <v>', '', False, None)])
io = StringIO()
cmd_result = p.execute('n1 -h'.split(), out=io)
self.assertIsNone(cmd_result.result)
self.assertEqual(True, 'n1: docstring summary' in io.getvalue())
io.close()
io = StringIO()
cmd_result = p.execute('n1 -h'.split(), out=io)
self.assertIsNone(cmd_result.result)
self.assertEqual(True, 'n1: docstring summary' in io.getvalue())
io.close()
def test_help_long_description_multi_line(self):
p = ArgumentParser('test')
def fn(a, b):
'''
long-summary: |
line1
line2
'''
p.add_command(fn,
'n1',
args=[('--arg -a', '', False, None), ('-b <v>', '', False, None)])
def test_help_long_description_multi_line(self):
p = ArgumentParser('test')
def fn(a, b):
'''
long-summary: |
line1
line2
'''
p.add_command(fn,
'n1',
args=[('--arg -a', '', False, None), ('-b <v>', '', False, None)])
io = StringIO()
cmd_result = p.execute('n1 -h'.split(), out=io)
self.assertIsNone(cmd_result.result)
print('VALUE: ' + io.getvalue())
io = StringIO()
cmd_result = p.execute('n1 -h'.split(), out=io)
self.assertIsNone(cmd_result.result)
print('VALUE: ' + io.getvalue())
self.assertEqual(True, io.getvalue().startswith('\nCommand\nn1\n line1\n line2'))
io.close()
self.assertEqual(True, io.getvalue().startswith('\nCommand\nn1\n line1\n line2'))
io.close()
def test_help_params_documentations(self):
p = ArgumentParser('test')
def fn(a, b):
'''
parameters:
- name: --foobar/-fb
type: string
required: false
short-summary: one line partial sentence
long-summary: text, markdown, etc.
populator-commands:
- az vm list
- default
- name: --foobar2/-fb2
type: string
required: true
short-summary: one line partial sentence
long-summary: paragraph(s)
'''
p.add_command(fn,
'n1',
args=[('--foobar -fb <v>', 'the foobar', False, None),
('--foobar2 -fb2 <v>', 'the foobar2', True, None),
('--foobar3 -fb3 <v>', 'the foobar3', False, None)])
def test_help_params_documentations(self):
p = ArgumentParser('test')
def fn(a, b):
'''
parameters:
- name: --foobar/-fb
type: string
required: false
short-summary: one line partial sentence
long-summary: text, markdown, etc.
populator-commands:
- az vm list
- default
- name: --foobar2/-fb2
type: string
required: true
short-summary: one line partial sentence
long-summary: paragraph(s)
'''
p.add_command(fn,
'n1',
args=[('--foobar -fb <v>', 'the foobar', False, None),
('--foobar2 -fb2 <v>', 'the foobar2', True, None),
('--foobar3 -fb3 <v>', 'the foobar3', False, None)])
io = StringIO()
cmd_result = p.execute('n1 -h'.split(), out=io)
self.assertIsNone(cmd_result.result)
s = '''
Command
n1
Arguments
--foobar/-fb : one line partial sentence
text, markdown, etc.
Values from: az vm list, default
--foobar2/-fb2 [Required]: one line partial sentence
paragraph(s)
--foobar3/-fb3 : the foobar3
'''
self.assertEqual(s, io.getvalue())
io.close()
def test_help_full_documentations(self):
p = ArgumentParser('test')
def fn(a, b):
'''
short-summary: this module does xyz one-line or so
long-summary: |
this module.... kjsdflkj... klsfkj paragraph1
this module.... kjsdflkj... klsfkj paragraph2
parameters:
- name: --foobar/-fb
type: string
required: false
short-summary: one line partial sentence
long-summary: text, markdown, etc.
populator-commands:
- az vm list
- default
- name: --foobar2/-fb2
type: string
required: true
short-summary: one line partial sentence
long-summary: paragraph(s)
examples:
- name: foo example
text: example details
'''
p.add_command(fn,
'n1',
args=[('--foobar -fb <v>', 'the foobar', False, None),
('--foobar2 -fb2 <v>', 'the foobar2', True, None)])
io = StringIO()
cmd_result = p.execute('n1 -h'.split(), out=io)
self.assertIsNone(cmd_result.result)
s = '''
Command
n1: this module does xyz one-line or so
this module.... kjsdflkj... klsfkj paragraph1
this module.... kjsdflkj... klsfkj paragraph2
Arguments
--foobar/-fb : one line partial sentence
text, markdown, etc.
Values from: az vm list, default
--foobar2/-fb2 [Required]: one line partial sentence
paragraph(s)
Examples
foo example
example details
'''
self.assertEqual(s, io.getvalue())
io.close()
def test_help_mismatched_required_params(self):
p = ArgumentParser('test')
def fn(a, b):
'''
parameters:
- name: --foobar/-fb
type: string
required: false
short-summary: one line partial sentence
long-summary: text, markdown, etc.
populator-commands:
- az vm list
- default
'''
p.add_command(fn,
'n1',
args=[('--foobar -fb <v>', 'the foobar', True, None)])
io = StringIO()
self.assertRaisesRegexp(HelpAuthoringException,
'.*mismatched required True vs\. False, --foobar/-fb.*',
lambda: p.execute('n1 -h'.split(), out=io))
io.close()
def test_help_extra_help_params(self):
p = ArgumentParser('test')
def fn(a, b):
'''
parameters:
- name: --foobar/-fb
type: string
required: false
short-summary: one line partial sentence
long-summary: text, markdown, etc.
populator-commands:
- az vm list
- default
'''
p.add_command(fn,
'n1',
args=[('--foobar2 -fb2 <v>', 'the foobar', True, None)])
io = StringIO()
self.assertRaisesRegexp(HelpAuthoringException,
'.*Extra help param --foobar/-fb.*',
lambda: p.execute('n1 -h'.split(), out=io))
io.close()
def test_help_with_param_specified(self):
p = ArgumentParser('test')
def fn(a, b):
pass
p.add_command(fn,
'n1',
args=[('--arg -a', '', False, None), ('-b <v>', '', False, None)])
io = StringIO()
cmd_result = p.execute('n1 --arg -h'.split(), out=io)
self.assertIsNone(cmd_result.result)
s = '''
Command
n1
Arguments
--arg/-a
-b
'''
self.assertEqual(s, io.getvalue())
io.close()
def test_help_group_children(self):
p = ArgumentParser('test')
def fn(a, b):
pass
p.add_command(fn,
'group1 group2 n1',
args=[('--foobar -fb <v>', 'the foobar', False, None),
('--foobar2 -fb2 <v>', 'the foobar2', True, None)])
p.add_command(fn,
'group1 group3 n1',
args=[('--foobar -fb <v>', 'the foobar', False, None),
('--foobar2 -fb2 <v>', 'the foobar2', True, None)])
io = StringIO()
cmd_result = p.execute('group1'.split(), out=io)
self.assertIsNone(cmd_result.result)
s = 'Group\n group1\n\nSub-Commands\n group2\n group3\n'
self.assertEqual(s, io.getvalue())
io.close()
def test_help_group_help(self):
p = ArgumentParser('test')
def fn(a, b):
pass
p.add_command(fn, 'test_group1 test_group2 n1')
io = StringIO()
cmd_result = p.execute('test_group1 test_group2 --help'.split(), out=io)
self.assertIsNone(cmd_result.result)
s = '''
Group
test_group1 test_group2: this module does xyz one-line or so
this module.... kjsdflkj... klsfkj paragraph1
this module.... kjsdflkj... klsfkj paragraph2
Sub-Commands
io = StringIO()
cmd_result = p.execute('n1 -h'.split(), out=io)
self.assertIsNone(cmd_result.result)
s = '''
Command
n1
Examples
foo example
example details
'''
self.assertEqual(s, io.getvalue())
io.close()
Arguments
--foobar/-fb : one line partial sentence
text, markdown, etc.
Values from: az vm list, default
--foobar2/-fb2 [Required]: one line partial sentence
paragraph(s)
--foobar3/-fb3 : the foobar3
'''
self.assertEqual(s, io.getvalue())
io.close()
def test_help_full_documentations(self):
p = ArgumentParser('test')
def fn(a, b):
'''
short-summary: this module does xyz one-line or so
long-summary: |
this module.... kjsdflkj... klsfkj paragraph1
this module.... kjsdflkj... klsfkj paragraph2
parameters:
- name: --foobar/-fb
type: string
required: false
short-summary: one line partial sentence
long-summary: text, markdown, etc.
populator-commands:
- az vm list
- default
- name: --foobar2/-fb2
type: string
required: true
short-summary: one line partial sentence
long-summary: paragraph(s)
examples:
- name: foo example
text: example details
'''
p.add_command(fn,
'n1',
args=[('--foobar -fb <v>', 'the foobar', False, None),
('--foobar2 -fb2 <v>', 'the foobar2', True, None)])
io = StringIO()
cmd_result = p.execute('n1 -h'.split(), out=io)
self.assertIsNone(cmd_result.result)
s = '''
Command
n1: this module does xyz one-line or so
this module.... kjsdflkj... klsfkj paragraph1
this module.... kjsdflkj... klsfkj paragraph2
Arguments
--foobar/-fb : one line partial sentence
text, markdown, etc.
Values from: az vm list, default
--foobar2/-fb2 [Required]: one line partial sentence
paragraph(s)
Examples
foo example
example details
'''
self.assertEqual(s, io.getvalue())
io.close()
def test_help_mismatched_required_params(self):
p = ArgumentParser('test')
def fn(a, b):
'''
parameters:
- name: --foobar/-fb
type: string
required: false
short-summary: one line partial sentence
long-summary: text, markdown, etc.
populator-commands:
- az vm list
- default
'''
p.add_command(fn,
'n1',
args=[('--foobar -fb <v>', 'the foobar', True, None)])
io = StringIO()
self.assertRaisesRegexp(HelpAuthoringException,
'.*mismatched required True vs\. False, --foobar/-fb.*',
lambda: p.execute('n1 -h'.split(), out=io))
io.close()
def test_help_extra_help_params(self):
p = ArgumentParser('test')
def fn(a, b):
'''
parameters:
- name: --foobar/-fb
type: string
required: false
short-summary: one line partial sentence
long-summary: text, markdown, etc.
populator-commands:
- az vm list
- default
'''
p.add_command(fn,
'n1',
args=[('--foobar2 -fb2 <v>', 'the foobar', True, None)])
io = StringIO()
self.assertRaisesRegexp(HelpAuthoringException,
'.*Extra help param --foobar/-fb.*',
lambda: p.execute('n1 -h'.split(), out=io))
io.close()
def test_help_with_param_specified(self):
p = ArgumentParser('test')
def fn(a, b):
pass
p.add_command(fn,
'n1',
args=[('--arg -a', '', False, None), ('-b <v>', '', False, None)])
io = StringIO()
cmd_result = p.execute('n1 --arg -h'.split(), out=io)
self.assertIsNone(cmd_result.result)
s = '''
Command
n1
Arguments
--arg/-a
-b
'''
self.assertEqual(s, io.getvalue())
io.close()
def test_help_group_children(self):
p = ArgumentParser('test')
def fn(a, b):
pass
p.add_command(fn,
'group1 group2 n1',
args=[('--foobar -fb <v>', 'the foobar', False, None),
('--foobar2 -fb2 <v>', 'the foobar2', True, None)])
p.add_command(fn,
'group1 group3 n1',
args=[('--foobar -fb <v>', 'the foobar', False, None),
('--foobar2 -fb2 <v>', 'the foobar2', True, None)])
io = StringIO()
cmd_result = p.execute('group1'.split(), out=io)
self.assertIsNone(cmd_result.result)
s = 'Group\n group1\n\nSub-Commands\n group2\n group3\n'
self.assertEqual(s, io.getvalue())
io.close()
def test_help_group_help(self):
p = ArgumentParser('test')
def fn(a, b):
pass
p.add_command(fn, 'test_group1 test_group2 n1')
io = StringIO()
cmd_result = p.execute('test_group1 test_group2 --help'.split(), out=io)
self.assertIsNone(cmd_result.result)
s = '''
Group
test_group1 test_group2: this module does xyz one-line or so
this module.... kjsdflkj... klsfkj paragraph1
this module.... kjsdflkj... klsfkj paragraph2
Sub-Commands
n1
Examples
foo example
example details
'''
self.assertEqual(s, io.getvalue())
io.close()
if __name__ == '__main__':
unittest.main()
if __name__ == '__main__':
unittest.main()

Просмотреть файл

@ -5,7 +5,7 @@ from six import StringIO
from collections import namedtuple
from azure.cli._output import (OutputProducer, OutputFormatException, format_json, format_table, format_list, format_text,
ListOutput)
format_tsv, ListOutput)
import azure.cli._util as util
class TestOutput(unittest.TestCase):
@ -37,6 +37,12 @@ class TestOutput(unittest.TestCase):
}
"""))
def test_out_boolean_valid(self):
output_producer = OutputProducer(formatter=format_list, file=self.io)
output_producer.out(True)
self.assertEqual(util.normalize_newlines(self.io.getvalue()),
util.normalize_newlines("""True\n\n\n"""))
def test_out_table_valid(self):
output_producer = OutputProducer(formatter=format_table, file=self.io)
output_producer.out({'active': True, 'id': '0b1f6472'})
@ -149,50 +155,40 @@ Myarray :
lo._get_formatted_key('locationIdState')
self.assertEqual(lo._formatted_keys_cache, {'locationIdState': 'Location Id State'})
def test_out_todict_none(self):
input = None
actual = OutputProducer.todict(input)
expected = None
self.assertEqual(actual, expected)
# TSV output tests
def test_output_format_dict(self):
obj = {}
obj['A'] = 1
obj['B'] = 2
result = format_tsv(obj)
self.assertEquals(result, '1\t2\n')
def test_out_todict_dict_empty(self):
input = {}
actual = OutputProducer.todict(input)
expected = {}
self.assertEqual(actual, expected)
def test_output_format_dict_sort(self):
obj = {}
obj['B'] = 1
obj['A'] = 2
result = format_tsv(obj)
self.assertEquals(result, '2\t1\n')
def test_out_todict_dict(self):
input = {'a': 'b'}
actual = OutputProducer.todict(input)
expected = {'a': 'b'}
self.assertEqual(actual, expected)
def test_output_format_ordereddict_not_sorted(self):
from collections import OrderedDict
obj = OrderedDict()
obj['B'] = 1
obj['A'] = 2
result = format_tsv(obj)
self.assertEquals(result, '1\t2\n')
def test_out_todict_list(self):
input = [{'a': 'b'}]
actual = OutputProducer.todict(input)
expected = [{'a': 'b'}]
self.assertEqual(actual, expected)
def test_output_format_ordereddict_list_not_sorted(self):
from collections import OrderedDict
obj1 = OrderedDict()
obj1['B'] = 1
obj1['A'] = 2
def test_out_todict_list(self):
input = [{'a': 'b'}]
actual = OutputProducer.todict(input)
expected = [{'a': 'b'}]
self.assertEqual(actual, expected)
def test_out_todict_obj(self):
MyObject = namedtuple('MyObject', 'a b')
input = MyObject('x', 'y')
actual = OutputProducer.todict(input)
expected = {'a': 'x', 'b': 'y'}
self.assertEqual(actual, expected)
def test_out_todict_dict_with_obj(self):
MyObject = namedtuple('MyObject', 'a b')
mo = MyObject('x', 'y')
input = {'a': mo}
actual = OutputProducer.todict(input)
expected = {'a': {'a': 'x', 'b': 'y'}}
self.assertEqual(actual, expected)
obj2 = OrderedDict()
obj2['A'] = 3
obj2['B'] = 4
result = format_tsv([obj1, obj2])
self.assertEquals(result, '1\t2\n3\t4\n')
if __name__ == '__main__':
unittest.main()

Просмотреть файл

@ -0,0 +1,109 @@
import unittest
from six import StringIO
from collections import namedtuple
from azure.cli.parser import AzCliCommandParser
class TestParser(unittest.TestCase):
@classmethod
def setUpClass(cls):
pass
@classmethod
def tearDownClass(cls):
pass
def setUp(self):
self.io = StringIO()
def tearDown(self):
self.io.close()
def test_register_simple_commands(self):
def test_handler1(args):
pass
def test_handler2(args):
pass
command_table = {
test_handler1: {
'name': 'command the-name',
'arguments': []
},
test_handler2: {
'name': 'sub-command the-second-name',
'arguments': []
}
}
parser = AzCliCommandParser()
parser.load_command_table(command_table)
args = parser.parse_args('command the-name'.split())
self.assertIs(args.func, test_handler1)
args = parser.parse_args('sub-command the-second-name'.split())
self.assertIs(args.func, test_handler2)
AzCliCommandParser.error = VerifyError(self,)
parser.parse_args('sub-command'.split())
self.assertTrue(AzCliCommandParser.error.called)
def test_required_parameter(self):
def test_handler(args):
pass
command_table = {
test_handler: {
'name': 'test command',
'arguments': [
{'name': '--req', 'required': True}
]
}
}
parser = AzCliCommandParser()
parser.load_command_table(command_table)
args = parser.parse_args('test command --req yep'.split())
self.assertIs(args.func, test_handler)
AzCliCommandParser.error = VerifyError(self)
parser.parse_args('test command'.split())
self.assertTrue(AzCliCommandParser.error.called)
def test_nargs_parameter(self):
def test_handler(args):
pass
command_table = {
test_handler: {
'name': 'test command',
'arguments': [
{'name': '--req', 'required': True, 'nargs': 2}
]
}
}
parser = AzCliCommandParser()
parser.load_command_table(command_table)
args = parser.parse_args('test command --req yep nope'.split())
self.assertIs(args.func, test_handler)
AzCliCommandParser.error = VerifyError(self)
parser.parse_args('test command -req yep'.split())
self.assertTrue(AzCliCommandParser.error.called)
class VerifyError(object):
def __init__(self, test, substr=None):
self.test = test
self.substr= substr
self.called = False
def __call__(self, message):
if self.substr:
self.test.assertTrue(message.find(self.substr) >= 0)
self.called = True
if __name__ == '__main__':
unittest.main()