From f549bc4845a6c89a6e3d23e4096087afcc7f1e34 Mon Sep 17 00:00:00 2001
From: Christopher Wilcox
Date: Wed, 16 Aug 2017 11:51:21 -0700
Subject: [PATCH] 10x throughput gained on writes by allowing simultaneous
 writes. Implement flush. Also stripped getting the modified time for now, as
 it is very expensive. Bring changes made by @crwilcox and @dinov in

---
 azfilesfuse.py | 309 ++++++++++++++++++++++++++++++++++++-------------
 1 file changed, 230 insertions(+), 79 deletions(-)

diff --git a/azfilesfuse.py b/azfilesfuse.py
index 4f1fa17..d72b75c 100644
--- a/azfilesfuse.py
+++ b/azfilesfuse.py
@@ -1,30 +1,38 @@
-#!/usr/bin/env python
-from __future__ import print_function, absolute_import, division
+#!/usr/bin/env python
+from __future__ import absolute_import, division, print_function
+import concurrent.futures
+import errno
+import io
 import logging
 import logging.handlers
-import requests
-from dateutil import parser
-from sys import argv, exit
-import sys
-from time import time
 import os
-import stat
-from fuse import FUSE, FuseOSError, Operations, LoggingMixIn, fuse_get_context
-import errno
-from errno import ENOENT
-import io
-import traceback
-from collections import defaultdict, namedtuple
-from time import time
-import azure.storage.file as file
-from azure.common import AzureMissingResourceHttpError, AzureConflictHttpError, AzureHttpError
-
 import platform
+import stat
+import sys
+import threading
+import traceback
 import urllib.parse
+from collections import defaultdict, deque, namedtuple
+from errno import ENOENT
+from sys import argv, exit
+from time import time
+
+import azure.storage.file as file
+import requests
+from azure.common import (AzureConflictHttpError, AzureHttpError,
+                          AzureMissingResourceHttpError)
+from azure.storage.file import models
+from dateutil import parser
+from fuse import FUSE, FuseOSError, LoggingMixIn, Operations, fuse_get_context
+from requests import Session
+
+executor = concurrent.futures.ThreadPoolExecutor(4)
+
 #import ptvsd
 #ptvsd.enable_attach(secret='my_secret')
+# The minimum level to log, NOTSET,DEBUG,INFO,WARNING,ERROR,CRITICAL.
 LOGGING_LEVEL=logging.INFO
 console_handler = logging.StreamHandler(sys.stdout)
 console_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
@@ -39,6 +47,55 @@ if platform.system() is not 'Windows':
     syslog_handler.setFormatter(formatter)
     logger.addHandler(syslog_handler)
 
+class WriteInfo(object):
+    '''Used to track writes to a file and coalesce them into a single write into
+    Azure files.  We'll track the destination and whether or not the write has
+    been processed.  We'll then combine sequential writes.'''
+    def __init__(self, files, directory, filename, offset, data, orig_path):
+        self.files = files
+        self.directory = directory
+        self.filename = filename
+        self.offset = offset
+        self.data = data
+        self.processing = False
+        self.orig_path = orig_path
+
+    def write(self):
+        try:
+            with self.files.file_cache[self.orig_path].append_write_lock:
+                self.processing = True
+            with self.files.file_cache[self.orig_path].write_lock:
+                max_size = self.files.file_cache[self.orig_path].max_size
+                logger.debug('current max size of {} is {}'.format(self.orig_path, max_size))
+                data_length = len(self.data)
+                if max_size < self.offset + data_length:
+                    f = self.files._files_service.get_file_properties(self.files._azure_file_share_name,
+                                                                      self.directory, self.filename)
+                    file_length = f.properties.content_length
+
+                    if file_length < self.offset + data_length:
+                        logger.debug('resizing file {} to {} from {}'.format(self.orig_path, self.offset + data_length, file_length))
+                        self.files._files_service.resize_file(self.files._azure_file_share_name, self.directory, self.filename, self.offset + data_length)
+                        self.files.file_cache[self.orig_path].max_size = self.offset + data_length
+
+                # update the range specified by this write.
+                logger.debug('updating {} range {} to {}'.format(self.orig_path, self.offset, self.offset + data_length - 1))
+                self.files._files_service.update_range(self.files._azure_file_share_name, self.directory, self.filename, self.data, start_range=self.offset, end_range=self.offset + data_length - 1)
+
+                logger.debug('write committed ' + self.orig_path)
+        except Exception as e:
+            logger.warning('error writing ' + str(e))
+
+class FileCache:
+    '''Tracks information that we've cached locally about an individual file.  Currently we track writes and use a couple
+    of locks to protect that, as well as the max file size that we've resized to.'''
+    def __init__(self):
+        self.write_lock = threading.Lock()
+        self.append_write_lock = threading.Lock()
+        self.max_size = 0
+        self.writes = deque()
+        self.pending_writes = set()
+
 class AzureFiles(LoggingMixIn, Operations):
 
     '''
@@ -51,7 +108,13 @@ class AzureFiles(LoggingMixIn, Operations):
         self._azure_storage_account_name = azure_storage_account_name
         self._azure_file_share_name = azure_file_share_name
         self._sas_token = sas_token
-        self._files_service = file.FileService(self._azure_storage_account_name, sas_token=self._sas_token)
+        self._files_service = file.FileService(self._azure_storage_account_name, sas_token=self._sas_token, request_session=Session())
+
+        self.writes = deque()
+
+        self.dir_cache = {}
+
+        self.file_cache = defaultdict(FileCache)
 
     def _get_separated_path(self, path):
         path = path.lstrip('/')
@@ -78,14 +141,18 @@ class AzureFiles(LoggingMixIn, Operations):
 
         TODO: Mode is not respected at this time.  Support could be added
         '''
         path = path.lstrip('/')
