Tab completion for Alias (#127)
* Tab completion for Alias * Use tab completion table approach * Address PR comments * Change 2.0.31.dev0 * Fix CI error
This commit is contained in:
Родитель
ec53600289
Коммит
eb787963ed
|
@ -3,27 +3,28 @@
|
|||
# Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
# --------------------------------------------------------------------------------------------
|
||||
|
||||
import timeit
|
||||
|
||||
from knack.log import get_logger
|
||||
|
||||
from azure.cli.core import AzCommandsLoader
|
||||
from azure.cli.core.decorators import Completer
|
||||
from azure.cli.core.commands.events import EVENT_INVOKER_PRE_CMD_TBL_TRUNCATE
|
||||
from azext_alias.alias import GLOBAL_ALIAS_PATH, AliasManager
|
||||
from azext_alias.util import get_config_parser, is_alias_create_command, cache_reserved_commands
|
||||
from azext_alias._const import DEBUG_MSG_WITH_TIMING
|
||||
from azext_alias._validators import process_alias_create_namespace
|
||||
from azext_alias import telemetry
|
||||
from azure.cli.core.commands.events import EVENT_INVOKER_PRE_CMD_TBL_TRUNCATE, EVENT_INVOKER_ON_TAB_COMPLETION
|
||||
from azure.cli.command_modules.interactive.events import (
|
||||
EVENT_INTERACTIVE_PRE_COMPLETER_TEXT_PARSING,
|
||||
EVENT_INTERACTIVE_POST_SUB_TREE_CREATE
|
||||
)
|
||||
from azext_alias import _help # pylint: disable=unused-import
|
||||
from azext_alias.hooks import (
|
||||
alias_event_handler,
|
||||
enable_aliases_autocomplete,
|
||||
transform_cur_commands_interactive,
|
||||
enable_aliases_autocomplete_interactive
|
||||
)
|
||||
from azext_alias.util import get_alias_table
|
||||
from azext_alias._validators import process_alias_create_namespace
|
||||
|
||||
logger = get_logger(__name__)
|
||||
"""
|
||||
We don't have access to load_cmd_tbl_func in custom.py (need the entire command table
|
||||
for alias and command validation when the user invokes alias create).
|
||||
This cache saves the entire command table globally so custom.py can have access to it.
|
||||
Alter this cache through cache_reserved_commands(load_cmd_tbl_func) in util.py
|
||||
"""
|
||||
|
||||
# We don't have access to load_cmd_tbl_func in custom.py (need the entire command table
|
||||
# for alias and command validation when the user invokes alias create).
|
||||
# This cache saves the entire command table globally so custom.py can have access to it.
|
||||
# Alter this cache through cache_reserved_commands(load_cmd_tbl_func) in util.py
|
||||
cached_reserved_commands = []
|
||||
|
||||
|
||||
|
@ -35,6 +36,9 @@ class AliasExtCommandLoader(AzCommandsLoader):
|
|||
super(AliasExtCommandLoader, self).__init__(cli_ctx=cli_ctx,
|
||||
custom_command_type=custom_command_type)
|
||||
self.cli_ctx.register_event(EVENT_INVOKER_PRE_CMD_TBL_TRUNCATE, alias_event_handler)
|
||||
self.cli_ctx.register_event(EVENT_INVOKER_ON_TAB_COMPLETION, enable_aliases_autocomplete)
|
||||
self.cli_ctx.register_event(EVENT_INTERACTIVE_PRE_COMPLETER_TEXT_PARSING, transform_cur_commands_interactive)
|
||||
self.cli_ctx.register_event(EVENT_INTERACTIVE_POST_SUB_TREE_CREATE, enable_aliases_autocomplete_interactive)
|
||||
|
||||
def load_command_table(self, _):
|
||||
with self.command_group('alias') as g:
|
||||
|
@ -59,41 +63,7 @@ def get_alias_completer(cmd, prefix, namespace, **kwargs): # pylint: disable=un
|
|||
"""
|
||||
An argument completer for alias name.
|
||||
"""
|
||||
try:
|
||||
alias_table = get_config_parser()
|
||||
alias_table.read(GLOBAL_ALIAS_PATH)
|
||||
return alias_table.sections()
|
||||
except Exception: # pylint: disable=broad-except
|
||||
return []
|
||||
|
||||
|
||||
def alias_event_handler(_, **kwargs):
|
||||
"""
|
||||
An event handler for alias transformation when EVENT_INVOKER_PRE_TRUNCATE_CMD_TBL event is invoked
|
||||
"""
|
||||
try:
|
||||
telemetry.start()
|
||||
|
||||
start_time = timeit.default_timer()
|
||||
args = kwargs.get('args')
|
||||
alias_manager = AliasManager(**kwargs)
|
||||
|
||||
# [:] will keep the reference of the original args
|
||||
args[:] = alias_manager.transform(args)
|
||||
|
||||
if is_alias_create_command(args):
|
||||
load_cmd_tbl_func = kwargs.get('load_cmd_tbl_func', lambda _: {})
|
||||
cache_reserved_commands(load_cmd_tbl_func)
|
||||
|
||||
elapsed_time = (timeit.default_timer() - start_time) * 1000
|
||||
logger.debug(DEBUG_MSG_WITH_TIMING, args, elapsed_time)
|
||||
|
||||
telemetry.set_execution_time(round(elapsed_time, 2))
|
||||
except Exception as client_exception: # pylint: disable=broad-except
|
||||
telemetry.set_exception(client_exception)
|
||||
raise
|
||||
finally:
|
||||
telemetry.conclude()
|
||||
return get_alias_table().sections()
|
||||
|
||||
|
||||
COMMAND_LOADER_CLS = AliasExtCommandLoader
|
||||
|
|
|
@ -3,12 +3,16 @@
|
|||
# Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
# --------------------------------------------------------------------------------------------
|
||||
|
||||
import os
|
||||
|
||||
from azure.cli.core._environment import get_config_dir
|
||||
|
||||
GLOBAL_CONFIG_DIR = get_config_dir()
|
||||
ALIAS_FILE_NAME = 'alias'
|
||||
ALIAS_HASH_FILE_NAME = 'alias.sha1'
|
||||
COLLIDED_ALIAS_FILE_NAME = 'collided_alias'
|
||||
ALIAS_TAB_COMP_TABLE_FILE_NAME = 'alias_tab_completion'
|
||||
GLOBAL_ALIAS_TAB_COMP_TABLE_PATH = os.path.join(GLOBAL_CONFIG_DIR, ALIAS_TAB_COMP_TABLE_FILE_NAME)
|
||||
COLLISION_CHECK_LEVEL_DEPTH = 5
|
||||
|
||||
INSUFFICIENT_POS_ARG_ERROR = 'alias: "{}" takes exactly {} positional argument{} ({} given)'
|
||||
|
|
|
@ -16,23 +16,26 @@ helps['alias create'] = """
|
|||
type: command
|
||||
short-summary: Create an alias.
|
||||
examples:
|
||||
- name: Create a simple alias.
|
||||
text: >
|
||||
az alias create --name rg --command group\n
|
||||
- name: Create simple alias commands.
|
||||
text: |
|
||||
az alias create --name rg --command group
|
||||
|
||||
az alias create --name ls --command list
|
||||
- name: Create a complex alias.
|
||||
text: >
|
||||
text: |
|
||||
az alias create --name list-vm --command 'vm list --resource-group myResourceGroup'
|
||||
|
||||
- name: Create an alias with positional arguments.
|
||||
text: >
|
||||
az alias create --name 'list-vm {{ resource_group }}' --command 'vm list --resource-group {{ resource_group }}'
|
||||
- name: Create an alias command with arguments.
|
||||
text: |
|
||||
az alias create --name 'list-vm {{ resource_group }}' \\
|
||||
--command 'vm list --resource-group {{ resource_group }}'
|
||||
|
||||
- name: Create an alias with positional arguments and additional string processing.
|
||||
text: >
|
||||
az alias create --name 'storage-ls {{ url }}' --command 'storage blob list \n
|
||||
--account-name {{ url.replace("https://", "").split(".")[0] }}\n
|
||||
--container-name {{ url.replace("https://", "").split("/")[1] }}'
|
||||
- name: Process arguments using Jinja2 templates.
|
||||
text: |
|
||||
az alias create --name 'storage-ls {{ url }}' \\
|
||||
--command 'storage blob list
|
||||
--account-name {{ url.replace("https://", "").split(".")[0] }}
|
||||
--container-name {{ url.replace("https://", "").split("/")[1] }}'
|
||||
"""
|
||||
|
||||
|
||||
|
|
|
@ -107,13 +107,13 @@ def _validate_alias_command_level(alias, command):
|
|||
alias: The name of the alias.
|
||||
command: The command that the alias points to.
|
||||
"""
|
||||
alias_collision_table = AliasManager.build_collision_table([alias], azext_alias.cached_reserved_commands)
|
||||
alias_collision_table = AliasManager.build_collision_table([alias])
|
||||
|
||||
# Alias is not a reserved command, so it can point to any command
|
||||
if not alias_collision_table:
|
||||
return
|
||||
|
||||
command_collision_table = AliasManager.build_collision_table([command], azext_alias.cached_reserved_commands)
|
||||
command_collision_table = AliasManager.build_collision_table([command])
|
||||
alias_collision_levels = alias_collision_table.get(alias.split()[0], [])
|
||||
command_collision_levels = command_collision_table.get(command.split()[0], [])
|
||||
|
||||
|
|
|
@ -25,7 +25,12 @@ from azext_alias._const import (
|
|||
POS_ARG_DEBUG_MSG
|
||||
)
|
||||
from azext_alias.argument import build_pos_args_table, render_template
|
||||
from azext_alias.util import is_alias_create_command, cache_reserved_commands, get_config_parser
|
||||
from azext_alias.util import (
|
||||
is_alias_create_command,
|
||||
cache_reserved_commands,
|
||||
get_config_parser,
|
||||
build_tab_completion_table
|
||||
)
|
||||
|
||||
|
||||
GLOBAL_ALIAS_PATH = os.path.join(GLOBAL_CONFIG_DIR, ALIAS_FILE_NAME)
|
||||
|
@ -121,8 +126,8 @@ class AliasManager(object):
|
|||
# Only load the entire command table if it detects changes in the alias config
|
||||
if self.detect_alias_config_change():
|
||||
self.load_full_command_table()
|
||||
self.collided_alias = AliasManager.build_collision_table(self.alias_table.sections(),
|
||||
azext_alias.cached_reserved_commands)
|
||||
self.collided_alias = AliasManager.build_collision_table(self.alias_table.sections())
|
||||
build_tab_completion_table(self.alias_table)
|
||||
else:
|
||||
self.load_collided_alias()
|
||||
|
||||
|
@ -220,7 +225,7 @@ class AliasManager(object):
|
|||
return not self.alias_table.sections() and self.alias_config_str
|
||||
|
||||
@staticmethod
|
||||
def build_collision_table(aliases, reserved_commands, levels=COLLISION_CHECK_LEVEL_DEPTH):
|
||||
def build_collision_table(aliases, levels=COLLISION_CHECK_LEVEL_DEPTH):
|
||||
"""
|
||||
Build the collision table according to the alias configuration file against the entire command table.
|
||||
|
||||
|
@ -246,7 +251,8 @@ class AliasManager(object):
|
|||
word = alias.split()[0]
|
||||
for level in range(1, levels + 1):
|
||||
collision_regex = r'^{}{}($|\s)'.format(r'([a-z\-]*\s)' * (level - 1), word.lower())
|
||||
if list(filter(re.compile(collision_regex).match, reserved_commands)):
|
||||
if list(filter(re.compile(collision_regex).match, azext_alias.cached_reserved_commands)) \
|
||||
and level not in collided_alias[word]:
|
||||
collided_alias[word].append(level)
|
||||
|
||||
telemetry.set_collided_aliases(list(collided_alias.keys()))
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
{
|
||||
"azext.minCliCoreVersion": "2.0.28",
|
||||
"azext.minCliCoreVersion": "2.0.31.dev0",
|
||||
"azext.isPreview": true
|
||||
}
|
|
@ -7,10 +7,9 @@ import hashlib
|
|||
|
||||
from knack.util import CLIError
|
||||
|
||||
import azext_alias
|
||||
from azext_alias._const import ALIAS_NOT_FOUND_ERROR
|
||||
from azext_alias.alias import GLOBAL_ALIAS_PATH, AliasManager
|
||||
from azext_alias.util import get_alias_table
|
||||
from azext_alias.util import get_alias_table, build_tab_completion_table
|
||||
|
||||
|
||||
def create_alias(alias_name, alias_command):
|
||||
|
@ -78,6 +77,6 @@ def _commit_change(alias_table):
|
|||
alias_config_hash = hashlib.sha1(alias_config_file.read().encode('utf-8')).hexdigest()
|
||||
AliasManager.write_alias_config_hash(alias_config_hash)
|
||||
|
||||
collided_alias = AliasManager.build_collision_table(alias_table.sections(),
|
||||
azext_alias.cached_reserved_commands)
|
||||
collided_alias = AliasManager.build_collision_table(alias_table.sections())
|
||||
AliasManager.write_collided_alias(collided_alias)
|
||||
build_tab_completion_table(alias_table)
|
||||
|
|
|
@ -0,0 +1,143 @@
|
|||
# --------------------------------------------------------------------------------------------
|
||||
# Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
# Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
# --------------------------------------------------------------------------------------------
|
||||
|
||||
import json
|
||||
import timeit
|
||||
|
||||
from knack.log import get_logger
|
||||
|
||||
from azure.cli.command_modules.interactive.azclishell.command_tree import CommandBranch
|
||||
from azext_alias import telemetry
|
||||
from azext_alias.alias import AliasManager
|
||||
from azext_alias.util import (
|
||||
is_alias_create_command,
|
||||
cache_reserved_commands,
|
||||
get_alias_table,
|
||||
filter_aliases
|
||||
)
|
||||
from azext_alias._const import DEBUG_MSG_WITH_TIMING, GLOBAL_ALIAS_TAB_COMP_TABLE_PATH
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
def alias_event_handler(_, **kwargs):
|
||||
"""
|
||||
An event handler for alias transformation when EVENT_INVOKER_PRE_TRUNCATE_CMD_TBL event is invoked.
|
||||
"""
|
||||
try:
|
||||
telemetry.start()
|
||||
|
||||
start_time = timeit.default_timer()
|
||||
args = kwargs.get('args')
|
||||
alias_manager = AliasManager(**kwargs)
|
||||
|
||||
# [:] will keep the reference of the original args
|
||||
args[:] = alias_manager.transform(args)
|
||||
|
||||
if is_alias_create_command(args):
|
||||
load_cmd_tbl_func = kwargs.get('load_cmd_tbl_func', lambda _: {})
|
||||
cache_reserved_commands(load_cmd_tbl_func)
|
||||
|
||||
elapsed_time = (timeit.default_timer() - start_time) * 1000
|
||||
logger.debug(DEBUG_MSG_WITH_TIMING, args, elapsed_time)
|
||||
|
||||
telemetry.set_execution_time(round(elapsed_time, 2))
|
||||
except Exception as client_exception: # pylint: disable=broad-except
|
||||
telemetry.set_exception(client_exception)
|
||||
raise
|
||||
finally:
|
||||
telemetry.conclude()
|
||||
|
||||
|
||||
def enable_aliases_autocomplete(_, **kwargs):
|
||||
"""
|
||||
Enable aliases autocomplete by injecting aliases into Azure CLI tab completion list.
|
||||
"""
|
||||
external_completions = kwargs.get('external_completions', [])
|
||||
prefix = kwargs.get('cword_prefix', [])
|
||||
cur_commands = kwargs.get('comp_words', [])
|
||||
alias_table = get_alias_table()
|
||||
# Transform aliases if they are in current commands,
|
||||
# so parser can get the correct subparser when chaining aliases
|
||||
_transform_cur_commands(cur_commands, alias_table=alias_table)
|
||||
|
||||
for alias, alias_command in filter_aliases(alias_table):
|
||||
if alias.startswith(prefix) and alias.strip() != prefix and _is_autocomplete_valid(cur_commands, alias_command):
|
||||
# Only autocomplete the first word because alias is space-delimited
|
||||
external_completions.append(alias)
|
||||
|
||||
# Append spaces if necessary (https://github.com/kislyuk/argcomplete/blob/master/argcomplete/__init__.py#L552-L559)
|
||||
prequote = kwargs.get('cword_prequote', '')
|
||||
continuation_chars = "=/:"
|
||||
if len(external_completions) == 1 and external_completions[0][-1] not in continuation_chars and not prequote:
|
||||
external_completions[0] += ' '
|
||||
|
||||
|
||||
def transform_cur_commands_interactive(_, **kwargs):
|
||||
"""
|
||||
Transform any aliases in current commands in interactive into their respective commands.
|
||||
"""
|
||||
event_payload = kwargs.get('event_payload', {})
|
||||
# text_split = current commands typed in the interactive shell without any unfinished word
|
||||
# text = current commands typed in the interactive shell
|
||||
cur_commands = event_payload.get('text', '').split(' ')
|
||||
_transform_cur_commands(cur_commands)
|
||||
|
||||
event_payload.update({
|
||||
'text': ' '.join(cur_commands)
|
||||
})
|
||||
|
||||
|
||||
def enable_aliases_autocomplete_interactive(_, **kwargs):
|
||||
"""
|
||||
Enable aliases autocomplete on interactive mode by injecting aliases in the command tree.
|
||||
"""
|
||||
subtree = kwargs.get('subtree', None)
|
||||
if not subtree or not hasattr(subtree, 'children'):
|
||||
return
|
||||
|
||||
for alias, alias_command in filter_aliases(get_alias_table()):
|
||||
# Only autocomplete the first word because alias is space-delimited
|
||||
if subtree.in_tree(alias_command.split()):
|
||||
subtree.add_child(CommandBranch(alias))
|
||||
|
||||
|
||||
def _is_autocomplete_valid(cur_commands, alias_command):
|
||||
"""
|
||||
Determine whether autocomplete can be performed at the current state.
|
||||
|
||||
Args:
|
||||
parser: The current CLI parser.
|
||||
cur_commands: The current commands typed in the console.
|
||||
alias_command: The alias command.
|
||||
|
||||
Returns:
|
||||
True if autocomplete can be performed.
|
||||
"""
|
||||
parent_command = ' '.join(cur_commands[1:])
|
||||
with open(GLOBAL_ALIAS_TAB_COMP_TABLE_PATH, 'r') as tab_completion_table_file:
|
||||
try:
|
||||
tab_completion_table = json.loads(tab_completion_table_file.read())
|
||||
return alias_command in tab_completion_table and parent_command in tab_completion_table[alias_command]
|
||||
except Exception: # pylint: disable=broad-except
|
||||
return False
|
||||
|
||||
|
||||
def _transform_cur_commands(cur_commands, alias_table=None):
|
||||
"""
|
||||
Transform any aliases in cur_commands into their respective commands.
|
||||
|
||||
Args:
|
||||
alias_table: The alias table.
|
||||
cur_commands: current commands typed in the console.
|
||||
"""
|
||||
transformed = []
|
||||
alias_table = alias_table if alias_table else get_alias_table()
|
||||
for cmd in cur_commands:
|
||||
if cmd in alias_table.sections() and alias_table.has_option(cmd, 'command'):
|
||||
transformed += alias_table.get(cmd, 'command').split()
|
||||
else:
|
||||
transformed.append(cmd)
|
||||
cur_commands[:] = transformed
|
|
@ -51,6 +51,7 @@ class AliasExtensionTelemetrySession(object):
|
|||
'Reserved.DataModel.Fault.Exception.StackTrace': _get_stack_trace(),
|
||||
}
|
||||
self.set_custom_properties(properties, 'ActionType', 'Exception')
|
||||
self.set_custom_properties(properties, 'Version', VERSION)
|
||||
events.append(properties)
|
||||
|
||||
return events
|
||||
|
@ -133,7 +134,7 @@ def set_number_of_aliases_registered(num_aliases):
|
|||
|
||||
@decorators.suppress_all_exceptions(raise_in_diagnostics=True)
|
||||
def conclude():
|
||||
if not _session.aliases_hit:
|
||||
if not _session.aliases_hit and not _session.exceptions:
|
||||
return
|
||||
|
||||
_session.end_time = datetime.datetime.now()
|
||||
|
|
|
@ -84,4 +84,4 @@ aodfgojadofgjaojdfog
|
|||
|
||||
TEST_RESERVED_COMMANDS = ['account list-locations',
|
||||
'network dns',
|
||||
'storage account']
|
||||
'storage account create']
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
# Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
# --------------------------------------------------------------------------------------------
|
||||
|
||||
# pylint: disable=line-too-long,import-error,no-self-use,deprecated-method,pointless-string-statement,relative-import,no-member,redefined-outer-name,too-many-return-statements
|
||||
# pylint: disable=line-too-long,import-error,no-self-use,deprecated-method,pointless-string-statement,relative-import,no-member,redefined-outer-name,too-many-return-statements,,anomalous-backslash-in-string
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
@ -54,7 +54,8 @@ TEST_DATA = {
|
|||
('create-vm --image ubtuntults --generate-ssh-key --no-wait', 'vm create -g test-group -n test-vm --image ubtuntults --generate-ssh-key --no-wait'),
|
||||
('cp mn diag', 'storage blob copy start-batch --source-uri mn --destination-container diag'),
|
||||
('storage-ls azurecliprod.blob.core.windows.net/cli-extensions', 'storage blob list --account-name azurecliprod --container-name cli-extensions'),
|
||||
('storage-ls-2 https://azurecliprod.blob.core.windows.net/cli-extensions', 'storage blob list --account-name azurecliprod --container-name cli-extensions')
|
||||
('storage-ls-2 https://azurecliprod.blob.core.windows.net/cli-extensions', 'storage blob list --account-name azurecliprod --container-name cli-extensions'),
|
||||
('alias create -n mkrgrp -c "group create -n test --tags owner=\\$USER"', 'alias create -n mkrgrp -c "group create -n test --tags owner=\\$USER"')
|
||||
],
|
||||
TEST_TRANSFORM_COLLIDED_ALIAS: [
|
||||
('account list -otable', 'account list -otable'),
|
||||
|
@ -91,7 +92,7 @@ def test_transform_alias(self, test_case):
|
|||
|
||||
def test_transform_collided_alias(self, test_case):
|
||||
alias_manager = self.get_alias_manager(COLLISION_MOCK_ALIAS_STRING, TEST_RESERVED_COMMANDS)
|
||||
alias_manager.collided_alias = azext_alias.alias.AliasManager.build_collision_table(alias_manager.alias_table.sections(), azext_alias.cached_reserved_commands)
|
||||
alias_manager.collided_alias = azext_alias.alias.AliasManager.build_collision_table(alias_manager.alias_table.sections())
|
||||
self.assertEqual(shlex.split(test_case[1]), alias_manager.transform(shlex.split(test_case[0])))
|
||||
|
||||
|
||||
|
@ -111,8 +112,9 @@ def test_post_transform_env_var(self, test_case):
|
|||
|
||||
def test_inconsistent_placeholder_index(self, test_case):
|
||||
alias_manager = self.get_alias_manager()
|
||||
with self.assertRaises(CLIError):
|
||||
with self.assertRaises(CLIError) as cm:
|
||||
alias_manager.transform(test_case)
|
||||
self.assertEqual(str(cm.exception), 'alias: "cp {{ arg_1 }} {{ arg_2 }}" takes exactly 2 positional arguments (%s given)' % str(len(test_case) - 1))
|
||||
|
||||
|
||||
def test_parse_error_python_3(self, test_case):
|
||||
|
@ -152,12 +154,12 @@ class TestAlias(unittest.TestCase):
|
|||
|
||||
def test_build_empty_collision_table(self):
|
||||
alias_manager = self.get_alias_manager(DEFAULT_MOCK_ALIAS_STRING, TEST_RESERVED_COMMANDS)
|
||||
test_case = azext_alias.alias.AliasManager.build_collision_table(alias_manager.alias_table.sections(), azext_alias.cached_reserved_commands)
|
||||
test_case = azext_alias.alias.AliasManager.build_collision_table(alias_manager.alias_table.sections())
|
||||
self.assertDictEqual(dict(), test_case)
|
||||
|
||||
def test_build_non_empty_collision_table(self):
|
||||
alias_manager = self.get_alias_manager(COLLISION_MOCK_ALIAS_STRING, TEST_RESERVED_COMMANDS)
|
||||
test_case = azext_alias.alias.AliasManager.build_collision_table(alias_manager.alias_table.sections(), azext_alias.cached_reserved_commands, levels=2)
|
||||
test_case = azext_alias.alias.AliasManager.build_collision_table(alias_manager.alias_table.sections(), levels=2)
|
||||
self.assertDictEqual({'account': [1, 2], 'dns': [2], 'list-locations': [2]}, test_case)
|
||||
|
||||
def test_non_parse_error(self):
|
||||
|
|
|
@ -3,22 +3,21 @@
|
|||
# Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
# --------------------------------------------------------------------------------------------
|
||||
|
||||
# pylint: disable=line-too-long,anomalous-backslash-in-string
|
||||
# pylint: disable=line-too-long
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
import unittest
|
||||
import mock
|
||||
|
||||
from azure.cli.testsdk import ScenarioTest
|
||||
from azext_alias import (
|
||||
alias,
|
||||
custom
|
||||
)
|
||||
from azext_alias import alias
|
||||
from azext_alias._const import (
|
||||
ALIAS_FILE_NAME,
|
||||
ALIAS_HASH_FILE_NAME,
|
||||
COLLIDED_ALIAS_FILE_NAME
|
||||
COLLIDED_ALIAS_FILE_NAME,
|
||||
ALIAS_TAB_COMP_TABLE_FILE_NAME
|
||||
)
|
||||
|
||||
|
||||
|
@ -26,13 +25,19 @@ class AliasTests(ScenarioTest):
|
|||
|
||||
def setUp(self):
|
||||
self.mock_config_dir = tempfile.mkdtemp()
|
||||
alias.GLOBAL_CONFIG_DIR = self.mock_config_dir
|
||||
alias.GLOBAL_ALIAS_PATH = os.path.join(self.mock_config_dir, ALIAS_FILE_NAME)
|
||||
alias.GLOBAL_ALIAS_HASH_PATH = os.path.join(self.mock_config_dir, ALIAS_HASH_FILE_NAME)
|
||||
alias.GLOBAL_COLLIDED_ALIAS_PATH = os.path.join(self.mock_config_dir, COLLIDED_ALIAS_FILE_NAME)
|
||||
custom.GLOBAL_ALIAS_PATH = os.path.join(self.mock_config_dir, ALIAS_FILE_NAME)
|
||||
self.patchers = []
|
||||
self.patchers.append(mock.patch('azext_alias.alias.GLOBAL_CONFIG_DIR', self.mock_config_dir))
|
||||
self.patchers.append(mock.patch('azext_alias.alias.GLOBAL_ALIAS_PATH', os.path.join(self.mock_config_dir, ALIAS_FILE_NAME)))
|
||||
self.patchers.append(mock.patch('azext_alias.alias.GLOBAL_ALIAS_HASH_PATH', os.path.join(self.mock_config_dir, ALIAS_HASH_FILE_NAME)))
|
||||
self.patchers.append(mock.patch('azext_alias.alias.GLOBAL_COLLIDED_ALIAS_PATH', os.path.join(self.mock_config_dir, COLLIDED_ALIAS_FILE_NAME)))
|
||||
self.patchers.append(mock.patch('azext_alias.util.GLOBAL_ALIAS_TAB_COMP_TABLE_PATH', os.path.join(self.mock_config_dir, ALIAS_TAB_COMP_TABLE_FILE_NAME)))
|
||||
self.patchers.append(mock.patch('azext_alias.custom.GLOBAL_ALIAS_PATH', os.path.join(self.mock_config_dir, ALIAS_FILE_NAME)))
|
||||
for patcher in self.patchers:
|
||||
patcher.start()
|
||||
|
||||
def tearDown(self):
|
||||
for patcher in self.patchers:
|
||||
patcher.stop()
|
||||
shutil.rmtree(self.mock_config_dir)
|
||||
|
||||
def test_create_and_list_alias(self):
|
||||
|
@ -40,55 +45,19 @@ class AliasTests(ScenarioTest):
|
|||
'alias_name': 'c',
|
||||
'alias_command': 'create'
|
||||
})
|
||||
self.cmd('az alias create -n \'{alias_name}\' -c \'{alias_command}\'')
|
||||
self.cmd('az alias create -n "{alias_name}" -c "{alias_command}"')
|
||||
self.cmd('az alias list', checks=[
|
||||
self.check('[0].alias', '{alias_name}'),
|
||||
self.check('[0].command', '{alias_command}'),
|
||||
self.check('length(@)', 1)
|
||||
])
|
||||
|
||||
def test_create_and_list_alias_env_var(self):
|
||||
self.kwargs.update({
|
||||
'alias_name': 'mkrgrp',
|
||||
'alias_command': 'group create -n test --tags owner=\$USER'
|
||||
})
|
||||
self.cmd('az alias create -n \'{alias_name}\' -c \'{alias_command}\'')
|
||||
self.cmd('az alias list', checks=[
|
||||
self.check('[0].alias', '{alias_name}'),
|
||||
self.check('[0].command', '{alias_command}'),
|
||||
self.check('length(@)', 1)
|
||||
])
|
||||
alias_command = self.cmd('az alias list').get_output_in_json()[0]['command']
|
||||
assert '\\$USER' in alias_command
|
||||
|
||||
def test_create_and_list_alias_with_pos_arg(self):
|
||||
self.kwargs.update({
|
||||
'alias_name': 'list-vm {{ resource_group }}',
|
||||
'alias_command': 'vm list - -resource-group {{ resource_group }}'
|
||||
})
|
||||
self.cmd('az alias create -n \'{alias_name}\' -c \'{alias_command}\'')
|
||||
self.cmd('az alias list', checks=[
|
||||
self.check('[0].alias', '{alias_name}'),
|
||||
self.check('[0].command', '{alias_command}'),
|
||||
self.check('length(@)', 1)
|
||||
])
|
||||
self.kwargs.update({
|
||||
'alias_name': 'storage-ls {{ url }}',
|
||||
'alias_command': 'storage blob list --account-name {{ url.replace("https://", "").split(".")[0] }} --container-name {{ url.replace("https://", "").split("/")[1] }}'
|
||||
})
|
||||
self.cmd('az alias create -n \'{alias_name}\' -c \'{alias_command}\'')
|
||||
self.cmd('az alias list', checks=[
|
||||
self.check('[1].alias', '{alias_name}'),
|
||||
self.check('[1].command', '{alias_command}'),
|
||||
self.check('length(@)', 2)
|
||||
])
|
||||
|
||||
def test_create_alias_error(self):
|
||||
self.kwargs.update({
|
||||
'alias_name': 'c',
|
||||
'alias_command': 'will_fail'
|
||||
})
|
||||
self.cmd('az alias create -n \'{alias_name}\' -c \'{alias_command}\'', expect_failure=True)
|
||||
self.cmd('az alias create -n "{alias_name}" -c "{alias_command}"', expect_failure=True)
|
||||
self.cmd('az alias list', checks=[
|
||||
self.check('length(@)', 0)
|
||||
])
|
||||
|
@ -98,7 +67,7 @@ class AliasTests(ScenarioTest):
|
|||
'alias_name': 'c',
|
||||
'alias_command': 'create'
|
||||
})
|
||||
self.cmd('az alias create -n \'{alias_name}\' -c \'{alias_command}\'')
|
||||
self.cmd('az alias create -n "{alias_name}" -c "{alias_command}"')
|
||||
self.cmd('az alias list', checks=[
|
||||
self.check('[0].alias', '{alias_name}'),
|
||||
self.check('[0].command', '{alias_command}'),
|
||||
|
@ -123,7 +92,7 @@ class AliasTests(ScenarioTest):
|
|||
'alias_name': 'c',
|
||||
'alias_command': 'create'
|
||||
})
|
||||
self.cmd('az alias create -n \'{alias_name}\' -c \'{alias_command}\'')
|
||||
self.cmd('az alias create -n "{alias_name}" -c "{alias_command}"')
|
||||
expected_alias_string = '''[c]
|
||||
command = create
|
||||
|
||||
|
@ -136,7 +105,7 @@ command = create
|
|||
'alias_name': 'c',
|
||||
'alias_command': 'create'
|
||||
})
|
||||
self.cmd('az alias create -n \'{alias_name}\' -c \'{alias_command}\'')
|
||||
self.cmd('az alias create -n "{alias_name}" -c "{alias_command}"')
|
||||
self.cmd('az alias list', checks=[
|
||||
self.check('[0].alias', '{alias_name}'),
|
||||
self.check('[0].command', '{alias_command}'),
|
||||
|
|
|
@ -30,24 +30,29 @@ class TestArgument(unittest.TestCase):
|
|||
self.assertListEqual(['_0', '_1', 'arg_1', 'arg_2'], get_placeholders('{{ 0 }} {{ 1 }} {{ arg_1 }} {{ arg_2 }}'))
|
||||
|
||||
def test_get_placeholders_duplicate(self):
|
||||
with self.assertRaises(CLIError):
|
||||
with self.assertRaises(CLIError) as cm:
|
||||
get_placeholders('{{ arg_1 }} {{ arg_1 }}', check_duplicates=True)
|
||||
self.assertEqual(str(cm.exception), 'alias: Duplicated placeholders found when transforming "{{ arg_1 }} {{ arg_1 }}"')
|
||||
|
||||
def test_get_placeholders_no_opening_bracket(self):
|
||||
with self.assertRaises(CLIError):
|
||||
with self.assertRaises(CLIError) as cm:
|
||||
get_placeholders('arg_1 }}')
|
||||
self.assertEqual(str(cm.exception), 'alias: Brackets in "arg_1 }}" are not enclosed properly')
|
||||
|
||||
def test_get_placeholders_double_opening_bracket(self):
|
||||
with self.assertRaises(CLIError):
|
||||
with self.assertRaises(CLIError) as cm:
|
||||
get_placeholders('{{ {{ arg_1')
|
||||
self.assertEqual(str(cm.exception), 'alias: Brackets in "{{ {{ arg_1" are not enclosed properly')
|
||||
|
||||
def test_get_placeholders_double_closing_bracket(self):
|
||||
with self.assertRaises(CLIError):
|
||||
with self.assertRaises(CLIError) as cm:
|
||||
get_placeholders('{{ arg_1 }} }}')
|
||||
self.assertEqual(str(cm.exception), 'alias: Brackets in "{{ arg_1 }} }}" are not enclosed properly')
|
||||
|
||||
def test_get_placeholders_no_closing_bracket(self):
|
||||
with self.assertRaises(CLIError):
|
||||
with self.assertRaises(CLIError) as cm:
|
||||
get_placeholders('{{ arg_1 ')
|
||||
self.assertEqual(str(cm.exception), 'alias: Brackets in "{{ arg_1 " are not enclosed properly')
|
||||
|
||||
def test_normalize_placeholders(self):
|
||||
self.assertEqual('"{{ arg_1 }}" "{{ arg_2 }}"', normalize_placeholders('{{ arg_1 }} {{ arg_2 }}', inject_quotes=True))
|
||||
|
@ -75,8 +80,9 @@ class TestArgument(unittest.TestCase):
|
|||
self.assertDictEqual(expected, build_pos_args_table('{{ 0 }} {{ arg_1 }} {{ arg_2 }} {{ arg_3 }}', ['{"test": "test"}', 'test1 test2', 'arg with spaces', '"azure cli"'], 0))
|
||||
|
||||
def test_build_pos_args_table_not_enough_arguments(self):
|
||||
with self.assertRaises(CLIError):
|
||||
with self.assertRaises(CLIError) as cm:
|
||||
build_pos_args_table('{{ arg_1 }} {{ arg_2 }}', ['test_1', 'test_2'], 1)
|
||||
self.assertEqual(str(cm.exception), 'alias: "{{ arg_1 }} {{ arg_2 }}" takes exactly 2 positional arguments (1 given)')
|
||||
|
||||
def test_render_template(self):
|
||||
pos_args_table = {
|
||||
|
@ -105,12 +111,13 @@ class TestArgument(unittest.TestCase):
|
|||
self.assertListEqual(['argument with spaces'.upper()], render_template('{{ arg_1.upper() }}', pos_args_table))
|
||||
|
||||
def test_render_template_error(self):
|
||||
with self.assertRaises(CLIError):
|
||||
with self.assertRaises(CLIError) as cm:
|
||||
pos_args_table = {
|
||||
'arg_1': 'test_1',
|
||||
'arg_2': 'test_2'
|
||||
}
|
||||
render_template('{{ arg_1 }} {{ arg_2 }', pos_args_table)
|
||||
self.assertEqual(str(cm.exception), 'alias: Encounted the following error when injecting positional arguments to ""{{ arg_1 }}" "{{ arg_2 }" - unexpected \'}\'')
|
||||
|
||||
def test_check_runtime_errors_no_error(self):
|
||||
pos_args_table = {
|
||||
|
@ -120,12 +127,13 @@ class TestArgument(unittest.TestCase):
|
|||
check_runtime_errors('{{ arg_1.split("_")[0] }} {{ arg_2.split("_")[1] }}', pos_args_table)
|
||||
|
||||
def test_check_runtime_errors_has_error(self):
|
||||
with self.assertRaises(CLIError):
|
||||
with self.assertRaises(CLIError) as cm:
|
||||
pos_args_table = {
|
||||
'arg_1': 'test_1',
|
||||
'arg_2': 'test_2'
|
||||
}
|
||||
check_runtime_errors('{{ arg_1.split("_")[2] }} {{ arg_2.split("_")[1] }}', pos_args_table)
|
||||
self.assertEqual(str(cm.exception), 'alias: Encounted the following error when evaluating "arg_1.split("_")[2]" - list index out of range')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
|
@ -73,8 +73,9 @@ class AliasCustomCommandTest(unittest.TestCase):
|
|||
mock_alias_table.add_section('ac')
|
||||
mock_alias_table.set('ac', 'command', 'account')
|
||||
azext_alias.custom.get_alias_table = Mock(return_value=mock_alias_table)
|
||||
with self.assertRaises(CLIError):
|
||||
with self.assertRaises(CLIError) as cm:
|
||||
remove_alias('dns')
|
||||
self.assertEqual(str(cm.exception), 'alias: "dns" alias not found')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
|
@ -0,0 +1,64 @@
|
|||
# --------------------------------------------------------------------------------------------
|
||||
# Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
# Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
# --------------------------------------------------------------------------------------------
|
||||
|
||||
# pylint: disable=line-too-long
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
import unittest
|
||||
import mock
|
||||
|
||||
import azext_alias
|
||||
from azext_alias.util import remove_pos_arg_placeholders, build_tab_completion_table, get_config_parser
|
||||
from azext_alias._const import ALIAS_TAB_COMP_TABLE_FILE_NAME
|
||||
from azext_alias.tests._const import TEST_RESERVED_COMMANDS
|
||||
|
||||
|
||||
class TestUtil(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.mock_config_dir = tempfile.mkdtemp()
|
||||
self.patcher = mock.patch('azext_alias.util.GLOBAL_ALIAS_TAB_COMP_TABLE_PATH', os.path.join(self.mock_config_dir, ALIAS_TAB_COMP_TABLE_FILE_NAME))
|
||||
self.patcher.start()
|
||||
azext_alias.cached_reserved_commands = TEST_RESERVED_COMMANDS
|
||||
|
||||
def tearDown(self):
|
||||
self.patcher.stop()
|
||||
shutil.rmtree(self.mock_config_dir)
|
||||
|
||||
def test_remove_pos_arg_placeholders(self):
|
||||
self.assertEqual('webapp create', remove_pos_arg_placeholders('webapp create'))
|
||||
|
||||
def test_remove_pos_arg_placeholders_with_pos_arg(self):
|
||||
self.assertEqual('network dns', remove_pos_arg_placeholders('network dns {{ arg_1 }}'))
|
||||
|
||||
def test_remove_pos_arg_placeholders_with_args(self):
|
||||
self.assertEqual('vm create', remove_pos_arg_placeholders('vm create -g test -n test'))
|
||||
|
||||
def test_remove_pos_arg_placeholders_with_query(self):
|
||||
self.assertEqual('group list', remove_pos_arg_placeholders('group list --query "[].{Name:name, Location:location}" --output table'))
|
||||
|
||||
def test_build_tab_completion_table(self):
|
||||
mock_alias_table = get_config_parser()
|
||||
mock_alias_table.add_section('ac')
|
||||
mock_alias_table.set('ac', 'command', 'account')
|
||||
mock_alias_table.add_section('ll')
|
||||
mock_alias_table.set('ll', 'command', 'list-locations')
|
||||
mock_alias_table.add_section('n')
|
||||
mock_alias_table.set('n', 'command', 'network')
|
||||
mock_alias_table.add_section('al')
|
||||
mock_alias_table.set('al', 'command', 'account list-locations')
|
||||
tab_completion_table = build_tab_completion_table(mock_alias_table)
|
||||
self.assertDictEqual({
|
||||
'account': ['', 'storage'],
|
||||
'list-locations': ['account'],
|
||||
'network': [''],
|
||||
'account list-locations': ['']
|
||||
}, tab_completion_table)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
|
@ -3,10 +3,15 @@
|
|||
# Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
# --------------------------------------------------------------------------------------------
|
||||
|
||||
import re
|
||||
import sys
|
||||
import json
|
||||
import shlex
|
||||
from collections import defaultdict
|
||||
from six.moves import configparser
|
||||
|
||||
import azext_alias
|
||||
from azext_alias._const import COLLISION_CHECK_LEVEL_DEPTH, GLOBAL_ALIAS_TAB_COMP_TABLE_PATH
|
||||
|
||||
|
||||
def get_config_parser():
|
||||
|
@ -52,3 +57,76 @@ def cache_reserved_commands(load_cmd_tbl_func):
|
|||
"""
|
||||
if not azext_alias.cached_reserved_commands:
|
||||
azext_alias.cached_reserved_commands = list(load_cmd_tbl_func([]).keys())
|
||||
|
||||
|
||||
def remove_pos_arg_placeholders(alias_command):
|
||||
"""
|
||||
Remove positional argument placeholders from alias_command.
|
||||
|
||||
Args:
|
||||
alias_command: The alias command to remove from.
|
||||
"""
|
||||
# Boundary index is the index at which named argument or positional argument starts
|
||||
split_command = shlex.split(alias_command)
|
||||
boundary_index = len(split_command)
|
||||
for i, subcommand in enumerate(split_command):
|
||||
if not re.match('^[a-z]', subcommand.lower()) or i > COLLISION_CHECK_LEVEL_DEPTH:
|
||||
boundary_index = i
|
||||
break
|
||||
|
||||
return ' '.join(split_command[:boundary_index]).lower()
|
||||
|
||||
|
||||
def filter_aliases(alias_table):
|
||||
"""
|
||||
Filter aliases that does not have a command field in the configuration file.
|
||||
|
||||
Args:
|
||||
alias_table: The alias table.
|
||||
|
||||
Yield:
|
||||
A tuple with [0] being the first word of the alias and
|
||||
[1] being the command that the alias points to.
|
||||
"""
|
||||
for alias in alias_table.sections():
|
||||
if alias_table.has_option(alias, 'command'):
|
||||
yield (alias.split()[0], remove_pos_arg_placeholders(alias_table.get(alias, 'command')))
|
||||
|
||||
|
||||
def build_tab_completion_table(alias_table):
|
||||
"""
|
||||
Build a dictionary where the keys are all the alias commands (without positional argument placeholders)
|
||||
and the values are all the parent commands of the keys. After that, write the table into a file.
|
||||
The purpose of the dictionary is to validate the alias tab completion state.
|
||||
|
||||
For example:
|
||||
{
|
||||
"group": ["", "ad"],
|
||||
"dns": ["network"]
|
||||
}
|
||||
|
||||
Args:
|
||||
alias_table: The alias table.
|
||||
|
||||
Returns:
|
||||
The tab completion table.
|
||||
"""
|
||||
alias_commands = [t[1] for t in filter_aliases(alias_table)]
|
||||
tab_completion_table = defaultdict(list)
|
||||
for alias_command in alias_commands:
|
||||
for reserved_command in azext_alias.cached_reserved_commands:
|
||||
# Check if alias_command has no parent command
|
||||
if reserved_command == alias_command or reserved_command.startswith(alias_command + ' ') \
|
||||
and '' not in tab_completion_table[alias_command]:
|
||||
tab_completion_table[alias_command].append('')
|
||||
elif ' {} '.format(alias_command) in reserved_command or reserved_command.endswith(' ' + alias_command):
|
||||
# Extract parent commands
|
||||
index = reserved_command.index(alias_command)
|
||||
parent_command = reserved_command[:index - 1]
|
||||
if parent_command not in tab_completion_table[alias_command]:
|
||||
tab_completion_table[alias_command].append(parent_command)
|
||||
|
||||
with open(GLOBAL_ALIAS_TAB_COMP_TABLE_PATH, 'w') as f:
|
||||
f.write(json.dumps(tab_completion_table))
|
||||
|
||||
return tab_completion_table
|
||||
|
|
|
@ -3,4 +3,4 @@
|
|||
# Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
# --------------------------------------------------------------------------------------------
|
||||
|
||||
VERSION = '0.3.0'
|
||||
VERSION = '0.4.0'
|
||||
|
|
|
@ -460,12 +460,12 @@
|
|||
],
|
||||
"alias": [
|
||||
{
|
||||
"filename": "alias-0.3.0-py2.py3-none-any.whl",
|
||||
"sha256Digest": "d76471db272dec5df441d2b5242f8be8200ae3e47b97f4e4c4cbbd53b2a91ba3",
|
||||
"downloadUrl": "https://azurecliprod.blob.core.windows.net/cli-extensions/alias-0.3.0-py2.py3-none-any.whl",
|
||||
"filename": "alias-0.4.0-py2.py3-none-any.whl",
|
||||
"sha256Digest": "fdc818e1b814c7f687f4cda4718e6a1c16eb827eac72b22afc5368dce9cdd358",
|
||||
"downloadUrl": "https://azurecliprod.blob.core.windows.net/cli-extensions/alias-0.4.0-py2.py3-none-any.whl",
|
||||
"metadata": {
|
||||
"azext.isPreview": true,
|
||||
"azext.minCliCoreVersion": "2.0.28",
|
||||
"azext.minCliCoreVersion": "2.0.31.dev0",
|
||||
"classifiers": [
|
||||
"Development Status :: 4 - Beta",
|
||||
"Intended Audience :: Developers",
|
||||
|
@ -509,7 +509,7 @@
|
|||
}
|
||||
],
|
||||
"summary": "Support for command aliases",
|
||||
"version": "0.3.0"
|
||||
"version": "0.4.0"
|
||||
}
|
||||
}
|
||||
],
|
||||
|
|
Загрузка…
Ссылка в новой задаче