10x write throughput gained by allowing simultaneous writes. Implement flush. Also stripped fetching the modified time for now, as it is very expensive.

Bring in changes made by @crwilcox and @dinov.
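A hedged sketch of the new write path (simplified: PendingWrite is an illustrative stand-in for the WriteInfo class in the diff below, and the per-file locks and the actual Azure Files calls are omitted):

import concurrent.futures
import threading
from collections import deque

executor = concurrent.futures.ThreadPoolExecutor(4)  # up to four simultaneous uploads

class PendingWrite(object):
    '''One queued range write (stand-in for WriteInfo).'''
    def __init__(self, offset, data):
        self.offset = offset
        self.data = data
        self.processing = False

    def commit(self):
        self.processing = True
        # resize_file/update_range against Azure Files would happen here

writes = deque()         # queued writes for one file, oldest first
pending = set()          # futures for writes not yet committed
lock = threading.Lock()  # protects the coalescing queue

def write(offset, data):
    with lock:
        last = writes[-1] if writes else None
        if last is not None and not last.processing and last.offset + len(last.data) == offset:
            last.data += data  # contiguous and not yet started: piggy-back on it
            return
        w = PendingWrite(offset, data)
        writes.append(w)
        future = executor.submit(w.commit)
        pending.add(future)
        future.add_done_callback(pending.discard)

def flush():
    concurrent.futures.wait(pending)  # block until every queued write commits

Because contiguous writes merge before they are submitted, a stream of small sequential writes becomes a few large update_range calls, which is where the throughput win comes from.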
Christopher Wilcox 2017-08-16 11:51:21 -07:00
Parent bb90be61c2
Commit f549bc4845
1 changed file with 230 additions and 79 deletions


@@ -1,30 +1,38 @@
#!/usr/bin/env python
from __future__ import print_function, absolute_import, division
#!/usr/bin/env python
from __future__ import absolute_import, division, print_function
import concurrent.futures
import errno
import io
import logging
import logging.handlers
import requests
from dateutil import parser
from sys import argv, exit
import sys
from time import time
import os
import stat
from fuse import FUSE, FuseOSError, Operations, LoggingMixIn, fuse_get_context
import errno
from errno import ENOENT
import io
import traceback
from collections import defaultdict, namedtuple
from time import time
import azure.storage.file as file
from azure.common import AzureMissingResourceHttpError, AzureConflictHttpError, AzureHttpError
import platform
import stat
import sys
import threading
import traceback
import urllib.parse
from collections import defaultdict, deque, namedtuple
from errno import ENOENT
from sys import argv, exit
from time import time
import azure.storage.file as file
import requests
from azure.common import (AzureConflictHttpError, AzureHttpError,
AzureMissingResourceHttpError)
from azure.storage.file import models
from dateutil import parser
from fuse import FUSE, FuseOSError, LoggingMixIn, Operations, fuse_get_context
from requests import Session
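# Shared worker pool: up to four range writes are uploaded to Azure Files at once.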
executor = concurrent.futures.ThreadPoolExecutor(4)
#import ptvsd
#ptvsd.enable_attach(secret='my_secret')
# The minimum level to log: NOTSET, DEBUG, INFO, WARNING, ERROR, CRITICAL.
LOGGING_LEVEL = logging.INFO
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
@@ -39,6 +47,55 @@ if platform.system() != 'Windows':
syslog_handler.setFormatter(formatter)
logger.addHandler(syslog_handler)
class WriteInfo(object):
'''Used to track writes to a file and coalesce them into a single write into
Azure files. We'll track the destination and whether or not the write has
been processed. We'll then combine sequential writes.'''
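# For example, write(offset=0, data=b'aaaa') followed by write(offset=4, data=b'bbbb')
# becomes a single update_range call covering bytes 0-7, provided the first write has
# not started processing and the combined size stays under file.FileService.MAX_RANGE_SIZE.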
def __init__(self, files, directory, filename, offset, data, orig_path):
self.files = files
self.directory = directory
self.filename = filename
self.offset = offset
self.data = data
self.processing = False
self.orig_path = orig_path
def write(self):
try:
with self.files.file_cache[self.orig_path].append_write_lock:
self.processing = True
with self.files.file_cache[self.orig_path].write_lock:
max_size = self.files.file_cache[self.orig_path].max_size
logger.debug('current max size of {} is {}'.format(self.orig_path, max_size))
data_length = len(self.data)
if max_size < self.offset + data_length:
f = self.files._files_service.get_file_properties(self.files._azure_file_share_name,
self.directory, self.filename)
file_length = f.properties.content_length
if file_length < self.offset + data_length:
logger.debug('resizing file {} to {} from {}'.format(self.orig_path, self.offset + data_length, file_length))
self.files._files_service.resize_file(self.files._azure_file_share_name, self.directory, self.filename, self.offset + data_length)
self.files.file_cache[self.orig_path].max_size = self.offset + data_length
# update the range specified by this write.
logger.debug('updating {} range {} to {}'.format(self.orig_path, self.offset, self.offset + data_length - 1))
self.files._files_service.update_range(self.files._azure_file_share_name, self.directory, self.filename, self.data, start_range=self.offset, end_range=self.offset+data_length-1)
logger.debug('write committed ' + self.orig_path)
except Exception as e:
logger.warning('error writing {}: {}'.format(self.orig_path, e))
class FileCache:
'''Tracks information we've cached locally about an individual file. Currently we
track pending writes, the max size we've resized the file to, and a couple of locks
protecting both.'''
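# write_lock serializes the Azure-facing operations (resize/update_range, truncate,
# unlink, rename) for one file; append_write_lock protects the local coalescing queue.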
def __init__(self):
self.write_lock = threading.Lock()
self.append_write_lock = threading.Lock()
self.max_size = 0
self.writes = deque()
self.pending_writes = set()
class AzureFiles(LoggingMixIn, Operations):
'''
@@ -51,7 +108,13 @@ class AzureFiles(LoggingMixIn, Operations):
self._azure_storage_account_name = azure_storage_account_name
self._azure_file_share_name = azure_file_share_name
self._sas_token = sas_token
self._files_service = file.FileService(self._azure_storage_account_name, sas_token=self._sas_token)
self._files_service = file.FileService(self._azure_storage_account_name, sas_token=self._sas_token, request_session=Session())
self.writes = deque()
self.dir_cache = {}
self.file_cache = defaultdict(FileCache)
def _get_separated_path(self, path):
path = path.lstrip('/')
@@ -78,14 +141,18 @@ class AzureFiles(LoggingMixIn, Operations):
TODO: Mode is not respected at this time. Support could be added
'''
path = path.lstrip('/')
logger.info("create operation begin: path:{!r} mode:{}".format(path, mode))
logger.debug("create operation begin: path:{!r} mode:{}".format(path, mode))
try:
if not path:
raise FuseOSError(errno.EINVAL)
directory, filename = self._get_separated_path(path)
self._files_service.create_file(self._azure_file_share_name, directory, filename, 0)
cached = self._get_cached_dir(directory, False)
if cached is not None:
props = models.FileProperties()
props.content_length = 0
cached[filename] = models.File(filename, None, props)
logger.debug("create operation end: path:{!r} mode:{}".format(path, mode))
return 0
@@ -104,7 +171,7 @@ class AzureFiles(LoggingMixIn, Operations):
st_uid; /* user-id of owner */
st_gid; /* group-id of owner */
'''
logger.info("getattr operation begin: path:{!r} fh:{}".format(path, fh))
logger.debug("getattr operation begin: path:{!r} fh:{}".format(path, fh))
try:
path = path.lstrip('/')
logger.debug('getattr request: {}'.format(path))
@@ -120,38 +187,33 @@ class AzureFiles(LoggingMixIn, Operations):
st['st_nlink'] = 2
return st
try:
properties = self._files_service.get_file_properties(
self._azure_file_share_name, directory, filename).properties
item_type = 'file'
except Exception:
properties = self._files_service.get_directory_properties(
self._azure_file_share_name, path).properties
item_type = 'dir'
if item_type == 'dir':
st['st_mode'] = stat.S_IFDIR | 0o777
st['st_nlink'] = 2
elif item_type == 'file':
st['st_mode'] = stat.S_IFREG | 0o777
st['st_nlink'] = 1
st['st_size'] = properties.content_length
else:
directory_listing = self._get_cached_dir(directory)
item = directory_listing.get(filename)
if item is None:
logger.debug("item doesn't exist: path:{!r} fh:{} return:{}".format(path, fh, st))
raise FuseOSError(ENOENT)
# Setting Modified Time
try:
st['st_mtime'] = properties.last_modified.timestamp()
except Exception:
logger.warning(
"getattr operation setting modified time failed: path:{!r} fh:{} st:{}".format(path, fh, st))
if isinstance(item, models.Directory):
st['st_mode'] = stat.S_IFDIR | 0o777
st['st_nlink'] = 2
else:
st['st_mode'] = stat.S_IFREG | 0o777
st['st_nlink'] = 1
st['st_size'] = item.properties.content_length
# Setting Created Time
try:
st['st_ctime'] = properties.last_modified.timestamp()
except Exception:
logger.warning(
"getattr operation setting create time failed: path:{!r} fh:{} st:{}".format(path, fh, st))
# Setting Modified Time
#try:
# st['st_mtime'] = properties.last_modified.timestamp()
#except Exception:
# logger.warning(
# "getattr operation setting modified time failed: path:{!r} fh:{} st:{}".format(path, fh, st))
#
## Setting Created Time
#try:
# st['st_ctime'] = properties.last_modified.timestamp()
#except Exception:
# logger.warning(
# "getattr operation setting create time failed: path:{!r} fh:{} st:{}".format(path, fh, st))
logger.debug("getattr operation end: path:{!r} fh:{} return:{}".format(path, fh, st))
return st
@@ -167,10 +229,16 @@ class AzureFiles(LoggingMixIn, Operations):
TODO: Mode is not respected at this time. Support could be added
'''
path = path.lstrip('/')
logger.info("mkdir operation begin: path:{!r} mode:{}".format(path, mode))
logger.debug("mkdir operation begin: path:{!r} mode:{}".format(path, mode))
try:
self._files_service.create_directory(
self._azure_file_share_name, path, fail_on_exist=True)
directory, filename = self._get_separated_path(path)
cached = self._get_cached_dir(directory, False)
if cached is not None:
cached[filename] = models.Directory(filename)
logger.debug("mkdir operation: {} {}".format(filename, cached))
logger.debug("mkdir operation end: path:{!r} mode:{}".format(path, mode))
return 0
except Exception as e:
@@ -182,13 +250,13 @@ class AzureFiles(LoggingMixIn, Operations):
'''
read a file and return a buffer containing that area of the file
'''
logger.info("read operation begin: path:{!r} size:{} offset:{} fh:{}".format(path, size, offset, fh))
logger.debug("read operation begin: path:{!r} size:{} offset:{} fh:{}".format(path, size, offset, fh))
try:
dir_path, file_path = self._get_separated_path(path)
data_to_return = self._files_service.get_file_to_bytes(
self._azure_file_share_name, dir_path, file_path, offset, offset + size - 1).content
#logger.info('read the following: "{}"'.format(data_to_return))
logger.debug('read the following: "{}"'.format(data_to_return))
logger.debug(
"read operation end: path:{!r} size:{} offset:{} fh:{} data-to-return-length:{}".format(
path, size, offset, fh, len(data_to_return)))
@@ -199,19 +267,34 @@ class AzureFiles(LoggingMixIn, Operations):
path, size, offset, fh, e))
raise e
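# The directory cache below keeps one listing per directory for about 5 seconds.
# With force=False it returns whatever is cached (possibly None) without refreshing,
# so mutating operations like create, mkdir, and rename can patch the entry in place.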
def _get_cached_dir(self, path, force=True):
cached = self.dir_cache.get(path)
if (cached is None or cached[1] + 5 < time()) and force:
directory_listing = { item.name:item for item in
self._files_service.list_directories_and_files(self._azure_file_share_name, path)
}
self.dir_cache[path] = directory_listing, time()
return directory_listing
return cached[0] if cached is not None else None
def _clear_dir_cache(self, path, reason):
try:
del self.dir_cache[path]
except KeyError:
pass
def readdir(self, path, fh):
'''
returns a directory listing for this directory
'''
path = path.lstrip('/')
logger.info("readdir operation begin: path:{!r} fh:{}".format(path, fh))
logger.debug("readdir operation begin: path:{!r} fh:{}".format(path, fh))
try:
directory_listing = self._files_service.list_directories_and_files(
self._azure_file_share_name, path)
directory_listing = self._get_cached_dir(path)
readdir_return = ['.', '..']
readdir_return.extend([i.name for i in directory_listing])
readdir_return.extend(directory_listing.keys())
logger.debug(
"readdir operation end: path:{!r} fh:{} return:{}".format(path, fh, readdir_return))
return readdir_return
@@ -225,8 +308,9 @@ class AzureFiles(LoggingMixIn, Operations):
Rename a file or directory.
TODO: Currently this implementation does not support renaming directories. Support needed.
"""
logger.info("rename operation begin: old:{} new:{}".format(old, new))
logger.debug("rename operation begin: old:{} new:{}".format(old, new))
try:
old_orig_path = old
old_path = old.strip('/')
new_path = new.strip('/')
@@ -234,7 +318,32 @@ class AzureFiles(LoggingMixIn, Operations):
# file exists at path. Would cause name collision
raise FuseOSError(errno.EALREADY)
self._rename(old_path, new_path, self._discover_item_type(old_path))
with self.file_cache[old_orig_path].write_lock:
new_length = self._rename(old_path, new_path, self._discover_item_type(old_path))
self.file_cache[old_orig_path].max_size = 0
if new_length is None:
self._clear_dir_cache(self._get_separated_path(old_path)[0], 'rename old')
self._clear_dir_cache(self._get_separated_path(new_path)[0], 'rename new')
else:
directory, filename = self._get_separated_path(old_path)
cached = self._get_cached_dir(directory, False)
if cached is not None:
try:
del cached[filename]
except KeyError:
pass
directory, filename = self._get_separated_path(new_path)
cached = self._get_cached_dir(directory, False)
if cached is not None:
try:
if new_length is None:
cached[filename] = models.Directory(filename)
else:
props = models.FileProperties()
props.content_length = new_length
cached[filename] = models.File(filename, None, props)
except KeyError:
pass
logger.debug("rename operation end: old:{} new:{}".format(old, new))
return 0
except Exception as e:
@@ -268,6 +377,8 @@ class AzureFiles(LoggingMixIn, Operations):
self._files_service.create_file_from_bytes(self._azure_file_share_name, new_path_dir, new_path_file, file_contents)
self._files_service.delete_file(self._azure_file_share_name, old_path_dir, old_path_file)
return len(file_contents)
else:
raise ValueError("item_type must be 'file' or 'directory'")
@@ -275,12 +386,19 @@ class AzureFiles(LoggingMixIn, Operations):
'''
removes a directory at specified path
'''
logger.info("rmdir operation begin: path:{!r}".format(path))
logger.debug("rmdir operation begin: path:{!r}".format(path))
try:
path = path.strip('/')
try:
self._files_service.delete_directory(self._azure_file_share_name, path)
directory, filename = self._get_separated_path(path)
cached = self._get_cached_dir(directory, False)
if cached is not None:
try:
del cached[filename]
except KeyError:
pass
except AzureConflictHttpError as error:
logger.debug("rmdir operation: path:{!r} directory not empty")
raise FuseOSError(errno.ENOTEMPTY)
@@ -307,11 +425,22 @@ class AzureFiles(LoggingMixIn, Operations):
'''
Delete file.
'''
logger.info("unlink operation begin: path:{!r}".format(path))
logger.debug("unlink operation begin: path:{!r}".format(path))
self.flush(path)
try:
orig_path = path
path = path.strip('/')
directory, filename = self._get_separated_path(path)
self._files_service.delete_file(self._azure_file_share_name, directory, filename)
with self.file_cache[orig_path].write_lock:
self._files_service.delete_file(self._azure_file_share_name, directory, filename)
logger.debug('unlink resetting to 0 {}'.format(orig_path))
self.file_cache[orig_path].max_size = 0
cached = self._get_cached_dir(directory, False)
if cached is not None:
try:
del cached[filename]
except KeyError:
pass
logger.debug("unlink operation end: path:{!r}".format(path))
return 0
except Exception as e:
@@ -322,25 +451,39 @@ class AzureFiles(LoggingMixIn, Operations):
'''
write
'''
logger.info("write operation begin: path:{!r} len(data):{} offset:{} fh:{}".format(path, len(data), offset, fh))
logger.debug("write operation begin: path:{!r} len(data):{} offset:{} fh:{}".format(path, len(data), offset, fh))
try:
orig_path = path
path = path.lstrip('/')
directory, filename = self._get_separated_path(path)
f = self._files_service.get_file_properties(self._azure_file_share_name, directory, filename)
file_length = f.properties.content_length
if offset < 0 or offset > file_length:
if offset < 0:
logger.debug("write operation offset negative or exceeds file length: path:{!r} len(data):{} offset:{} fh:{}".format(path, len(data), offset, fh))
raise FuseOSError(errno.EINVAL)
# write the data at the range adding old data to the front and back of it.
data_length = len(data)
# update range fails if the file isn't as long as the range
if file_length < offset + data_length:
self._files_service.resize_file(self._azure_file_share_name, directory, filename, offset + data_length)
# update the range specified by this write.
self._files_service.update_range(self._azure_file_share_name, directory, filename, data, start_range=offset, end_range=offset+len(data)-1)
# Take the write lock to see if we can coalesce
with self.file_cache[orig_path].append_write_lock:
found = False
if self.file_cache[orig_path].writes:
last = self.file_cache[orig_path].writes[-1]
if (not last.processing and
(last.offset + len(last.data)) == offset and
len(last.data) + len(data) < file.FileService.MAX_RANGE_SIZE):
# let's piggy back on this write...
last.data += data
found = True
if not found:
wi = WriteInfo(self, directory, filename, offset, data, orig_path)
self.file_cache[orig_path].writes.append(wi)
future = executor.submit(wi.write)
self.file_cache[orig_path].pending_writes.add(future)
def done(future):
self.file_cache[orig_path].pending_writes.remove(future)
future.add_done_callback(done)
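# Dropping completed futures keeps pending_writes down to just the writes still
# in flight, which is exactly the set that flush() waits on.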
# TODO: if we ever try to cache attrs, we would have to update the st_mtime.
logger.debug("write operation end: path:{!r} len(data):{} offset:{} fh:{} return-data-length:{}".format(path, len(data), offset, fh, data_length))
@@ -353,23 +496,32 @@ class AzureFiles(LoggingMixIn, Operations):
logger.exception("write operation AzureHTTPError: path:{!r} len(data):{} offset:{} fh:{} exception:{}".format(path, len(data), offset, fh, ahe))
raise ahe
except Exception as e:
logger.exception("write operation exception: path:{!r} len(data):{} offset:{} fh:{} exception:{}".format(path, len(data), offset, fh, e))
logger.debug("write operation exception: path:{!r} len(data):{} offset:{} fh:{} exception:{}".format(path, len(data), offset, fh, e))
raise e
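# flush blocks until every outstanding coalesced write for this path has been
# committed; unlink() calls it first so no write can land after the delete.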
def flush(self, path, fh=None):
concurrent.futures.wait(self.file_cache[path].pending_writes)
def release(self, path, fh):
self.file_cache[path].max_size = 0
def truncate(self, path, length, fh=None):
'''
Truncate or extend the given file so that it is precisely size bytes long.
See truncate(2) for details. This call is required for read/write filesystems,
because recreating a file will first truncate it.
'''
logger.info("truncate operation begin: path:{!r} length:{} fh:{}".format(path, length, fh))
logger.debug("truncate operation begin: path:{!r} length:{} fh:{}".format(path, length, fh))
# length must be positive
if length < 0:
raise FuseOSError(errno.EINVAL)
try:
orig_path = path
path = path.lstrip('/')
directory, filename = self._get_separated_path(path)
self._files_service.resize_file(self._azure_file_share_name, directory, filename, length)
with self.file_cache[orig_path].write_lock:
self._files_service.resize_file(self._azure_file_share_name, directory, filename, length)
self.file_cache[orig_path].max_size = length
except Exception as e:
logger.exception("truncate operation exception: path:{!r} length:{} fh:{} e:{}".format(path, length, fh, e))
raise e
@@ -382,7 +534,7 @@ class AzureFiles(LoggingMixIn, Operations):
chmod. This command is a NOP right now.
If it is missing this is interpreted as a read-only file system though.
'''
logger.info("chmod operation: path:{!r} mode:{}".format(path, mode))
logger.debug("chmod operation: path:{!r} mode:{}".format(path, mode))
return
def chown(self, path, uid, gid):
@@ -390,7 +542,7 @@ class AzureFiles(LoggingMixIn, Operations):
chown. This command is a NOP right now.
If it is missing this is interpreted as a read-only file system though.
'''
logger.info("chown operation: path:{!r} uid:{} gid:{}".format(path, uid, gid))
logger.debug("chown operation: path:{!r} uid:{} gid:{}".format(path, uid, gid))
return
if __name__ == '__main__':
@@ -422,4 +574,3 @@ if __name__ == '__main__':
except Exception as e:
logger.error("Python Fuse Top-Level Exception: {}".format(e))
logger.error("Python Fuse Top-Level Trace Exception: {}".format(traceback.format_exc()))