-        logger.info("create operation begin: path:{!r} mode:{}".format(path, mode))
+        logger.debug("create operation begin: path:{!r} mode:{}".format(path, mode))
         try:
             if not path:
                 raise FuseOSError(errno.EINVAL)
 
             directory, filename = self._get_separated_path(path)
             self._files_service.create_file(self._azure_file_share_name, directory, filename, 0)
-
+            cached = self._get_cached_dir(directory, False)
+            if cached is not None:
+                props = models.FileProperties()
+                props.content_length = 0
+                cached[filename] = models.File(filename, None, props)
             logger.debug("create operation end: path:{!r} mode:{}".format(path, mode))
             return 0;
@@ -104,7 +171,7 @@ class AzureFiles(LoggingMixIn, Operations):
         st_uid; /* user-id of owner */
         st_gid; /* group-id of owner */
         '''
-        logger.info("getattr operation begin: path:{!r} fh:{}".format(path, fh))
+        logger.debug("getattr operation begin: path:{!r} fh:{}".format(path, fh))
         try:
             path = path.lstrip('/')
             logger.debug('getattr request: {}'.format(path))
@@ -120,38 +187,33 @@ class AzureFiles(LoggingMixIn, Operations):
                 st['st_nlink'] = 2
                 return st
 
-            try:
-                properties = self._files_service.get_file_properties(
-                    self._azure_file_share_name, directory, filename).properties
-                item_type = 'file'
-            except Exception:
-                properties = self._files_service.get_directory_properties(
-                    self._azure_file_share_name, path).properties
-                item_type = 'dir'
-
-            if item_type == 'dir':
-                st['st_mode'] = stat.S_IFDIR | 0o777
-                st['st_nlink'] = 2
-            elif item_type == 'file':
-                st['st_mode'] = stat.S_IFREG | 0o777
-                st['st_nlink'] = 1
-                st['st_size'] = properties.content_length
-            else:
+            directory_listing = self._get_cached_dir(directory)
+            item = directory_listing.get(filename)
+            if item is None:
+                logger.debug("item doesn't exist: path:{!r} fh:{} return:{}".format(path, fh, st))
                 raise FuseOSError(ENOENT)
 
-            # Setting Modified Time
-            try:
-                st['st_mtime'] = properties.last_modified.timestamp()
-            except Exception:
-                logger.warning(
-                    "getattr operation setting modified time failed: path:{!r} fh:{} st:{}".format(path, fh, st))
+            if isinstance(item, models.Directory):
+                st['st_mode'] = stat.S_IFDIR | 0o777
+                st['st_nlink'] = 2
+            else:
+                st['st_mode'] = stat.S_IFREG | 0o777
+                st['st_nlink'] = 1
+                st['st_size'] = item.properties.content_length
 
-            # Setting Created Time
-            try:
-                st['st_ctime'] = properties.last_modified.timestamp()
-            except Exception:
-                logger.warning(
-                    "getattr operation setting create time failed: path:{!r} fh:{} st:{}".format(path, fh, st))
+            # Setting Modified Time
+            #try:
+            #    st['st_mtime'] = properties.last_modified.timestamp()
+            #except Exception:
+            #    logger.warning(
+            #        "getattr operation setting modified time failed: path:{!r} fh:{} st:{}".format(path, fh, st))
+            #
+            ## Setting Created Time
+            #try:
+            #    st['st_ctime'] = properties.last_modified.timestamp()
+            #except Exception:
+            #    logger.warning(
+            #        "getattr operation setting create time failed: path:{!r} fh:{} st:{}".format(path, fh, st))
 
             logger.debug("getattr operation end: path:{!r} fh:{} return:{}".format(path, fh, st))
             return st
@@ -167,10 +229,16 @@ class AzureFiles(LoggingMixIn, Operations):
 
         TODO: Mode is not respected at this time.  Support could be added
         '''
         path = path.lstrip('/')
