Merge pull request #86 from Azure/security_fix

temp_dir access fix + pytest
This commit is contained in:
iwang2 2022-09-21 12:22:28 -04:00 коммит произвёл GitHub
Родитель 43baf65f84 d92ab5c729
Коммит f3064d34ec
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
4 изменённых файлов: 67 добавлений и 6 удалений

Просмотреть файл

@ -7,6 +7,7 @@ from __future__ import unicode_literals
import logging
from datetime import datetime
import shutil
import threading
import os
import sys
@ -14,7 +15,6 @@ import glob
import pkgutil
import inspect
import importlib
import tempfile
import re
from Queue import Queue
@ -57,11 +57,15 @@ class AzureBatchAssets(object):
self._assets = None
self._tab_index = index
self._upload_threads = None
self._temp_dir = utils.create_temp_dir()
self.batch = None
self.modules = self._collect_modules()
self.ui = AssetsUI(self, frame)
self.frame = frame
def __del__(self):
shutil.rmtree(self._temp_dir)
def _callback_refresh(self, *args):
"""Called by Maya when a new scene file is loaded, so we reset
@ -160,7 +164,7 @@ class AzureBatchAssets(object):
:param str os_flavor: The chosen operating system of the render nodes, used
to determine the formatting of the path remapping.
"""
proj_file = os.path.join(tempfile.gettempdir(), "workspace.mel")
proj_file = os.path.join(self._temp_dir, "workspace.mel")
with open(proj_file, 'w') as handle:
for rule in maya.workspace(fileRuleList=True):
project_dir = maya.workspace(fileRuleEntry=rule)
@ -185,7 +189,7 @@ class AzureBatchAssets(object):
:param str os_flavor: The chosen operating system of the render nodes, used
to determine the formatting of the path remapping.
"""
map_file = os.path.join(tempfile.gettempdir(), "asset_map.mel")
map_file = os.path.join(self._temp_dir, "asset_map.mel")
pathmap = dict(self._assets.pathmaps)
for asset in self._assets.refs:
pathmap.update(asset.pathmap)

Просмотреть файл

@ -8,6 +8,7 @@ import os
import logging
import platform
import subprocess
import tempfile
from azurebatchmayaapi import MayaAPI as maya
from exception import CancellationException, FileUploadException
@ -145,6 +146,13 @@ def build_template_filename(render_engine, maya_version, operating_system, conta
return template_file
def create_temp_dir():
"""Create a temporary private local directory.
Should be deleted manually.
"""
return tempfile.mkdtemp()
class OperatingSystem(Enum):
windows = 'Windows'
linux = 'Linux'

Просмотреть файл

@ -19,6 +19,7 @@ from azurebatchmayaapi import MayaAPI as maya
from ui_jobhistory import JobHistoryUI
import azure.batch as batch
import azurebatchutils as utils
class AzureBatchJobHistory(object):
@ -86,9 +87,9 @@ class AzureBatchJobHistory(object):
maya.refresh()
return
try:
temp_dir = os.path.join(tempfile.gettempdir(), job.id)
if not os.path.isdir(temp_dir):
os.mkdir(temp_dir)
temp_dir = os.path.join(utils.create_temp_dir(), job.id)
# if not os.path.isdir(temp_dir):
# os.mkdir(temp_dir)
except Exception as exp:
self._log.warning(exp)
self.selected_job.set_thumbnail(thumb, self._get_image_height(thumb))

Просмотреть файл

@ -9,6 +9,17 @@ import logging
import json
from Queue import Queue
# win32-specific imports
try:
import pywintypes
import win32con
import win32net
import win32netcon
import win32security
import winerror
except ImportError:
pass
if sys.version_info >= (3, 3):
import unittest2 as unittest
from unittest.mock import MagicMock
@ -529,6 +540,43 @@ class TestAzureBatchAssets(unittest.TestCase):
self.mock_self._assets = mock.create_autospec(Assets)
AzureBatchAssets.upload(self.mock_self)
@mock.patch.object(AzureBatchAssets, "_collect_modules")
@mock.patch("assets.callback")
@mock.patch("assets.AssetsUI")
def test_temp_dir_access(self, mock_ui, mock_call, mock_collect):
if sys.platform != "win32":
return
aba = AzureBatchAssets(0, "frame", "call")
# create temporary user
username = "tempuser"
password = "Password123"
user_info = {
"name": username,
"password": password,
"priv": win32netcon.USER_PRIV_USER,
"flags": 0
}
try:
win32net.NetUserAdd(None, 1, user_info)
except pywintypes.error as e:
if e.winerror == winerror.ERROR_ACCESS_DENIED:
raise Exception("test must be run as admin to create temp user")
else:
raise e
# make temp user admin
sid, domain, at = win32security.LookupAccountName(None, username)
win32net.NetLocalGroupAddMembers(None, "Administrators", 0, [{"sid": sid}])
# impersonate temporary user
handle = win32security.LogonUser(username, None, password, win32con.LOGON32_LOGON_INTERACTIVE, win32con.LOGON32_PROVIDER_DEFAULT)
win32security.ImpersonateLoggedOnUser(handle)
# try to access the temp directory
accessed = os.access(aba._temp_dir, os.W_OK)
# revert impersonation and delete temp user
win32security.RevertToSelf()
handle.close()
win32net.NetUserDel(None, username)
self.assertFalse(accessed, "_temp_dir should be inaccessible for other users")
if __name__ == '__main__':
unittest.main()