зеркало из https://github.com/mozilla/gecko-dev.git
Bug 1197370 - Add chown method to adb.py, refactor chmod r=bc
Behavior changes: - added method for ADBDevice class called chown - when initializing ADBDevice class, check if recursive flag is supported (similar to chmod -R) Other changes: - handling for situation where recursive is desired but -R flag is not supported is in place - changed behavior above situation to mirror chmod (creation of temporary file based on `self.ls` output, then executing script on device using adb Unit Tests: - unit tests to exercise attributes and common paths created. Would need further expansion of tests at some point. - additional mocking fixtures created. Differential Revision: https://phabricator.services.mozilla.com/D8128 --HG-- extra : moz-landing-system : lando
This commit is contained in:
Родитель
e36de7239c
Коммит
b4f0f75efe
|
@ -673,12 +673,23 @@ class ADBDevice(ADBCommand):
|
|||
match = re_recurse.search(chmod_output)
|
||||
if match:
|
||||
self._chmod_R = True
|
||||
except (ADBError, ADBTimeoutError) as e:
|
||||
self._logger.debug('Check chmod -R: %s' % e)
|
||||
except ADBError as e:
|
||||
self._logger.debug("Check chmod -R: {}".format(e))
|
||||
match = re_recurse.search(e.message)
|
||||
if match:
|
||||
self._chmod_R = True
|
||||
self._logger.info("Native chmod -R support: %s" % self._chmod_R)
|
||||
self._logger.info("Native chmod -R support: {}".format(self._chmod_R))
|
||||
|
||||
# Do we have chown -R?
|
||||
try:
|
||||
self._chown_R = False
|
||||
chown_output = self.shell_output("chown --help", timeout=timeout)
|
||||
match = re_recurse.search(chown_output)
|
||||
if match:
|
||||
self._chown_R = True
|
||||
except ADBError as e:
|
||||
self._logger.debug("Check chown -R: {}".format(e))
|
||||
self._logger.info("Native chown -R support: {}".format(self._chown_R))
|
||||
|
||||
try:
|
||||
cleared = self.shell_bool('logcat -P ""', timeout=timeout)
|
||||
|
@ -1610,6 +1621,37 @@ class ADBDevice(ADBCommand):
|
|||
if not rv.startswith("remount succeeded"):
|
||||
raise ADBError("Unable to remount device")
|
||||
|
||||
def batch_execute(self, commands, timeout=None, root=False):
|
||||
"""Writes commands to a temporary file then executes on the device.
|
||||
|
||||
:param list commands_list: List of commands to be run by the shell.
|
||||
:param timeout: The maximum time in
|
||||
seconds for any spawned adb process to complete before throwing
|
||||
an ADBTimeoutError.
|
||||
This timeout is per adb call. The total time spent
|
||||
may exceed this value. If it is not specified, the value
|
||||
set in the ADBDevice constructor is used.
|
||||
:type timeout: integer or None
|
||||
:param bool root: Flag specifying if the command should
|
||||
be executed as root.
|
||||
:raises: * ADBTimeoutError
|
||||
* ADBRootError
|
||||
* ADBError
|
||||
"""
|
||||
try:
|
||||
tmpf = tempfile.NamedTemporaryFile(delete=False)
|
||||
tmpf.write('\n'.join(commands))
|
||||
tmpf.close()
|
||||
script = '/sdcard/{}'.format(os.path.basename(tmpf.name))
|
||||
self.push(tmpf.name, script)
|
||||
self.shell_output('sh {}'.format(script), timeout=timeout,
|
||||
root=root)
|
||||
finally:
|
||||
if tmpf:
|
||||
os.unlink(tmpf.name)
|
||||
if script:
|
||||
self.rm(script, timeout=timeout, root=root)
|
||||
|
||||
def chmod(self, path, recursive=False, mask="777", timeout=None, root=False):
|
||||
"""Recursively changes the permissions of a directory on the
|
||||
device.
|
||||
|
@ -1650,44 +1692,78 @@ class ADBDevice(ADBCommand):
|
|||
self._logger.warning('Ignoring attempt to chmod external storage')
|
||||
return
|
||||
|
||||
if not recursive:
|
||||
self.shell_output("chmod %s %s" % (mask, path),
|
||||
timeout=timeout, root=root)
|
||||
# build up the command to be run based on capabilities.
|
||||
command = ['chmod']
|
||||
|
||||
if recursive and self._chmod_R:
|
||||
command.append('-R')
|
||||
|
||||
command.append(mask)
|
||||
|
||||
if recursive and not self._chmod_R:
|
||||
paths = self.ls(path, recursive=True, timeout=timeout, root=root)
|
||||
base = ' '.join(command)
|
||||
commands = [' '.join([base, entry]) for entry in paths]
|
||||
self.batch_execute(commands, timeout, root)
|
||||
else:
|
||||
command.append(path)
|
||||
self.shell_output(cmd=' '.join(command), timeout=timeout, root=root)
|
||||
|
||||
def chown(self, path, owner, group=None, recursive=False, timeout=None, root=False):
|
||||
"""Run the chown command on the provided path.
|
||||
|
||||
:param str path: path name on the device.
|
||||
:param str owner: new owner of the path.
|
||||
:param group: optional parameter specifying the new group the path
|
||||
should belong to.
|
||||
:type group: str or None
|
||||
:param bool recursive: optional value specifying whether the command should
|
||||
operate on files and directories recursively.
|
||||
:param timeout: The maximum time in
|
||||
seconds for any spawned adb process to complete before throwing
|
||||
an ADBTimeoutError.
|
||||
This timeout is per adb call. The total time spent
|
||||
may exceed this value. If it is not specified, the value
|
||||
set in the ADBDevice constructor is used.
|
||||
:type timeout: integer or None
|
||||
:param bool root: Flag specifying if the command should
|
||||
be executed as root.
|
||||
:raises: * ADBTimeoutError
|
||||
* ADBRootError
|
||||
* ADBError
|
||||
"""
|
||||
path = posixpath.normpath(path.strip())
|
||||
if self.is_path_internal_storage(path, timeout=timeout):
|
||||
self._logger.warning('Ignoring attempt to chown external storage')
|
||||
return
|
||||
|
||||
if self._chmod_R:
|
||||
try:
|
||||
self.shell_output("chmod -R %s %s" % (mask, path),
|
||||
timeout=timeout, root=root)
|
||||
except ADBError as e:
|
||||
if e.message.find('No such file or directory') == -1:
|
||||
raise
|
||||
self._logger.warning('chmod -R %s %s: Ignoring Error: %s' %
|
||||
(mask, path, e.message))
|
||||
return
|
||||
# Obtain a list of the directories and files which match path
|
||||
# and construct a shell script which explictly calls chmod on
|
||||
# each of them.
|
||||
entries = self.ls(path, recursive=recursive, timeout=timeout,
|
||||
root=root)
|
||||
tmpf = None
|
||||
chmodsh = None
|
||||
try:
|
||||
tmpf = tempfile.NamedTemporaryFile(delete=False)
|
||||
for entry in entries:
|
||||
tmpf.write('chmod %s %s\n' % (mask, entry))
|
||||
tmpf.close()
|
||||
chmodsh = '/data/local/tmp/%s' % os.path.basename(tmpf.name)
|
||||
self.push(tmpf.name, chmodsh)
|
||||
self.shell_output('chmod 777 %s' % chmodsh, timeout=timeout,
|
||||
root=root)
|
||||
self.shell_output('sh -c %s' % chmodsh, timeout=timeout,
|
||||
root=root)
|
||||
finally:
|
||||
if tmpf:
|
||||
os.unlink(tmpf.name)
|
||||
if chmodsh:
|
||||
self.rm(chmodsh, timeout=timeout, root=root)
|
||||
# build up the command to be run based on capabilities.
|
||||
command = ['chown']
|
||||
|
||||
if recursive and self._chown_R:
|
||||
command.append('-R')
|
||||
|
||||
if group:
|
||||
# officially supported notation is : but . has been checked with
|
||||
# sdk 17 and it works.
|
||||
command.append('{owner}.{group}'.format(owner=owner, group=group))
|
||||
else:
|
||||
command.append(owner)
|
||||
|
||||
if recursive and not self._chown_R:
|
||||
# recursive desired, but chown -R is not supported natively.
|
||||
# like with chmod, get the list of subpaths, put them into a script
|
||||
# then run it with adb with one call.
|
||||
paths = self.ls(path, recursive=True, timeout=timeout, root=root)
|
||||
base = ' '.join(command)
|
||||
commands = [' '.join([base, entry]) for entry in paths]
|
||||
|
||||
self.batch_execute(commands, timeout, root)
|
||||
else:
|
||||
# recursive or not, and chown -R is supported natively.
|
||||
# command can simply be run as provided by the user.
|
||||
command.append(path)
|
||||
self.shell_output(cmd=' '.join(command), timeout=timeout, root=root)
|
||||
|
||||
def _test_path(self, argument, path, timeout=None, root=False):
|
||||
"""Performs path and file type checking.
|
||||
|
|
|
@ -76,10 +76,72 @@ def mock_shell_output(monkeypatch):
|
|||
elif 'pm list package' in cmd:
|
||||
apps = ["org.mozilla.fennec", "org.mozilla.geckoview_example"]
|
||||
return ('package:{}\n' * len(apps)).format(*apps)
|
||||
else:
|
||||
print(str(cmd))
|
||||
return str(cmd)
|
||||
|
||||
monkeypatch.setattr(mozdevice.ADBDevice, 'shell_output', shell_output_wrapper)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def mock_is_path_internal_storage(monkeypatch):
|
||||
"""Monkeypatches the ADBDevice.is_path_internal_storage() method call.
|
||||
|
||||
Instead of returning the outcome of whether the path provided is
|
||||
internal storage or external, this will always return True.
|
||||
|
||||
:param object monkeypatch: pytest provided fixture for mocking.
|
||||
"""
|
||||
def is_path_internal_storage_wrapper(object, path, timeout=None):
|
||||
"""Actual monkeypatch implementation of the is_path_internal_storage() call.
|
||||
|
||||
:param str path: The path to test.
|
||||
:param timeout: The maximum time in
|
||||
seconds for any spawned adb process to complete before
|
||||
throwing an ADBTimeoutError. This timeout is per adb call. The
|
||||
total time spent may exceed this value. If it is not
|
||||
specified, the value set in the ADBDevice constructor is used.
|
||||
:returns: boolean
|
||||
|
||||
:raises: * ADBTimeoutError
|
||||
* ADBError
|
||||
"""
|
||||
if 'internal_storage' in path:
|
||||
return True
|
||||
return False
|
||||
|
||||
monkeypatch.setattr(mozdevice.ADBDevice,
|
||||
'is_path_internal_storage', is_path_internal_storage_wrapper)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def mock_shell_bool(monkeypatch):
|
||||
"""Monkeypatches the ADBDevice.shell_bool() method call.
|
||||
|
||||
Instead of returning the output of an adb call, this method will
|
||||
return appropriate string output. Content of the string output is
|
||||
in line with the calling method's expectations.
|
||||
|
||||
:param object monkeypatch: pytest provided fixture for mocking.
|
||||
"""
|
||||
def shell_bool_wrapper(object, cmd, env=None, cwd=None, timeout=None, root=False):
|
||||
"""Actual monkeypatch implementation of the shell_bool method call.
|
||||
|
||||
:param object object: placeholder object representing ADBDevice
|
||||
:param str cmd: command to be executed
|
||||
:param env: contains the environment variable
|
||||
:type env: dict or None
|
||||
:param cwd: The directory from which to execute.
|
||||
:type cwd: str or None
|
||||
:param timeout: unused parameter tp represent timeout threshold
|
||||
:returns: string - string representation of a simulated call to adb
|
||||
"""
|
||||
print(cmd)
|
||||
return str(cmd)
|
||||
|
||||
monkeypatch.setattr(mozdevice.ADBDevice, 'shell_bool', shell_bool_wrapper)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def mock_adb_object():
|
||||
"""Patches the __init__ method call when instantiating ADBAndroid.
|
||||
|
|
|
@ -3,3 +3,4 @@ subsuite = mozbase, os == "linux"
|
|||
skip-if = python == 3
|
||||
[test_socket_connection.py]
|
||||
[test_is_app_installed.py]
|
||||
[test_chown.py]
|
||||
|
|
|
@ -0,0 +1,68 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
from __future__ import absolute_import
|
||||
|
||||
import logging
|
||||
|
||||
from mock import patch
|
||||
|
||||
import mozunit
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.mark.parametrize('boolean_value', [True, False])
|
||||
def test_set_chown_r_attribute(mock_adb_object, redirect_stdout_and_assert, boolean_value):
|
||||
mock_adb_object._chown_R = boolean_value
|
||||
assert mock_adb_object._chown_R == boolean_value
|
||||
|
||||
|
||||
def test_chown_path_internal(mock_adb_object, redirect_stdout_and_assert):
|
||||
"""Tests whether attempt to chown internal path is ignored"""
|
||||
with patch.object(logging, 'getLogger') as mock_log:
|
||||
mock_adb_object._logger = mock_log
|
||||
|
||||
testing_parameters = {
|
||||
"owner": "someuser",
|
||||
"path": "internal_storage",
|
||||
}
|
||||
expected = 'Ignoring attempt to chown external storage'
|
||||
mock_adb_object.chown(**testing_parameters)
|
||||
assert ''.join(mock_adb_object._logger.method_calls[0][1]) != ''
|
||||
assert ''.join(mock_adb_object._logger.method_calls[0][1]) == expected
|
||||
|
||||
|
||||
def test_chown_one_path(mock_adb_object, redirect_stdout_and_assert):
|
||||
"""Tests the path where only one path is provided."""
|
||||
# set up mock logging and self._chown_R attribute.
|
||||
with patch.object(logging, 'getLogger') as mock_log:
|
||||
mock_adb_object._logger = mock_log
|
||||
mock_adb_object._chown_R = True
|
||||
|
||||
testing_parameters = {
|
||||
"owner": "someuser",
|
||||
"path": "/system",
|
||||
}
|
||||
command = 'chown {owner} {path}'.format(**testing_parameters)
|
||||
testing_parameters['text'] = command
|
||||
redirect_stdout_and_assert(mock_adb_object.chown, **testing_parameters)
|
||||
|
||||
|
||||
def test_chown_one_path_with_group(mock_adb_object, redirect_stdout_and_assert):
|
||||
"""Tests the path where group is provided."""
|
||||
# set up mock logging and self._chown_R attribute.
|
||||
with patch.object(logging, 'getLogger') as mock_log:
|
||||
mock_adb_object._logger = mock_log
|
||||
mock_adb_object._chown_R = True
|
||||
|
||||
testing_parameters = {
|
||||
"owner": "someuser",
|
||||
"path": "/system",
|
||||
"group": "group_2",
|
||||
}
|
||||
command = 'chown {owner}.{group} {path}'.format(**testing_parameters)
|
||||
testing_parameters['text'] = command
|
||||
redirect_stdout_and_assert(mock_adb_object.chown, **testing_parameters)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
mozunit.main()
|
Загрузка…
Ссылка в новой задаче