-        logger.info("mkdir operation begin: path:{!r} mode:{}".format(path, mode))
+        logger.debug("mkdir operation begin: path:{!r} mode:{}".format(path, mode))
         try:
             self._files_service.create_directory(
                 self._azure_file_share_name, path, fail_on_exist=True)
+            directory, filename = self._get_separated_path(path)
+            cached = self._get_cached_dir(directory, False)
+            if cached is not None:
+                cached[filename] = models.Directory(filename)
+                logger.debug("mkdir operation: {} {}".format(filename, cached))
+
             logger.debug("mkdir operation end: path:{!r} mode:{}".format(path, mode))
             return 0
         except Exception as e:
@@ -182,13 +250,13 @@ class AzureFiles(LoggingMixIn, Operations):
         '''
         read a file and return a buffer containing that area of the file
         '''
-        logger.info("read operation begin: path:{!r} size:{} offset:{} fh:{}".format(path, size, offset, fh))
+        logger.debug("read operation begin: path:{!r} size:{} offset:{} fh:{}".format(path, size, offset, fh))
         try:
             dir_path, file_path = self._get_separated_path(path)
             data_to_return = self._files_service.get_file_to_bytes(
                 self._azure_file_share_name, dir_path, file_path, offset, offset + size - 1).content
-            #logger.info('read the following: "{}"'.format(data_to_return))
+            logger.debug('read the following: "{}"'.format(data_to_return))
             logger.debug(
                 "read operation end: path:{!r} size:{} offset:{} fh:{} data-to-return-length:{}".format(
                     path, size, offset, fh, len(data_to_return)))
@@ -199,19 +267,34 @@ class AzureFiles(LoggingMixIn, Operations):
                     path, size, offset, fh, e))
             raise e
 
+    def _get_cached_dir(self, path, force=True):
+        cached = self.dir_cache.get(path)
+        if (cached is None or cached[1] + 5 < time()) and force:
+            directory_listing = { item.name:item for item in
+                self._files_service.list_directories_and_files(self._azure_file_share_name, path)
+                }
+            self.dir_cache[path] = directory_listing, time()
+            return directory_listing
+        return cached[0] if cached is not None else None
+
+    def _clear_dir_cache(self, path, reason):
+        try:
+            del self.dir_cache[path]
+        except KeyError:
+            pass
+
     def readdir(self, path, fh):
         '''
         returns a directory listing for this directory
         '''
         path = path.lstrip('/')
-        logger.info("readdir operation begin: path:{!r} fh:{}".format(path, fh))
+        logger.debug("readdir operation begin: path:{!r} fh:{}".format(path, fh))
         try:
-            directory_listing = self._files_service.list_directories_and_files(
-                self._azure_file_share_name, path)
+            directory_listing = self._get_cached_dir(path)
 
             readdir_return = ['.', '..']
-            readdir_return.extend([i.name for i in directory_listing])
+            readdir_return.extend(directory_listing.keys())
             logger.debug(
                 "readdir operation end: path:{!r} fh:{} return:{}".format(path, fh, readdir_return))
             return readdir_return
@@ -225,8 +308,9 @@ class AzureFiles(LoggingMixIn, Operations):
         Rename a file or directory.
        TODO: Currently this implementation does not support renaming directories. Support needed.
         """
-        logger.info("rename operation begin: old:{} new:{}".format(old, new))
+        logger.debug("rename operation begin: old:{} new:{}".format(old, new))
         try:
+            old_orig_path = old
             old_path = old.strip('/')
             new_path = new.strip('/')
 
@@ -233,8 +317,27 @@ class AzureFiles(LoggingMixIn, Operations):
             if self._discover_item_type(new_path):
                 # file exists at path.  Would cause name collision
                 raise FuseOSError(errno.EALREADY)
 
-            self._rename(old_path, new_path, self._discover_item_type(old_path))
+            with self.file_cache[old_orig_path].write_lock:
+                new_length = self._rename(old_path, new_path, self._discover_item_type(old_path))
+                self.file_cache[old_orig_path].max_size = 0
+                if new_length is None:
+                    self._clear_dir_cache(self._get_separated_path(old_path)[0], 'rename old')
+                    self._clear_dir_cache(self._get_separated_path(new_path)[0], 'rename new')
+                else:
+                    directory, filename = self._get_separated_path(old_path)
+                    cached = self._get_cached_dir(directory, False)
+                    if cached is not None:
+                        try:
+                            del cached[filename]
+                        except KeyError:
+                            pass
+                    directory, filename = self._get_separated_path(new_path)
+                    cached = self._get_cached_dir(directory, False)
+                    if cached is not None:
+                        props = models.FileProperties()
+                        props.content_length = new_length
+                        cached[filename] = models.File(filename, None, props)
             logger.debug("rename operation end: old:{} new:{}".format(old, new))
             return 0
         except Exception as e:
@@ -268,6 +377,8 @@ class AzureFiles(LoggingMixIn, Operations):
             self._files_service.create_file_from_bytes(self._azure_file_share_name, new_path_dir, new_path_file, file_contents)
             self._files_service.delete_file(self._azure_file_share_name, old_path_dir, old_path_file)
+
+            return len(file_contents)
         else:
             raise ValueError("item_type must be 'file' or 'directory'")
 
@@ -275,12 +386,19 @@ class AzureFiles(LoggingMixIn, Operations):
         '''
         removes a directory at specified path
         '''
-        logger.info("rmdir operation begin: path:{!r}".format(path))
+        logger.debug("rmdir operation begin: path:{!r}".format(path))
         try:
             path = path.strip('/')
 
             try:
                 self._files_service.delete_directory(self._azure_file_share_name, path)
+                directory, filename = self._get_separated_path(path)
+                cached = self._get_cached_dir(directory, False)
+                if cached is not None:
+                    try:
+                        del cached[filename]
+                    except KeyError:
+                        pass
             except AzureConflictHttpError as error:
                 logger.debug("rmdir operation: path:{!r} directory not empty")
                 raise FuseOSError(errno.ENOTEMPTY)
@@ -307,11 +425,22 @@ class AzureFiles(LoggingMixIn, Operations):
         '''
         Delete file.
         '''
-        logger.info("unlink operation begin: path:{!r}".format(path))
+        logger.debug("unlink operation begin: path:{!r}".format(path))
+        self.flush(path)
         try:
+            orig_path = path
             path = path.strip('/')
             directory, filename = self._get_separated_path(path)
-            self._files_service.delete_file(self._azure_file_share_name, directory, filename)
+            with self.file_cache[orig_path].write_lock:
+                self._files_service.delete_file(self._azure_file_share_name, directory, filename)
+                logger.debug('unlink resetting to 0 {}'.format(orig_path))
+                self.file_cache[orig_path].max_size = 0
+                cached = self._get_cached_dir(directory, False)
+                if cached is not None:
+                    try:
+                        del cached[filename]
+                    except KeyError:
+                        pass
 
             logger.debug("unlink operation end: path:{!r}".format(path))
             return 0
         except Exception as e:
@@ -322,25 +451,39 @@ class AzureFiles(LoggingMixIn, Operations):
         '''
         write
         '''
-        logger.info("write operation begin: path:{!r} len(data):{} offset:{} fh:{}".format(path, len(data), offset, fh))
+        logger.debug("write operation begin: path:{!r} len(data):{} offset:{} fh:{}".format(path, len(data), offset, fh))
         try:
+            orig_path = path
             path = path.lstrip('/')
             directory, filename = self._get_separated_path(path)
-            f = self._files_service.get_file_properties(self._azure_file_share_name, directory, filename)
-            file_length = f.properties.content_length
-            if offset < 0 or offset > file_length:
+            if offset < 0:
                 logger.debug("write operation offset negative or exceeds file length: path:{!r} len(data):{} offset:{} fh:{}".format(path, len(data), offset, fh))
                 raise FuseOSError(errno.EINVAL)
 
             # write the data at the range adding old data to the front and back of it.
             data_length = len(data)
+
-            # update range fails if the file isn't as long as the range
-            if file_length < offset + data_length:
-                self._files_service.resize_file(self._azure_file_share_name, directory, filename, offset + data_length)
-
-            # update the range specified by this write.
-            self._files_service.update_range(self._azure_file_share_name, directory, filename, data, start_range=offset, end_range=offset+len(data)-1)
+            # Take the write lock to see if we can coalesce
+            with self.file_cache[orig_path].append_write_lock:
+                found = False
+                if self.file_cache[orig_path].writes:
+                    last = self.file_cache[orig_path].writes[-1]
+                    if (not last.processing and
+                            (last.offset + len(last.data)) == offset and
+                            len(last.data) + len(data) < file.FileService.MAX_RANGE_SIZE):
+                        # let's piggy back on this write...
+                        last.data += data
+                        found = True
+                if not found:
+                    wi = WriteInfo(self, directory, filename, offset, data, orig_path)
+                    self.file_cache[orig_path].writes.append(wi)
+                    future = executor.submit(wi.write)
+                    self.file_cache[orig_path].pending_writes.add(future)
+                    def done(future):
+                        self.file_cache[orig_path].pending_writes.remove(future)
+                    future.add_done_callback(done)
+
+            # TODO: if we ever try to cache attrs, we would have to update the st_mtime.
logger.debug("write operation end: path:{!r} len(data):{} offset:{} fh:{} return-data-length:{}".format(path, len(data), offset, fh, data_length)) @@ -353,23 +496,32 @@ class AzureFiles(LoggingMixIn, Operations): logger.exception("write operation AzureHTTPError: path:{!r} len(data):{} offset:{} fh:{} exception:{}".format(path, len(data), offset, fh, ahe)) raise ahe except Exception as e: - logger.exception("write operation exception: path:{!r} len(data):{} offset:{} fh:{} exception:{}".format(path, len(data), offset, fh, e)) + logger.debug("write operation exception: path:{!r} len(data):{} offset:{} fh:{} exception:{}".format(path, len(data), offset, fh, e)) raise e + def flush(self, path, fh = None): + w = concurrent.futures.wait(self.file_cache[path].pending_writes) + + def release(self, path, fh): + self.file_cache[path].max_size = 0 + def truncate(self, path, length, fh=None): ''' Truncate or extend the given file so that it is precisely size bytes long. See truncate(2) for details. This call is required for read/write filesystems, because recreating a file will first truncate it. ''' - logger.info("truncate operation begin: path:{!r} length:{} fh:{}".format(path, length, fh)) + logger.debug("truncate operation begin: path:{!r} length:{} fh:{}".format(path, length, fh)) # length must be positive if length < 0: raise FuseOSError(errno.EINVAL) try: + orig_path = path path = path.lstrip('/') directory, filename = self._get_separated_path(path) - self._files_service.resize_file(self._azure_file_share_name, directory, filename, length) + with self.file_cache[orig_path].write_lock: + self._files_service.resize_file(self._azure_file_share_name, directory, filename, length) + self.file_cache[orig_path].max_size = length except Exception as e: logger.exception("truncate operation exception: path:{!r} length:{} fh:{} e:{}".format(path, length, fh, e)) raise e @@ -382,7 +534,7 @@ class AzureFiles(LoggingMixIn, Operations): chmod. This command is a NOP right now. If it is missing this is interpreted as a read-only file system though. ''' - logger.info("chmod operation: path:{!r} mode:{}".format(path, mode)) + logger.debug("chmod operation: path:{!r} mode:{}".format(path, mode)) return def chown(self, path, uid, gid): @@ -390,7 +542,7 @@ class AzureFiles(LoggingMixIn, Operations): chown. This command is a NOP right now. If it is missing this is interpreted as a read-only file system though. ''' - logger.info("chown operation: path:{!r} uid:{} gid:{}".format(path, uid, gid)) + logger.debug("chown operation: path:{!r} uid:{} gid:{}".format(path, uid, gid)) return if __name__ == '__main__': @@ -422,4 +574,3 @@ if __name__ == '__main__': except Exception as e: logger.error("Python Fuse Top-Level Exception: {}".format(e)) logger.error("Python Fuse Top-Level Trace Exception: {}".format(traceback.format_exc())) -