Attribute Cache and better Error return when one is not given (#298)

* Better error return when the curl code is not OK

* Adding in attr-cache feature
This commit is contained in:
Amanda Nguyen 2019-08-07 10:52:57 -07:00 коммит произвёл GitHub
Родитель 44d0deaf2a
Коммит 0c42315f5d
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
14 изменённых файлов: 1708 добавлений и 17 удалений

Просмотреть файл

@ -103,6 +103,7 @@ set(AZURE_STORAGE_SOURCE
azure-storage-cpp-lite/src/blob/blob_client.cpp
azure-storage-cpp-lite/src/blob/blob_client_wrapper.cpp
azure-storage-cpp-lite/src/blob/blob_client_attr_cache_wrapper.cpp
)
set (BLOBFUSE_HEADER
@ -123,7 +124,7 @@ if(UNIX)
find_package(Threads REQUIRED)
find_package(CURL REQUIRED)
find_package(GnuTLS REQUIRED)
add_definitions(-std=c++11)
set(CMAKE_CXX_STANDARD 14)
add_definitions(-D_FILE_OFFSET_BITS=64)
set(WARNING "-Wall -Wextra -Werror -pedantic -pedantic-errors")
set(CMAKE_CXX_FLAGS "${CMAKE_THREAD_LIBS_INIT} ${WARNING} ${CMAKE_CXX_FLAGS}")
@ -155,7 +156,7 @@ if(UNIX)
set(CPACK_GENERATOR "DEB")
set(CPACK_DEBIAN_PACKAGE_MAINTAINER "Microsoft - Azure Storage")
set(CPACK_DEBIAN_PACKAGE_DESCRIPTION "blobfuse 1.0.3 - FUSE adapter for Azure Blob Storage")
set(CPACK_DEBIAN_PACKAGE_DESCRIPTION "blobfuse 1.1.0 - FUSE adapter for Azure Blob Storage")
include(CPack)
endif(UNIX)
@ -200,8 +201,9 @@ if(INCLUDE_TESTS)
project(blobfusetests)
set(CMAKE_CXX_STANDARD 14)
pkg_search_module(UUID REQUIRED uuid)
add_executable(blobfusetests ${BLOBFUSE_HEADER} ${BLOBFUSE_SOURCE} ${AZURE_STORAGE_HEADER} ${AZURE_STORAGE_SOURCE} blobfuse/blobfuse.cpp test/cpplitetests.cpp)
target_link_libraries(blobfusetests ${CURL_LIBRARIES} ${GNUTLS_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} ${UUID_LIBRARIES} fuse gcrypt gtest_main)
add_executable(blobfusetests ${BLOBFUSE_HEADER} ${BLOBFUSE_SOURCE} ${AZURE_STORAGE_HEADER} ${AZURE_STORAGE_SOURCE} blobfuse/blobfuse.cpp test/cpplitetests.cpp test/attribcachetests.cpp test/attribcachesynchronizationtests.cpp)
target_link_libraries(blobfusetests ${CURL_LIBRARIES} ${GNUTLS_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} ${UUID_LIBRARIES} fuse gcrypt gmock_main)
endif()

Просмотреть файл

@ -3,6 +3,8 @@
#include <iostream>
#include <memory>
#include <string>
#include <mutex>
#include <shared_mutex>
#include <syslog.h>
#include "storage_EXPORTS.h"
@ -255,11 +257,116 @@ namespace microsoft_azure { namespace storage {
std::shared_ptr<executor_context> m_context;
};
/// <summary>
/// Abstract layer of the blob_client class for the attribute cache layer,
/// Provides a client-side logical representation of blob storage service on Windows Azure.
/// This client is used to configure and execute requests against the service with caching the attributes in mind.
/// </summary>
/// <remarks>The service client encapsulates the base URI for the service. If the service client will be used for authenticated access, it also encapsulates the credentials for accessing the storage account.</remarks>
/// <remarks>
/// Every operation is pure virtual; concrete clients (and wrappers around them) implement the full set.
/// Several operations declare default arguments. Default arguments are selected by the static type at the
/// call site, so overriders should repeat exactly the same defaults to avoid surprising callers.
/// </remarks>
class sync_blob_client
{
public:
// Pure virtual destructor so the interface can be destroyed through a base pointer.
// NOTE(review): a pure virtual destructor still requires an out-of-line definition for derived
// classes to link; that definition is not visible in this header - confirm it exists.
virtual ~sync_blob_client() = 0;
/// <summary>
/// Reports whether this client can be used. The exact criterion is defined by each implementation.
/// </summary>
virtual bool is_valid() const = 0;
/// <summary>
/// List blobs in segments.
/// </summary>
/// <param name="container">The container name.</param>
/// <param name="delimiter">The delimiter used to designate the virtual directories.</param>
/// <param name="continuation_token">A continuation token returned by a previous listing operation.</param>
/// <param name="prefix">The blob name prefix.</param>
/// <param name="maxresults">Maximum number of results to receive (defaults to 10000).</param>
/// <returns>A response from list_blobs_hierarchical that contains a list of blobs and their details</returns>
virtual list_blobs_hierarchical_response list_blobs_hierarchical(const std::string &container, const std::string &delimiter, const std::string &continuation_token, const std::string &prefix, int maxresults = 10000) = 0;
/// <summary>
/// Uploads the contents of a blob from a local file; the file size needs to be equal to or smaller than 64MB.
/// </summary>
/// <param name="sourcePath">The source file path.</param>
/// <param name="container">The container name.</param>
/// <param name="blob">The blob name.</param>
/// <param name="metadata">A <see cref="std::vector"/> of name/value pairs that represents the metadata (empty by default).</param>
virtual void put_blob(const std::string &sourcePath, const std::string &container, const std::string blob, const std::vector<std::pair<std::string, std::string>> &metadata = std::vector<std::pair<std::string, std::string>>()) = 0;
/// <summary>
/// Uploads the contents of a blob from a stream.
/// </summary>
/// <param name="container">The container name.</param>
/// <param name="blob">The blob name.</param>
/// <param name="is">The source stream.</param>
/// <param name="metadata">A <see cref="std::vector"/> of name/value pairs that represents the metadata (empty by default).</param>
virtual void upload_block_blob_from_stream(const std::string &container, const std::string blob, std::istream &is, const std::vector<std::pair<std::string, std::string>> &metadata = std::vector<std::pair<std::string, std::string>>()) = 0;
/// <summary>
/// Uploads the contents of a blob from a local file.
/// </summary>
/// <param name="sourcePath">The source file path.</param>
/// <param name="container">The container name.</param>
/// <param name="blob">The blob name.</param>
/// <param name="metadata">A <see cref="std::vector"/> of name/value pairs that represents the metadata (empty by default).</param>
/// <param name="parallel">A size_t value that indicates the maximum parallelism that can be used in this request (defaults to 8).</param>
virtual void upload_file_to_blob(const std::string &sourcePath, const std::string &container, const std::string blob, const std::vector<std::pair<std::string, std::string>> &metadata = std::vector<std::pair<std::string, std::string>>(), size_t parallel = 8) = 0;
/// <summary>
/// Downloads the contents of a blob to a stream.
/// </summary>
/// <param name="container">The container name.</param>
/// <param name="blob">The blob name.</param>
/// <param name="offset">The offset at which to begin downloading the blob, in bytes.</param>
/// <param name="size">The size of the data to download from the blob, in bytes.</param>
/// <param name="os">The target stream.</param>
virtual void download_blob_to_stream(const std::string &container, const std::string &blob, unsigned long long offset, unsigned long long size, std::ostream &os) = 0;
/// <summary>
/// Downloads the contents of a blob to a local file.
/// </summary>
/// <param name="container">The container name.</param>
/// <param name="blob">The blob name.</param>
/// <param name="destPath">The target file path.</param>
/// <param name="returned_last_modified">Output parameter; presumably receives the last-modified time of the downloaded blob - confirm against the implementation.</param>
/// <param name="parallel">A size_t value that indicates the maximum parallelism that can be used in this request (defaults to 9).</param>
virtual void download_blob_to_file(const std::string &container, const std::string &blob, const std::string &destPath, time_t &returned_last_modified, size_t parallel = 9) = 0;
/// <summary>
/// Gets the property of a blob.
/// </summary>
/// <param name="container">The container name.</param>
/// <param name="blob">The blob name.</param>
/// <returns>A <see cref="blob_property"/> object that represents the properties of the blob.</returns>
virtual blob_property get_blob_property(const std::string &container, const std::string &blob) = 0;
/// <summary>
/// Examines the existence of a blob.
/// </summary>
/// <param name="container">The container name.</param>
/// <param name="blob">The blob name.</param>
/// <returns>Return true if the blob does exist, otherwise, return false.</returns>
virtual bool blob_exists(const std::string &container, const std::string &blob) = 0;
/// <summary>
/// Deletes a blob.
/// </summary>
/// <param name="container">The container name.</param>
/// <param name="blob">The blob name.</param>
virtual void delete_blob(const std::string &container, const std::string &blob) = 0;
/// <summary>
/// Copy a blob to another.
/// </summary>
/// <param name="sourceContainer">The source container name.</param>
/// <param name="sourceBlob">The source blob name.</param>
/// <param name="destContainer">The destination container name.</param>
/// <param name="destBlob">The destination blob name.</param>
virtual void start_copy(const std::string &sourceContainer, const std::string &sourceBlob, const std::string &destContainer, const std::string &destBlob) = 0;
};
/// <summary>
/// Provides a wrapper for client-side logical representation of blob storage service on Windows Azure. This wrappered client is used to configure and execute requests against the service.
/// </summary>
/// <remarks>This wrappered client could limit a concurrency per client objects. And it will not throw exceptions, instead, it will set errno to return error codes.</remarks>
class blob_client_wrapper
class blob_client_wrapper : public sync_blob_client
{
public:
/// <summary>
@ -427,6 +534,7 @@ namespace microsoft_azure { namespace storage {
/// </summary>
/// <param name="container">The container name.</param>
/// <param name="blob">The blob name.</param>
/// <returns> A <see cref="blob_property"/> object that represents the proerty of a particular blob
blob_property get_blob_property(const std::string &container, const std::string &blob);
/// <summary>
@ -461,4 +569,223 @@ namespace microsoft_azure { namespace storage {
bool m_valid;
};
// A wrapper around the "blob_client_wrapper" that provides in-memory caching for "get_blob_properties" calls.
class blob_client_attr_cache_wrapper : public sync_blob_client
{
public:
/// <summary>
/// Constructs a blob client wrapper from a blob client instance.
/// </summary>
/// <param name="blobClient">A <see cref="microsoft_azure::storage::blob_client"> object stored in shared_ptr.</param>
explicit blob_client_attr_cache_wrapper(std::shared_ptr<sync_blob_client> blob_client_wrapper)
: m_blob_client_wrapper(blob_client_wrapper), attr_cache()
{
}
/// <summary>
/// Constructs a blob client wrapper from another blob client wrapper instance.
/// </summary>
/// <param name="other">A <see cref="microsoft_azure::storage::blob_client_attr_cache_wrapper"> object.</param>
blob_client_attr_cache_wrapper(blob_client_attr_cache_wrapper &&other)
{
m_blob_client_wrapper = other.m_blob_client_wrapper;
}
blob_client_attr_cache_wrapper& operator=(blob_client_attr_cache_wrapper&& other)
{
m_blob_client_wrapper = other.m_blob_client_wrapper;
return *this;
}
bool is_valid() const
{
return m_blob_client_wrapper != NULL;
}
// Represents a blob on the service
class blob_cache_item
{
public:
blob_cache_item(std::string name, blob_property props) : m_confirmed(false), m_mutex(), m_name(name), m_props(props)
{
}
// True if this item should accurately represent a blob on the service.
// False if not (or unknown). Marking an item as not confirmed is invalidating the cache.
bool m_confirmed;
// A mutex that can be locked in shared or unique mode (reader/writer lock)
// TODO: Consider switching this to be a regular mutex
std::shared_timed_mutex m_mutex;
// Name of the blob
std::string m_name;
// The (cached) properties of the blob
blob_property m_props;
};
// A thread-safe cache of the properties of the blobs in a container on the service.
// In order to access or update a single cache item, you must lock on the mutex in the relevant blob_cache_item, and also on the mutex representing the parent directory.
// This is due to the single cache item being linked to the directory
// The directory mutex must always be locked before the blob mutex, and no thread should ever have more than one blob mutex (or directory) held at once - this will prevent deadlocks.
// For example, to access the properties of a blob "dir1/dir2/blobname", you need to access and lock the mutex returned by get_dir_item("dir1/dir2"), and then the mutex in the blob_cache_item
// returned by get_blob_item("dir1/dir2/blobname").
//
// To read the properties of the blob from the cache, lock both mutexes in shared mode.
// To update the properties of a single blob (or to invalidate a cache item), grab the directory mutex in shared mode, and the blob mutex in unique mode. The mutexes must be held during both the
// relevant service call and the following cache update.
// For a 'list blobs' request, first grab the mutex for the directory in unique mode. Then, make the request and parse the response. For each blob in the response, grab the blob mutex for that item in unique mode
// before updating it. Don't release the directory mutex until all blobs have been updated.
//
// TODO: Currently, the maps holding the cached information grow without bound; this should be fixed.
// TODO: Implement a cache timeout
// TODO: When we no longer use an internal copy of cpplite, the attrib cache code should stay with blobfuse - it's not really applicable in the general cpplite use case.
class attribute_cache
{
public:
attribute_cache() : blob_cache(), blobs_mutex(), dir_cache(), dirs_mutex()
{
}
std::shared_ptr<std::shared_timed_mutex> get_dir_item(const std::string& path);
std::shared_ptr<blob_cache_item> get_blob_item(const std::string& path);
private:
std::map<std::string, std::shared_ptr<blob_cache_item>> blob_cache;
std::mutex blobs_mutex; // Used to protect the blob_cache map itself, not items in the map.
std::map<std::string, std::shared_ptr<std::shared_timed_mutex>> dir_cache;
std::mutex dirs_mutex;// Used to protect the dir_cache map itself, not items in the map.
};
/// <summary>
/// Constructs a blob client wrapper from storage account credential.
/// </summary>
/// <param name="account_name">The storage account name.</param>
/// <param name="account_key">The storage account key.</param>
/// <param name="sas_token">A sas token for the container.</param>
/// <param name="concurrency">The maximum number requests could be executed in the same time.</param>
/// <returns>Return a <see cref="microsoft_azure::storage::blob_client_wrapper"> object.</returns>
static blob_client_attr_cache_wrapper blob_client_attr_cache_wrapper_init(const std::string &account_name, const std::string &account_key, const std::string &sas_token, const unsigned int concurrency);
/// <summary>
/// Constructs a blob client wrapper from storage account credential.
/// </summary>
/// <param name="account_name">The storage account name.</param>
/// <param name="account_key">The storage account key.</param>
/// <param name="sas_token">A sas token for the container.</param>
/// <param name="concurrency">The maximum number requests could be executed in the same time.</param>
/// <param name="use_https">True if https should be used (instead of HTTP). Note that this may cause a sizable perf loss, due to issues in libcurl.</param>
/// <param name="blob_endpoint">Blob endpoint URI to allow non-public clouds as well as custom domains.</param>
/// <returns>Return a <see cref="microsoft_azure::storage::blob_client_wrapper"> object.</returns>
static blob_client_attr_cache_wrapper blob_client_attr_cache_wrapper_init(const std::string &account_name, const std::string &account_key, const std::string &sas_token, const unsigned int concurrency, bool use_https,
const std::string &blob_endpoint);
/// <summary>
/// List blobs in segments.
/// </summary>
/// <param name="container">The container name.</param>
/// <param name="delimiter">The delimiter used to designate the virtual directories.</param>
/// <param name="continuation_token">A continuation token returned by a previous listing operation.</param>
/// <param name="prefix">The blob name prefix.</param>
/// <param name="maxresults">Maximum amount of results to receive</param>
/// <returns>A response from list_blobs_hierarchical that contains a list of blobs and their details</returns>
list_blobs_hierarchical_response list_blobs_hierarchical(const std::string &container, const std::string &delimiter, const std::string &continuation_token, const std::string &prefix, int maxresults = 10000);
/// <summary>
/// Uploads the contents of a blob from a local file, file size need to be equal or smaller than 64MB.
/// </summary>
/// <param name="sourcePath">The source file path.</param>
/// <param name="container">The container name.</param>
/// <param name="blob">The blob name.</param>
/// <param name="metadata">A <see cref="std::vector"> that respresents metadatas.</param>
void put_blob(const std::string &sourcePath, const std::string &container, const std::string blob, const std::vector<std::pair<std::string, std::string>> &metadata = std::vector<std::pair<std::string, std::string>>());
/// <summary>
/// Uploads the contents of a blob from a stream.
/// </summary>
/// <param name="container">The container name.</param>
/// <param name="blob">The blob name.</param>
/// <param name="is">The source stream.</param>
/// <param name="metadata">A <see cref="std::vector"> that respresents metadatas.</param>
void upload_block_blob_from_stream(const std::string &container, const std::string blob, std::istream &is, const std::vector<std::pair<std::string, std::string>> &metadata = std::vector<std::pair<std::string, std::string>>());
/// <summary>
/// Uploads the contents of a blob from a local file.
/// </summary>
/// <param name="sourcePath">The source file path.</param>
/// <param name="container">The container name.</param>
/// <param name="blob">The blob name.</param>
/// <param name="metadata">A <see cref="std::vector"> that respresents metadatas.</param>
/// <param name="parallel">A size_t value indicates the maximum parallelism can be used in this request.</param>
void upload_file_to_blob(const std::string &sourcePath, const std::string &container, const std::string blob, const std::vector<std::pair<std::string, std::string>> &metadata = std::vector<std::pair<std::string, std::string>>(), size_t parallel = 8);
/// <summary>
/// Downloads the contents of a blob to a stream.
/// </summary>
/// <param name="container">The container name.</param>
/// <param name="blob">The blob name.</param>
/// <param name="offset">The offset at which to begin downloading the blob, in bytes.</param>
/// <param name="size">The size of the data to download from the blob, in bytes.</param>
/// <param name="os">The target stream.</param>
void download_blob_to_stream(const std::string &container, const std::string &blob, unsigned long long offset, unsigned long long size, std::ostream &os);
/// <summary>
/// Downloads the contents of a blob to a local file.
/// </summary>
/// <param name="container">The container name.</param>
/// <param name="blob">The blob name.</param>
/// <param name="offset">The offset at which to begin downloading the blob, in bytes.</param>
/// <param name="size">The size of the data to download from the blob, in bytes.</param>
/// <param name="destPath">The target file path.</param>
/// <param name="parallel">A size_t value indicates the maximum parallelism can be used in this request.</param>
/// <returns>A <see cref="storage_outcome" /> object that represents the properties (etag, last modified time and size) from the first chunk retrieved.</returns>
void download_blob_to_file(const std::string &container, const std::string &blob, const std::string &destPath, time_t &returned_last_modified, size_t parallel = 8);
/// <summary>
/// Gets the property of a blob.
/// </summary>
/// <param name="container">The container name.</param>
/// <param name="blob">The blob name.</param>
/// <returns> A <see cref="blob_property"/> object that represents the proerty of a particular blob
blob_property get_blob_property(const std::string &container, const std::string &blob);
/// <summary>
/// Gets the property of a blob.
/// </summary>
/// <param name="container">The container name.</param>
/// <param name="blob">The blob name.</param>
/// <returns> A <see cref="blob_property"/> object that represents the proerty of a particular blob
blob_property get_blob_property(const std::string &container, const std::string &blob, bool assume_cache_invalid);
/// <summary>
/// Examines the existance of a blob.
/// </summary>
/// <param name="container">The container name.</param>
/// <param name="blob">The blob name.</param>
/// <returns>Return true if the blob does exist, otherwise, return false.</returns>
bool blob_exists(const std::string &container, const std::string &blob);
/// <summary>
/// Deletes a blob.
/// </summary>
/// <param name="container">The container name.</param>
/// <param name="blob">The blob name.</param>
void delete_blob(const std::string &container, const std::string &blob);
/// <summary>
/// Copy a blob to another.
/// </summary>
/// <param name="sourceContainer">The source container name.</param>
/// <param name="sourceBlob">The source blob name.</param>
/// <param name="destContainer">The destination container name.</param>
/// <param name="destBlob">The destination blob name.</param>
void start_copy(const std::string &sourceContainer, const std::string &sourceBlob, const std::string &destContainer, const std::string &destBlob);
private:
std::shared_ptr<sync_blob_client> m_blob_client_wrapper;
attribute_cache attr_cache;
};
} } // microsoft_azure::storage

19
azure-storage-cpp-lite/include/executor.h Normal file → Executable file
Просмотреть файл

@ -16,6 +16,8 @@
#include "retry.h"
#include "utility.h"
#define HTTP_CODE_SERVICE_UNAVAILABLE 503 //Service unavailable
namespace microsoft_azure {
namespace storage {
@ -51,7 +53,7 @@ namespace microsoft_azure {
std::string str(std::istreambuf_iterator<char>(s.istream()), std::istreambuf_iterator<char>());
if (code != CURLE_OK || unsuccessful(result)) {
promise.set_value(storage_outcome<RESPONSE_TYPE>(context.xml_parser()->parse_storage_error(str)));
retry.add_result(code == CURLE_OK ? result : 503);
retry.add_result(code == CURLE_OK ? result : HTTP_CODE_SERVICE_UNAVAILABLE);
h.reset_input_stream();
h.reset_output_stream();
async_executor<RESPONSE_TYPE>::submit_request(promise, a, r, h, context, retry);
@ -74,7 +76,6 @@ namespace microsoft_azure {
{
http->set_error_stream([](http_base::http_code) { return true; }, storage_iostream::create_storage_stream());
request->build_request(*account, *http);
retry_info info = context->retry_policy()->evaluate(*retry);
if (info.should_retry())
{
@ -84,10 +85,12 @@ namespace microsoft_azure {
if (code != CURLE_OK || unsuccessful(result))
{
auto error = context->xml_parser()->parse_storage_error(str);
error.code = std::to_string(result);
//to ensure the most helpful error code is returned, if the curl code returns ok
//return the http error code
error.code = std::to_string(code == CURLE_OK ? result : code);
*outcome = storage_outcome<RESPONSE_TYPE>(error);
//*outcome = storage_outcome<RESPONSE_TYPE>(context->xml_parser()->parse_storage_error(str));
retry->add_result(code == CURLE_OK ? result: 503);
retry->add_result(code == CURLE_OK ? result: HTTP_CODE_SERVICE_UNAVAILABLE);
http->reset_input_stream();
http->reset_output_stream();
async_executor<RESPONSE_TYPE>::submit_helper(promise, outcome, account, request, http, context, retry);
@ -132,7 +135,7 @@ namespace microsoft_azure {
std::string str(std::istreambuf_iterator<char>(s.istream()), std::istreambuf_iterator<char>());
if (code != CURLE_OK || unsuccessful(result)) {
promise.set_value(storage_outcome<void>(context.xml_parser()->parse_storage_error(str)));
retry.add_result(code == CURLE_OK ? result : 503);
retry.add_result(code == CURLE_OK ? result : HTTP_CODE_SERVICE_UNAVAILABLE);
h.reset_input_stream();
h.reset_output_stream();
async_executor<void>::submit_request(promise, a, r, h, context, retry);
@ -166,10 +169,12 @@ namespace microsoft_azure {
if (code != CURLE_OK || unsuccessful(result))
{
auto error = context->xml_parser()->parse_storage_error(str);
error.code = std::to_string(result);
//to ensure the most helpful error code is returned, if the curl code returns ok
//return the http error code
error.code = std::to_string(code == CURLE_OK ? result : code);
*outcome = storage_outcome<void>(error);
//*outcome = storage_outcome<void>(context->xml_parser()->parse_storage_error(str));
retry->add_result(code == CURLE_OK ? result: 503);
retry->add_result(code == CURLE_OK ? result: HTTP_CODE_SERVICE_UNAVAILABLE);
http->reset_input_stream();
http->reset_output_stream();
async_executor<void>::submit_helper(promise, outcome, account, request, http, context, retry);

Просмотреть файл

@ -87,6 +87,7 @@ public:
lease_status status;
lease_state state;
lease_duration duration;
std::string copy_status;
std::vector<std::pair<std::string, std::string>> metadata;
bool is_directory;
};

Просмотреть файл

@ -205,7 +205,7 @@ std::future<storage_outcome<list_blobs_hierarchical_response>> blob_client::list
auto request = std::make_shared<list_blobs_hierarchical_request>(container, delimiter, continuation_token, prefix);
request->set_maxresults(max_results);
request->set_includes(list_blobs_request_base::include::metadata);
request->set_includes(static_cast<list_blobs_request_base::include>(list_blobs_request_base::include::metadata | list_blobs_request_base::include::copy));
return async_executor<list_blobs_hierarchical_response>::submit(m_account, request, http, m_context);
}

Просмотреть файл

@ -0,0 +1,301 @@
#include "blob/blob_client.h"
namespace microsoft_azure {
namespace storage {
// Helper to get the string representing the parent directory of a given item.
// The parent is everything before the last '/' in the input; an input that
// contains no '/' has no parent and yields an empty string.
std::string get_parent_str(std::string object)
{
    const std::string::size_type separator_pos = object.find_last_of('/');
    return (separator_pos == std::string::npos) ? std::string()
                                                : object.substr(0, separator_pos);
}
// Thread-safe lookup of the reader/writer mutex registered for a directory path.
// The directory map is guarded by dirs_mutex for the duration of the call; when the
// path has no entry yet, a new mutex is created, stored under that path and returned.
std::shared_ptr<std::shared_timed_mutex> blob_client_attr_cache_wrapper::attribute_cache::get_dir_item(const std::string& path)
{
    std::lock_guard<std::mutex> map_guard(dirs_mutex);

    const auto existing = dir_cache.find(path);
    if (existing != dir_cache.end())
    {
        return existing->second;
    }

    // First request for this directory: register a fresh mutex for it.
    auto fresh_mutex = std::make_shared<std::shared_timed_mutex>();
    dir_cache[path] = fresh_mutex;
    return fresh_mutex;
}
// Thread-safe lookup of the cache entry registered for a blob path.
// The blob map is guarded by blobs_mutex for the duration of the call; when the path
// has no entry yet, a new (unconfirmed) entry is created, stored under that path and returned.
// NOTE(review): new entries are created with an empty name rather than "path" - confirm m_name is intentionally unused.
std::shared_ptr<blob_client_attr_cache_wrapper::blob_cache_item> blob_client_attr_cache_wrapper::attribute_cache::get_blob_item(const std::string& path)
{
    std::lock_guard<std::mutex> map_guard(blobs_mutex);

    const auto existing = blob_cache.find(path);
    if (existing != blob_cache.end())
    {
        return existing->second;
    }

    // First request for this blob: register a placeholder entry for it.
    auto fresh_item = std::make_shared<blob_client_attr_cache_wrapper::blob_cache_item>("", blob_property(false));
    blob_cache[path] = fresh_item;
    return fresh_item;
}
/// <summary>
/// List blobs in segments, and record the properties of every listed blob in the attribute cache.
/// </summary>
/// <param name="container">The container name.</param>
/// <param name="delimiter">The delimiter used to designate the virtual directories.</param>
/// <param name="continuation_token">A continuation token returned by a previous listing operation.</param>
/// <param name="prefix">The blob name prefix; also used as the key of the directory mutex held during the call.</param>
/// <param name="maxresults">Maximum number of results to receive</param>
/// <returns>The response of the wrapped client, unchanged.</returns>
list_blobs_hierarchical_response blob_client_attr_cache_wrapper::list_blobs_hierarchical(const std::string &container, const std::string &delimiter, const std::string &continuation_token, const std::string &prefix, int maxresults)
{
    // The directory is held in unique mode for the service call and for all the cache updates that follow.
    std::shared_ptr<std::shared_timed_mutex> listing_dir_mutex = attr_cache.get_dir_item(prefix);
    std::unique_lock<std::shared_timed_mutex> dir_guard(*listing_dir_mutex);

    // errno is cleared first so that a non-zero value after the call can be attributed to the call.
    errno = 0;
    list_blobs_hierarchical_response response = m_blob_client_wrapper->list_blobs_hierarchical(container, delimiter, continuation_token, prefix, maxresults);
    if (errno != 0)
    {
        // The listing failed; leave the cache untouched.
        return response;
    }

    for (const auto &listed : response.blobs)
    {
        if (listed.is_directory)
        {
            // Only blobs are cached, directories are skipped.
            continue;
        }

        // TODO - modify list_blobs to return blob_property items; simplifying this logic.
        blob_property properties(true);
        properties.cache_control = listed.cache_control;
        // properties.content_disposition = listed.content_disposition; // TODO - once this is available in cpplite.
        properties.content_encoding = listed.content_encoding;
        properties.content_language = listed.content_language;
        properties.size = listed.content_length;
        properties.content_md5 = listed.content_md5;
        properties.content_type = listed.content_type;
        properties.etag = listed.etag;
        properties.metadata = listed.metadata;
        properties.copy_status = listed.copy_status;
        properties.last_modified = curl_getdate(listed.last_modified.c_str(), NULL);

        // Note that get_blob_item internally locks the mutex protecting the attr_cache blob list while the
        // directory mutex is already held. That internal mutex is released before get_blob_item returns, so
        // there should be no chance of deadlock, but take care when modifying.
        std::shared_ptr<blob_client_attr_cache_wrapper::blob_cache_item> cache_item = attr_cache.get_blob_item(listed.name);
        std::unique_lock<std::shared_timed_mutex> item_guard(cache_item->m_mutex);
        cache_item->m_props = properties;
        cache_item->m_confirmed = true;
    }

    return response;
}
/// <summary>
/// Constructs an attribute-caching blob client wrapper from storage account credential,
/// using HTTP (use_https = false) and no explicit blob endpoint.
/// </summary>
/// <param name="account_name">The storage account name.</param>
/// <param name="account_key">The storage account key.</param>
/// <param name="sas_token">A sas token for the container.</param>
/// <param name="concurrency">The maximum number of requests that can be executed at the same time.</param>
/// <returns>The wrapper produced by the six-argument overload.</returns>
blob_client_attr_cache_wrapper blob_client_attr_cache_wrapper::blob_client_attr_cache_wrapper_init(const std::string &account_name, const std::string &account_key, const std::string &sas_token, const unsigned int concurrency)
{
    // The endpoint parameter is a const std::string&; passing NULL would construct a std::string from a
    // null pointer, which is undefined behaviour. An empty string expresses "no endpoint given".
    // NOTE(review): presumably an empty endpoint selects the default public endpoint - confirm in blob_client_wrapper_init.
    return blob_client_attr_cache_wrapper_init(account_name, account_key, sas_token, concurrency, false, std::string());
}
/// <summary>
/// Constructs an attribute-caching blob client wrapper from storage account credential.
/// A blob_client_wrapper is created from the arguments and then wrapped by the caching layer.
/// </summary>
/// <param name="account_name">The storage account name.</param>
/// <param name="account_key">The storage account key.</param>
/// <param name="sas_token">A sas token for the container.</param>
/// <param name="concurrency">The maximum number of requests that can be executed at the same time.</param>
/// <param name="use_https">True if https should be used (instead of HTTP).</param>
/// <param name="blob_endpoint">Blob endpoint URI to allow non-public clouds as well as custom domains.</param>
/// <returns>A caching wrapper around the newly created blob_client_wrapper.</returns>
blob_client_attr_cache_wrapper blob_client_attr_cache_wrapper::blob_client_attr_cache_wrapper_init(const std::string &account_name, const std::string &account_key, const std::string &sas_token, const unsigned int concurrency, bool use_https, const std::string &blob_endpoint)
{
    return blob_client_attr_cache_wrapper(
        std::make_shared<blob_client_wrapper>(
            blob_client_wrapper::blob_client_wrapper_init(account_name, account_key, sas_token, concurrency, use_https, blob_endpoint)));
}
/// <summary>
/// Uploads the contents of a blob from a local file (file size must be 64MB or less)
/// through the wrapped client, then marks the cached properties of that blob as unconfirmed.
/// </summary>
/// <param name="sourcePath">The source file path.</param>
/// <param name="container">The container name.</param>
/// <param name="blob">The blob name.</param>
/// <param name="metadata">A <see cref="std::vector"/> of name/value pairs that represents the metadata.</param>
void blob_client_attr_cache_wrapper::put_blob(const std::string &sourcePath, const std::string &container, const std::string blob, const std::vector<std::pair<std::string, std::string>> &metadata)
{
    // Look up both cache handles first, then lock: parent directory (shared) before the blob entry (unique).
    const auto parent_dir_mutex = attr_cache.get_dir_item(get_parent_str(blob));
    const auto entry = attr_cache.get_blob_item(blob);
    std::shared_lock<std::shared_timed_mutex> parent_guard(*parent_dir_mutex);
    std::unique_lock<std::shared_timed_mutex> entry_guard(entry->m_mutex);

    // Both locks stay held across the service call and the invalidation that follows it.
    m_blob_client_wrapper->put_blob(sourcePath, container, blob, metadata);

    // Invalidate the cache.
    // TODO: consider updating the cache with the new values.  Will require modifying cpplite to return info from put_blob.
    entry->m_confirmed = false;
}
/// <summary>
/// Uploads the contents of a blob from a stream through the wrapped client,
/// then marks the cached properties of that blob as unconfirmed.
/// </summary>
/// <param name="container">The container name.</param>
/// <param name="blob">The blob name.</param>
/// <param name="is">The source stream.</param>
/// <param name="metadata">A <see cref="std::vector"/> of name/value pairs that represents the metadata.</param>
void blob_client_attr_cache_wrapper::upload_block_blob_from_stream(const std::string &container, const std::string blob, std::istream &is, const std::vector<std::pair<std::string, std::string>> &metadata)
{
    // Look up both cache handles first, then lock: parent directory (shared) before the blob entry (unique).
    const auto parent_dir_mutex = attr_cache.get_dir_item(get_parent_str(blob));
    const auto entry = attr_cache.get_blob_item(blob);
    std::shared_lock<std::shared_timed_mutex> parent_guard(*parent_dir_mutex);
    std::unique_lock<std::shared_timed_mutex> entry_guard(entry->m_mutex);

    // Both locks stay held across the service call and the invalidation that follows it.
    m_blob_client_wrapper->upload_block_blob_from_stream(container, blob, is, metadata);

    // Invalidate the cache.
    // TODO: consider updating the cache with the new values.  Will require modifying cpplite to return info from put_blob.
    entry->m_confirmed = false;
}
/// <summary>
/// Uploads the contents of a blob from a local file, then marks the blob's cached attributes as unconfirmed.
/// </summary>
/// <param name="sourcePath">The source file path.</param>
/// <param name="container">The container name.</param>
/// <param name="blob">The blob name.</param>
/// <param name="metadata">A <see cref="std::vector"> of key/value pairs that represents the metadata.</param>
/// <param name="parallel">A size_t value that indicates the maximum parallelism that can be used in this request.</param>
void blob_client_attr_cache_wrapper::upload_file_to_blob(const std::string &sourcePath, const std::string &container, const std::string blob, const std::vector<std::pair<std::string, std::string>> &metadata, size_t parallel)
{
    // Fetch the directory mutex and the blob's cache entry before taking any lock.
    std::shared_ptr<std::shared_timed_mutex> parent_dir_mutex = attr_cache.get_dir_item(get_parent_str(blob));
    std::shared_ptr<blob_client_attr_cache_wrapper::blob_cache_item> blob_entry = attr_cache.get_blob_item(blob);

    // Shared lock on the parent directory, exclusive lock on the blob itself.
    std::shared_lock<std::shared_timed_mutex> dir_read_guard(*parent_dir_mutex);
    std::unique_lock<std::shared_timed_mutex> blob_write_guard(blob_entry->m_mutex);

    m_blob_client_wrapper->upload_file_to_blob(sourcePath, container, blob, metadata, parallel);

    // Invalidate the cache (done unconditionally, whatever the outcome of the upload).
    // TODO: consider updating the cache with the new values. Will require modifying cpplite to return info from put_blob.
    blob_entry->m_confirmed = false;
}
/// <summary>
/// Downloads the contents of a blob to a stream.
/// The call is forwarded directly to the wrapped client; the attribute cache is neither read nor
/// updated here, and no cache locks are taken.
/// </summary>
/// <param name="container">The container name.</param>
/// <param name="blob">The blob name.</param>
/// <param name="offset">The offset at which to begin downloading the blob, in bytes.</param>
/// <param name="size">The size of the data to download from the blob, in bytes.</param>
/// <param name="os">The target stream.</param>
void blob_client_attr_cache_wrapper::download_blob_to_stream(const std::string &container, const std::string &blob, unsigned long long offset, unsigned long long size, std::ostream &os)
{
// TODO: lock & update the attribute cache with the headers from the get call(s), once download_blob_to_* is modified to return them.
m_blob_client_wrapper->download_blob_to_stream(container, blob, offset, size, os);
}
/// <summary>
/// Downloads the contents of a blob to a local file.
/// The call is forwarded directly to the wrapped client; the attribute cache is neither read nor
/// updated here, and no cache locks are taken. This function returns nothing itself.
/// </summary>
/// <param name="container">The container name.</param>
/// <param name="blob">The blob name.</param>
/// <param name="destPath">The target file path.</param>
/// <param name="returned_last_modified">Passed by reference to the wrapped client; presumably receives the blob's last-modified time (confirm against the wrapped client).</param>
/// <param name="parallel">A size_t value that indicates the maximum parallelism that can be used in this request.</param>
void blob_client_attr_cache_wrapper::download_blob_to_file(const std::string &container, const std::string &blob, const std::string &destPath, time_t &returned_last_modified, size_t parallel)
{
// TODO: lock & update the attribute cache with the headers from the get call(s), once download_blob_to_* is modified to return them.
m_blob_client_wrapper->download_blob_to_file(container, blob, destPath, returned_last_modified, parallel);
}
/// <summary>
/// Gets the property of a blob, using the cached value when it is confirmed.
/// Equivalent to calling the three-argument overload with assume_cache_invalid = false.
/// </summary>
/// <param name="container">The container name.</param>
/// <param name="blob">The blob name.</param>
/// <returns>The blob_property returned by the three-argument overload.</returns>
blob_property blob_client_attr_cache_wrapper::get_blob_property(const std::string &container, const std::string &blob)
{
return get_blob_property(container, blob, false);
}
/// <summary>
/// Gets the property of a blob, serving it from the attribute cache when possible.
/// </summary>
/// <param name="container">The container name.</param>
/// <param name="blob">The blob name.</param>
/// <param name="assume_cache_invalid">True if the blob's properties should be fetched from the service, even if the cache item seems valid.
/// Useful if there is reason to suspect the properties may have changed behind the scenes (specifically, if there's a pending copy operation.)</param>
/// <returns>The cached or freshly fetched properties; blob_property(false) if the fetch left errno non-zero (errno is left as set by the fetch).</returns>
blob_property blob_client_attr_cache_wrapper::get_blob_property(const std::string &container, const std::string &blob, bool assume_cache_invalid)
{
    std::shared_ptr<std::shared_timed_mutex> dir_mutex = attr_cache.get_dir_item(get_parent_str(blob));
    std::shared_ptr<blob_client_attr_cache_wrapper::blob_cache_item> cache_item = attr_cache.get_blob_item(blob);
    std::shared_lock<std::shared_timed_mutex> dirlock(*dir_mutex);

    if (!assume_cache_invalid)
    {
        // Fast path: a confirmed entry can be returned under a shared lock.
        std::shared_lock<std::shared_timed_mutex> sharedlock(cache_item->m_mutex);
        if (cache_item->m_confirmed)
        {
            return cache_item->m_props;
        }
    }

    // Slow path: refresh from the service while holding the item exclusively.
    std::unique_lock<std::shared_timed_mutex> uniquelock(cache_item->m_mutex);
    errno = 0;
    blob_property fetched = m_blob_client_wrapper->get_blob_property(container, blob);
    if (errno != 0)
    {
        // Do not store the result of a failed call. The entry may have been confirmed before a forced
        // refresh, so mark it unconfirmed; otherwise later cached reads would keep serving data the
        // service just failed to validate.
        cache_item->m_confirmed = false;
        return blob_property(false); // keep errno unchanged
    }

    cache_item->m_props = fetched;
    cache_item->m_confirmed = true;
    return cache_item->m_props;
}
/// <summary>
/// Examines the existance of a blob.
/// </summary>
/// <param name="container">The container name.</param>
/// <param name="blob">The blob name.</param>
/// <returns>Return true if the blob does exist, otherwise, return false.</returns>
bool blob_client_attr_cache_wrapper::blob_exists(const std::string &container, const std::string &blob)
{
blob_property props = get_blob_property(container, blob); // go through the cache
if(props.valid())
{
errno = 0;
return true;
}
return false;
}
/// <summary>
/// Deletes a blob and marks its cached attributes as unconfirmed.
/// </summary>
/// <param name="container">The container name.</param>
/// <param name="blob">The blob name.</param>
void blob_client_attr_cache_wrapper::delete_blob(const std::string &container, const std::string &blob)
{
    // There is no negative cache: an entry is either valid/confirmed or unknown (deleted, or simply
    // never checked on the service), so the delete always goes to the service.
    std::shared_ptr<std::shared_timed_mutex> parent_dir_mutex = attr_cache.get_dir_item(get_parent_str(blob));
    std::shared_ptr<blob_client_attr_cache_wrapper::blob_cache_item> blob_entry = attr_cache.get_blob_item(blob);

    // Shared lock on the parent directory, exclusive lock on the blob itself.
    std::shared_lock<std::shared_timed_mutex> dir_read_guard(*parent_dir_mutex);
    std::unique_lock<std::shared_timed_mutex> blob_write_guard(blob_entry->m_mutex);

    m_blob_client_wrapper->delete_blob(container, blob);
    blob_entry->m_confirmed = false;
}
/// <summary>
/// Copies a blob to another and marks the destination's cached attributes as unconfirmed.
/// </summary>
/// <param name="sourceContainer">The source container name.</param>
/// <param name="sourceBlob">The source blob name.</param>
/// <param name="destContainer">The destination container name.</param>
/// <param name="destBlob">The destination blob name.</param>
void blob_client_attr_cache_wrapper::start_copy(const std::string &sourceContainer, const std::string &sourceBlob, const std::string &destContainer, const std::string &destBlob)
{
    // Only the destination is locked: the source blob and its cached content are neither modified nor
    // queried here, whereas the destination's cached data has to be invalidated.
    std::shared_ptr<std::shared_timed_mutex> dest_dir_mutex = attr_cache.get_dir_item(get_parent_str(destBlob));
    std::shared_ptr<blob_client_attr_cache_wrapper::blob_cache_item> dest_entry = attr_cache.get_blob_item(destBlob);

    // Shared lock on the destination's parent directory, exclusive lock on the destination blob.
    std::shared_lock<std::shared_timed_mutex> dir_read_guard(*dest_dir_mutex);
    std::unique_lock<std::shared_timed_mutex> dest_write_guard(dest_entry->m_mutex);

    errno = 0;
    m_blob_client_wrapper->start_copy(sourceContainer, sourceBlob, destContainer, destBlob);
    dest_entry->m_confirmed = false;
}
}}

Просмотреть файл

@ -106,6 +106,8 @@ namespace microsoft_azure {
return result;
}
sync_blob_client::~sync_blob_client() {}
blob_client_wrapper blob_client_wrapper::blob_client_wrapper_init(const std::string &account_name, const std::string &account_key, const std::string &sas_token, const unsigned int concurrency)
{
    // Convenience overload: forwards to the six-argument overload with use_https = false and an empty
    // blob endpoint. An empty std::string is passed rather than NULL, because building a std::string
    // from a null pointer is undefined behavior.
    // NOTE(review): assumes the six-argument overload treats an empty endpoint as "not specified" - confirm.
    return blob_client_wrapper_init(account_name, account_key, sas_token, concurrency, false, std::string());

Просмотреть файл

@ -162,6 +162,7 @@ list_blobs_hierarchical_item tinyxml2_parser::parse_list_blobs_hierarchical_item
item.content_language = parse_text(xproperty, "Content-Language");
item.content_type = parse_text(xproperty, "Content-Type");
item.content_md5 = parse_text(xproperty, "Content-MD5");
item.copy_status = parse_text(xproperty, "CopyStatus");
item.content_length = parse_long(xproperty, "Content-Length");
item.status = parse_lease_status(parse_text(xproperty, "LeaseStatus"));
item.state = parse_lease_state(parse_text(xproperty, "LeaseState"));

Просмотреть файл

@ -21,6 +21,7 @@ struct options
const char *file_cache_timeout_in_seconds; // Timeout for the file cache (defaults to 120 seconds)
const char *container_name; //container to mount. Used only if config_file is not provided
const char *log_level; // Sets the level at which the process should log to syslog.
const char *use_attr_cache; // True if the cache for blob attributes should be used.
const char *version; // print blobfuse version
const char *help; // print blobfuse usage
};
@ -39,6 +40,7 @@ const struct fuse_opt option_spec[] =
OPTION("--file-cache-timeout-in-seconds=%s", file_cache_timeout_in_seconds),
OPTION("--container-name=%s", container_name),
OPTION("--log-level=%s", log_level),
OPTION("--use-attr-cache=%s", use_attr_cache),
OPTION("--version", version),
OPTION("-v", version),
OPTION("--help", help),
@ -46,7 +48,7 @@ const struct fuse_opt option_spec[] =
FUSE_OPT_END
};
std::shared_ptr<blob_client_wrapper> azure_blob_client_wrapper;
std::shared_ptr<sync_blob_client> azure_blob_client_wrapper;
class gc_cache gc_cache;
// Currently, the cpp lite lib puts the HTTP status code in errno.
@ -189,8 +191,17 @@ int read_config(const std::string configFile)
void *azs_init(struct fuse_conn_info * conn)
{
azure_blob_client_wrapper = std::make_shared<blob_client_wrapper>(blob_client_wrapper::blob_client_wrapper_init(str_options.accountName, str_options.accountKey, str_options.sasToken, 20/*concurrency*/, str_options.use_https,
if (str_options.use_attr_cache)
{
azure_blob_client_wrapper = std::make_shared<blob_client_attr_cache_wrapper>(blob_client_attr_cache_wrapper::blob_client_attr_cache_wrapper_init(str_options.accountName, str_options.accountKey, str_options.sasToken, 20/*concurrency*/, str_options.use_https,
str_options.blobEndpoint));
}
else
{
azure_blob_client_wrapper = std::make_shared<blob_client_wrapper>(blob_client_wrapper::blob_client_wrapper_init(str_options.accountName, str_options.accountKey, str_options.sasToken, 20/*concurrency*/, str_options.use_https,
str_options.blobEndpoint));
}
if(errno != 0)
{
syslog(LOG_CRIT, "azs_init - Unable to start blobfuse. Creating blob client failed: errno = %d.\n", errno);
@ -219,7 +230,7 @@ void *azs_init(struct fuse_conn_info * conn)
void print_usage()
{
fprintf(stdout, "Usage: blobfuse <mount-folder> --tmp-path=</path/to/fusecache> [--config-file=</path/to/config.cfg> | --container-name=<containername>]");
fprintf(stdout, " [--use-https=true] [--file-cache-timeout-in-seconds=120] [--log-level=LOG_OFF|LOG_CRIT|LOG_ERR|LOG_WARNING|LOG_INFO|LOG_DEBUG]\n\n");
fprintf(stdout, " [--use-https=true] [--file-cache-timeout-in-seconds=120] [--log-level=LOG_OFF|LOG_CRIT|LOG_ERR|LOG_WARNING|LOG_INFO|LOG_DEBUG] [--use-attr-cache=true]\n\n");
fprintf(stdout, "In addition to setting --tmp-path parameter, you must also do one of the following:\n");
fprintf(stdout, "1. Specify a config file (using --config-file]=) with account name, account key, and container name, OR\n");
fprintf(stdout, "2. Set the environment variables AZURE_STORAGE_ACCOUNT and AZURE_STORAGE_ACCESS_KEY, and specify the container name with --container-name=\n\n");
@ -414,6 +425,16 @@ int read_and_set_arguments(int argc, char *argv[], struct fuse_args *args)
}
}
str_options.use_attr_cache = false;
if (options.use_attr_cache != NULL)
{
std::string attr_cache(options.use_attr_cache);
if (attr_cache == "true")
{
str_options.use_attr_cache = true;
}
}
if (options.file_cache_timeout_in_seconds != NULL)
{
std::string timeout(options.file_cache_timeout_in_seconds);

Просмотреть файл

@ -117,6 +117,7 @@ struct str_options
std::string containerName;
std::string tmpPath;
bool use_https;
bool use_attr_cache;
};
extern struct str_options str_options;
@ -127,7 +128,7 @@ extern int default_permission;
// This is used to make all the calls to Storage
// The C++ lite client does not store state, other than connection info, so we can use it between calls without issue.
extern std::shared_ptr<blob_client_wrapper> azure_blob_client_wrapper;
extern std::shared_ptr<sync_blob_client> azure_blob_client_wrapper;
// Used to map HTTP errors (ex. 404) to Linux errno (ex ENOENT)
extern std::map<int, int> error_mapping;

Просмотреть файл

@ -0,0 +1,326 @@
#include <uuid/uuid.h>
#include "gtest/gtest.h"
#include "gmock/gmock.h"
#include "blobfuse.h"
using ::testing::_;
using ::testing::Return;
// GoogleMock test double for sync_blob_client.
// Each MOCK_* line below declares a mocked version of the named method with the given signature,
// so tests can install default actions (ON_CALL) or expectations on it.
class MockBlobClient : public sync_blob_client {
public:
MOCK_CONST_METHOD0(is_valid, bool());
MOCK_METHOD5(list_blobs_hierarchical, list_blobs_hierarchical_response(const std::string &container, const std::string &delimiter, const std::string &continuation_token, const std::string &prefix, int maxresults));
MOCK_METHOD4(put_blob, void(const std::string &sourcePath, const std::string &container, const std::string blob, const std::vector<std::pair<std::string, std::string>> &metadata));
MOCK_METHOD4(upload_block_blob_from_stream, void(const std::string &container, const std::string blob, std::istream &is, const std::vector<std::pair<std::string, std::string>> &metadata));
MOCK_METHOD5(upload_file_to_blob, void(const std::string &sourcePath, const std::string &container, const std::string blob, const std::vector<std::pair<std::string, std::string>> &metadata, size_t parallel));
MOCK_METHOD5(download_blob_to_stream, void(const std::string &container, const std::string &blob, unsigned long long offset, unsigned long long size, std::ostream &os));
MOCK_METHOD5(download_blob_to_file, void(const std::string &container, const std::string &blob, const std::string &destPath, time_t &returned_last_modified, size_t parallel));
MOCK_METHOD2(get_blob_property, blob_property(const std::string &container, const std::string &blob));
MOCK_METHOD2(blob_exists, bool(const std::string &container, const std::string &blob));
MOCK_METHOD2(delete_blob, void(const std::string &container, const std::string &blob));
MOCK_METHOD4(start_copy, void(const std::string &sourceContainer, const std::string &sourceBlob, const std::string &destContainer, const std::string &destBlob));
};
// These tests validate that calls into the cache layer from multiple threads are synchronized & serialized properly.
// Correctness of return values is not tested (that's in another file.)
// The approach is, for every possible pair of operations (or calls), run the operations in parallel in three different scenarios:
// - On the same blob
// - On two different blobs in the same directory
// - On two blobs in different directories.
// The first operation that runs contains a small delay; the second one does not. If the operations should be serialized, the first must finish first; otherwise the second should finish first.
// Parameterized testing is used to test each pair of operations in a separate test.
// Fixture for the synchronization tests. The test parameter is the tuple
// (first operation name, second operation name, scenario number).
class AttribCacheSynchronizationTest : public ::testing::TestWithParam<std::tuple<std::string, std::string, int>> {
public:
    // Installs the default actions (see prep) on every mocked method.
    void prep_mock(std::shared_ptr<std::mutex> m, std::shared_ptr<std::condition_variable> cv, std::shared_ptr<int> calls, std::shared_ptr<bool> sleep_finished);

    // A nice mock keeps these tests simple; cache correctness is not what is being tested here.
    std::shared_ptr<::testing::NiceMock<MockBlobClient>> mockClient;
    std::shared_ptr<blob_client_attr_cache_wrapper> attrib_cache_wrapper;
    std::string container_name;

    // Runs before each test: fresh mock, fresh cache wrapper around it.
    void SetUp() override
    {
        mockClient = std::make_shared<::testing::NiceMock<MockBlobClient>>();
        attrib_cache_wrapper = std::make_shared<blob_client_attr_cache_wrapper>(mockClient);
        container_name = "container";
    }

    // Nothing to clean up after a test.
    void TearDown() override
    {
    }
};
// Shared body of every mocked method: counts the call and, for the very first call only, wakes one
// waiter on the condition variable, sleeps briefly, and then raises the sleep_finished flag.
// The counter and the flag are only touched while holding the mutex.
void prep(std::shared_ptr<std::mutex> m, std::shared_ptr<std::condition_variable> cv, std::shared_ptr<int> calls, std::shared_ptr<bool> sleep_finished)
{
    bool is_first_call = false;
    {
        std::lock_guard<std::mutex> guard(*m);
        is_first_call = (*calls == 0);
        ++(*calls);
    }

    if (!is_first_call)
    {
        return;
    }

    cv->notify_one();
    std::this_thread::sleep_for(std::chrono::milliseconds(100)); // TODO: Consider making this value larger if tests are flaky.
    std::lock_guard<std::mutex> guard(*m);
    *sleep_finished = true;
}
// Installs a default action on every mocked method that may be reached by the tests.
// Each action runs prep(); methods with a return type additionally return a fixed value.
void AttribCacheSynchronizationTest::prep_mock(std::shared_ptr<std::mutex> m, std::shared_ptr<std::condition_variable> cv, std::shared_ptr<int> calls, std::shared_ptr<bool> sleep_finished)
{
    // Common part of every action; the shared pointers are captured by copy.
    auto record_call = [=] ()
    {
        prep(m, cv, calls, sleep_finished);
    };

    // Methods that return a value.
    ON_CALL(*mockClient, get_blob_property(_, _))
        .WillByDefault(::testing::InvokeWithoutArgs([=] ()
        {
            record_call();
            return blob_property(true);
        }));
    ON_CALL(*mockClient, list_blobs_hierarchical(_, _ ,_, _, _))
        .WillByDefault(::testing::InvokeWithoutArgs([=] ()
        {
            record_call();
            return list_blobs_hierarchical_response();
        }));

    // Methods returning void only need the bookkeeping.
    ON_CALL(*mockClient, put_blob(_, _, _, _))
        .WillByDefault(::testing::InvokeWithoutArgs(record_call));
    ON_CALL(*mockClient, upload_block_blob_from_stream(_, _, _, _))
        .WillByDefault(::testing::InvokeWithoutArgs(record_call));
    ON_CALL(*mockClient, upload_file_to_blob(_, _, _, _, _))
        .WillByDefault(::testing::InvokeWithoutArgs(record_call));
    ON_CALL(*mockClient, download_blob_to_stream(_, _, _, _, _))
        .WillByDefault(::testing::InvokeWithoutArgs(record_call));
    ON_CALL(*mockClient, download_blob_to_file(_, _, _, _, _))
        .WillByDefault(::testing::InvokeWithoutArgs(record_call));

    ON_CALL(*mockClient, blob_exists(_, _))
        .WillByDefault(::testing::InvokeWithoutArgs([=] ()
        {
            record_call();
            return true;
        }));

    ON_CALL(*mockClient, delete_blob(_, _))
        .WillByDefault(::testing::InvokeWithoutArgs(record_call));
    ON_CALL(*mockClient, start_copy(_, _, _, _))
        .WillByDefault(::testing::InvokeWithoutArgs(record_call));
}
// This maps the operation name to the code required to run the actual operation.
// Every entry takes the cache wrapper, the container name, and the blob name (for "List": the prefix),
// runs exactly one call on the wrapper, and then fulfils the promise.
// A promise is used in the event that the code is being called async.
std::map<std::string, std::function<void(std::shared_ptr<blob_client_attr_cache_wrapper>, std::string, std::string, std::promise<void>)>> fnMap =
{
    {"Get", [](std::shared_ptr<blob_client_attr_cache_wrapper> attrib_cache_wrapper, std::string container_name, std::string blob, std::promise<void> promise)
    {
        attrib_cache_wrapper->get_blob_property(container_name, blob);
        promise.set_value();
    }},
    {"Put", [](std::shared_ptr<blob_client_attr_cache_wrapper> attrib_cache_wrapper, std::string container_name, std::string blob, std::promise<void> promise)
    {
        std::vector<std::pair<std::string, std::string>> metadata;
        attrib_cache_wrapper->put_blob("source_path", container_name, blob, metadata);
        promise.set_value();
    }},
    {"UploadFromStream", [](std::shared_ptr<blob_client_attr_cache_wrapper> attrib_cache_wrapper, std::string container_name, std::string blob, std::promise<void> promise)
    {
        std::stringstream is;
        std::vector<std::pair<std::string, std::string>> metadata;
        attrib_cache_wrapper->upload_block_blob_from_stream(container_name, blob, is, metadata);
        promise.set_value();
    }},
    {"UploadFromFile", [](std::shared_ptr<blob_client_attr_cache_wrapper> attrib_cache_wrapper, std::string container_name, std::string blob, std::promise<void> promise)
    {
        std::vector<std::pair<std::string, std::string>> metadata;
        attrib_cache_wrapper->upload_file_to_blob("source_path", container_name, blob, metadata, 10);
        promise.set_value();
    }},
    {"DownloadToStream", [](std::shared_ptr<blob_client_attr_cache_wrapper> attrib_cache_wrapper, std::string container_name, std::string blob, std::promise<void> promise)
    {
        std::stringstream os;
        attrib_cache_wrapper->download_blob_to_stream(container_name, blob, 0, 10, os);
        promise.set_value();
    }},
    {"DownloadToFile", [](std::shared_ptr<blob_client_attr_cache_wrapper> attrib_cache_wrapper, std::string container_name, std::string blob, std::promise<void> promise)
    {
        // Initialized so that no indeterminate value is handed to the callee by reference.
        time_t lmt = 0;
        attrib_cache_wrapper->download_blob_to_file(container_name, blob, "dest_path", lmt, 10);
        promise.set_value();
    }},
    {"Exists", [](std::shared_ptr<blob_client_attr_cache_wrapper> attrib_cache_wrapper, std::string container_name, std::string blob, std::promise<void> promise)
    {
        attrib_cache_wrapper->blob_exists(container_name, blob);
        promise.set_value();
    }},
    {"Delete", [](std::shared_ptr<blob_client_attr_cache_wrapper> attrib_cache_wrapper, std::string container_name, std::string blob, std::promise<void> promise)
    {
        attrib_cache_wrapper->delete_blob(container_name, blob);
        promise.set_value();
    }},
    {"Copy", [](std::shared_ptr<blob_client_attr_cache_wrapper> attrib_cache_wrapper, std::string container_name, std::string blob, std::promise<void> promise)
    {
        // The blob under test is the copy destination; the source name is a fixed placeholder.
        attrib_cache_wrapper->start_copy(container_name, "src", container_name, blob);
        promise.set_value();
    }},
    {"List", [](std::shared_ptr<blob_client_attr_cache_wrapper> attrib_cache_wrapper, std::string container_name, std::string prefix, std::promise<void> promise)
    {
        attrib_cache_wrapper->list_blobs_hierarchical(container_name, "/", "marker", prefix, 10);
        promise.set_value();
    }},
};
// Helpers that look at the operation name. They help determine the expected behavior (whether or not to expect the calls to be serialized.)
// Returns true when at least one of the two operation names starts with "Download".
bool includes_download_operation(std::string op1, std::string op2)
{
    const std::string prefix("Download");
    const bool first_is_download = op1.compare(0, prefix.size(), prefix) == 0;
    const bool second_is_download = op2.compare(0, prefix.size(), prefix) == 0;
    return first_is_download || second_is_download;
}
// Returns true when the operation name starts with "List".
bool is_list_operation(std::string op)
{
    // Searching backwards from position 0 can only match at the very start of the name.
    const std::string::size_type match_position = op.rfind("List", 0);
    return match_position == 0;
}
// Decides, from the two operation names and the scenario number, whether the test should expect the
// two operations to be synchronized (serialized).
bool expect_synchronization(std::string first_operation, std::string second_operation, int scenario)
{
    // A pair that contains a download operation is never expected to synchronize.
    if (includes_download_operation(first_operation, second_operation))
    {
        return false;
    }

    if (scenario == 0)
    {
        return true; // Operations on the same blob should synchronize.
    }
    if (scenario == 1)
    {
        return false; // Different directories should never synchronize
    }
    if (scenario == 2)
    {
        // Different blobs in the same directory should only synchronize if one or both operations are listing operations.
        return is_list_operation(first_operation) || is_list_operation(second_operation);
    }

    std::cout << "No such scenario " << scenario;
    return false;
}
// Runs the first operation on a separate thread (its mocked call sleeps inside prep), waits until that
// call has started, then runs the second operation on this thread. Whether the first operation's sleep
// has finished by the time the second returns tells us whether the two calls were serialized.
TEST_P(AttribCacheSynchronizationTest, Run)
{
    std::string firstOperation = std::get<0>(GetParam());
    std::string secondOperation = std::get<1>(GetParam());
    int scenario = std::get<2>(GetParam());

    std::shared_ptr<std::mutex> m = std::make_shared<std::mutex>();
    std::shared_ptr<std::condition_variable> cv = std::make_shared<std::condition_variable>();
    std::shared_ptr<int> calls = std::make_shared<int>(0);
    std::shared_ptr<bool> sleep_finished = std::make_shared<bool>(false);

    // The worker thread writes the flag while holding *m, so it has to be read under the same mutex;
    // an unguarded read would race with that write.
    auto sleep_has_finished = [&] ()
    {
        std::lock_guard<std::mutex> lk(*m);
        return *sleep_finished;
    };

    // Note that listing operations here are a special case - instead of passing in the blob name, we need to pass in the prefix of the blob (meaning, the directory name)
    std::string input1 = is_list_operation(firstOperation) ? "dir1" : "dir1/bloba";
    std::string input2;
    //TODO: use an enum instead of an int.
    switch (scenario)
    {
        case 0:
            input2 = is_list_operation(secondOperation) ? "dir1" : "dir1/bloba"; // Same blob
            break;
        case 1:
            input2 = is_list_operation(secondOperation) ? "dir2" : "dir2/bloba"; // Different directory
            break;
        case 2:
            input2 = is_list_operation(secondOperation) ? "dir1" : "dir1/blobb"; // Same directory, different blob.
            break;
        default:
            FAIL() << "No such scenario " << scenario;
            break;
    }

    prep_mock(m, cv, calls, sleep_finished);

    std::promise<void> first_promise;
    std::future<void> first_future = first_promise.get_future();
    std::thread slow_call(fnMap[firstOperation], attrib_cache_wrapper, container_name, input1, std::move(first_promise));
    {
        // Ensure that the call in the new thread has started - without this, it's possible that the below call
        // could happen prior to the get_properties call in a new thread.
        std::unique_lock<std::mutex> lk(*m);
        (*cv).wait(lk, [&] {return (*calls) > 0;});
    }

    // The first call has started but should still be inside its sleep.
    EXPECT_FALSE(sleep_has_finished());

    std::promise<void> unused;
    fnMap[secondOperation](attrib_cache_wrapper, container_name, input2, std::move(unused));

    if (expect_synchronization(firstOperation, secondOperation, scenario))
    {
        // Serialized: the second call could only return after the first one finished sleeping.
        EXPECT_TRUE(sleep_has_finished());
    }
    else
    {
        // Not serialized: the second call returned while the first one was still sleeping.
        EXPECT_FALSE(sleep_has_finished());
    }

    slow_call.join();
}
// Generates the list of operation names: every key of fnMap, in the map's iteration order.
std::vector<std::string> getKeys()
{
    std::vector<std::string> operation_names;
    for (const auto &entry : fnMap)
    {
        operation_names.push_back(entry.first);
    }
    return operation_names;
}
// Helper to generate a (more) friendly name for a given test, based on the test parameters.
// The result is "<first>Then<second><scenario>"; an unknown scenario number contributes nothing.
std::string getTestName(::testing::TestParamInfo<std::tuple<std::string, std::string, int>> info)
{
    const int scenario_id = std::get<2>(info.param);

    std::string scenario_suffix;
    if (scenario_id == 0)
    {
        scenario_suffix = "SameBlob";
    }
    else if (scenario_id == 1)
    {
        scenario_suffix = "DiffDirectory";
    }
    else if (scenario_id == 2)
    {
        scenario_suffix = "DiffBlob";
    }

    std::string test_name = std::get<0>(info.param);
    test_name += "Then";
    test_name += std::get<1>(info.param);
    test_name += scenario_suffix;
    return test_name;
}
// Instantiates the parameterized test once per (first operation, second operation, scenario) combination:
// every name from getKeys() paired with every name from getKeys(), for each of the scenarios 0, 1 and 2.
// getTestName supplies the name of each generated test.
INSTANTIATE_TEST_CASE_P(AttribCacheTests, AttribCacheSynchronizationTest, ::testing::Combine(::testing::ValuesIn(getKeys()), ::testing::ValuesIn(getKeys()), ::testing::Values(0, 1, 2)), getTestName);

555
test/attribcachetests.cpp Normal file
Просмотреть файл

@ -0,0 +1,555 @@
#include <uuid/uuid.h>
//#include "gtest/gtest.h"
#include "gmock/gmock.h"
#include "blobfuse.h"
using::testing::_;
using ::testing::Return;
// Used for GoogleMock
// Test double for sync_blob_client: each MOCK_* line below declares a mocked version of the named
// method with the given signature, so tests can set expectations and canned return values on it.
class MockBlobClient : public sync_blob_client {
public:
MOCK_CONST_METHOD0(is_valid, bool());
MOCK_METHOD5(list_blobs_hierarchical, list_blobs_hierarchical_response(const std::string &container, const std::string &delimiter, const std::string &continuation_token, const std::string &prefix, int maxresults));
MOCK_METHOD4(put_blob, void(const std::string &sourcePath, const std::string &container, const std::string blob, const std::vector<std::pair<std::string, std::string>> &metadata));
MOCK_METHOD4(upload_block_blob_from_stream, void(const std::string &container, const std::string blob, std::istream &is, const std::vector<std::pair<std::string, std::string>> &metadata));
MOCK_METHOD5(upload_file_to_blob, void(const std::string &sourcePath, const std::string &container, const std::string blob, const std::vector<std::pair<std::string, std::string>> &metadata, size_t parallel));
MOCK_METHOD5(download_blob_to_stream, void(const std::string &container, const std::string &blob, unsigned long long offset, unsigned long long size, std::ostream &os));
MOCK_METHOD5(download_blob_to_file, void(const std::string &container, const std::string &blob, const std::string &destPath, time_t &returned_last_modified, size_t parallel));
MOCK_METHOD2(get_blob_property, blob_property(const std::string &container, const std::string &blob));
MOCK_METHOD2(blob_exists, bool(const std::string &container, const std::string &blob));
MOCK_METHOD2(delete_blob, void(const std::string &container, const std::string &blob));
MOCK_METHOD4(start_copy, void(const std::string &sourceContainer, const std::string &sourceBlob, const std::string &destContainer, const std::string &destBlob));
};
// These tests primarily test correctness of the attr cache - both that the data is correct, and that data is being correctly cached.
// This file does not test synchronization behavior - that's in a different file.
//
// Overall reminder regarding the GoogleTest assertion macros -
// The EXPECT_* macros are used to validate correctness non-fatally. Meaning, if an expectation fails, the test will fail, but will continue to run to completion.
// The ASSERT_* macros are supposed to be fatal. If the assertion fails, the test fails, and the method returns at that point. (Note that the caller will continue to run.)
class AttribCacheTest : public ::testing::Test {
public:
// Usually using a strict mock is bad practice, but we're using it here because we're testing caching behavior.
// Nice mocks or naggy mocks could ignore calls that should fail tests (because the cache is being used incorrectly.)
std::shared_ptr<::testing::StrictMock<MockBlobClient>> mockClient;
std::shared_ptr<blob_client_attr_cache_wrapper> attrib_cache_wrapper;
std::string container_name;
// This runs before each test.
virtual void SetUp()
{
container_name = "container";
mockClient = std::make_shared<::testing::StrictMock<MockBlobClient>>();
attrib_cache_wrapper = std::make_shared<blob_client_attr_cache_wrapper>(mockClient);
}
virtual void TearDown()
{
}
};
// Helper methods for checking equality of blob properties and metadata
// Checks that two metadata lists hold the same key/value pairs, ignoring order. The inputs are not
// modified; sorted copies are compared. A size mismatch aborts this helper only (ASSERT_*), while a
// differing pair is reported non-fatally (EXPECT_*).
void assert_metadata_equal(std::vector<std::pair<std::string, std::string>>& left, std::vector<std::pair<std::string, std::string>>& right)
{
    ASSERT_EQ(left.size(), right.size()) << "blob_property objects not equal; differing metadata count.";

    std::vector<std::pair<std::string, std::string>> sorted_left(left);
    std::sort(sorted_left.begin(), sorted_left.end());
    std::vector<std::pair<std::string, std::string>> sorted_right(right);
    std::sort(sorted_right.begin(), sorted_right.end());

    // first_diff.first / .second point at the first differing pair on each side (or at the end if none).
    auto first_diff = std::mismatch(sorted_left.begin(), sorted_left.end(), sorted_right.begin());
    EXPECT_EQ(sorted_left.end(), first_diff.first) << "Metadata not equal at left element = \"" << first_diff.first->first << "\"=\"" << first_diff.first->second << "\" and right element \"" << first_diff.second->first << "\"=\"" << first_diff.second->second << "\".";
}
// Checks two blob_property objects for equality, field by field.
// Two invalid objects count as equal. If exactly one is invalid, the helper fails fatally (ASSERT_*)
// and returns before comparing fields; individual field mismatches are reported non-fatally (EXPECT_*).
void assert_blob_property_objects_equal(blob_property& left, blob_property& right)
{
if (!left.valid() && !right.valid()) return; // two invalid objects are equal; aborting comparison.
ASSERT_TRUE(left.valid()) << "blob_property objects not equal; left is invalid.";
ASSERT_TRUE(right.valid()) << "blob_property objects not equal; right is invalid.";
EXPECT_EQ(left.cache_control, right.cache_control) << "blob_property objects not equal; cache_control";
EXPECT_EQ(left.content_disposition, right.content_disposition) << "blob_property objects not equal; content_disposition";
EXPECT_EQ(left.content_encoding, right.content_encoding) << "blob_property objects not equal; content_encoding";
EXPECT_EQ(left.content_language, right.content_language) << "blob_property objects not equal; content_language";
EXPECT_EQ(left.size, right.size) << "blob_property objects not equal; size";
EXPECT_EQ(left.content_md5, right.content_md5) << "blob_property objects not equal; content_md5";
EXPECT_EQ(left.content_type, right.content_type) << "blob_property objects not equal; content_type";
EXPECT_EQ(left.etag, right.etag) << "blob_property objects not equal; etag";
EXPECT_EQ(left.copy_status, right.copy_status) << "blob_property objects not equal; copy_status";
EXPECT_EQ(left.last_modified, right.last_modified) << "blob_property objects not equal; last_modified";
// Add when implemented:
// blob_type m_type;
// azure::storage::lease_status m_lease_status;
// azure::storage::lease_state m_lease_state;
// azure::storage::lease_duration m_lease_duration;
// Metadata is compared as an unordered set of key/value pairs.
assert_metadata_equal(left.metadata, right.metadata);
}
// Compares two listing entries field by field; every mismatch is reported as a separate
// non-fatal failure, and the metadata is compared through assert_metadata_equal.
void assert_list_item_equal(list_blobs_hierarchical_item &left, list_blobs_hierarchical_item &right)
{
    // Identity of the entry.
    EXPECT_EQ(left.name, right.name);
    EXPECT_EQ(left.snapshot, right.snapshot);
    EXPECT_EQ(left.last_modified, right.last_modified);
    EXPECT_EQ(left.etag, right.etag);

    // Content description.
    EXPECT_EQ(left.content_length, right.content_length);
    EXPECT_EQ(left.content_encoding, right.content_encoding);
    EXPECT_EQ(left.content_type, right.content_type);
    EXPECT_EQ(left.content_md5, right.content_md5);
    EXPECT_EQ(left.content_language, right.content_language);
    EXPECT_EQ(left.cache_control, right.cache_control);

    // Remaining status fields and the directory flag.
    EXPECT_EQ(left.status, right.status);
    EXPECT_EQ(left.state, right.state);
    EXPECT_EQ(left.duration, right.duration);
    EXPECT_EQ(left.copy_status, right.copy_status);
    EXPECT_EQ(left.is_directory, right.is_directory);

    assert_metadata_equal(left.metadata, right.metadata);
}
// Compares two listing responses: the marker, the request id and then the entries pairwise.
// A difference in the number of entries is fatal, so the pairwise walk never runs past the
// end of the shorter list.
void assert_list_response_objects_equal(list_blobs_hierarchical_response &left, list_blobs_hierarchical_response& right)
{
    EXPECT_EQ(left.next_marker, right.next_marker);
    EXPECT_EQ(left.ms_request_id, right.ms_request_id);
    ASSERT_EQ(left.blobs.size(), right.blobs.size());

    auto right_entry = right.blobs.begin();
    for (auto left_entry = left.blobs.begin(); left_entry != left.blobs.end(); ++left_entry, ++right_entry)
    {
        assert_list_item_equal(*left_entry, *right_entry);
    }
}
// Helper for creating a sample blob_property object.
// Only the etag and the size come from the caller; the other fields get fixed placeholder
// strings, last_modified is the current time, and the metadata is a small fixed set of pairs.
blob_property create_blob_property(std::string etag, unsigned long long size)
{
    blob_property sample(true);

    // Caller-controlled fields.
    sample.etag = etag;
    sample.size = size;

    // Fixed placeholder values.
    sample.cache_control = "cache_control";
    // sample.content_disposition = "content_disposition"; // Add when implemented
    sample.content_encoding = "content_encoding";
    sample.content_language = "content_language";
    sample.content_md5 = "content_md5";
    sample.content_type = "content_type";
    sample.copy_status = "copy_status";
    sample.last_modified = std::time(nullptr);

    // Just some sample metadata
    sample.metadata = {
        std::make_pair("k5", "v5"),
        std::make_pair("k1", "v1"),
        std::make_pair("k2", "v2"),
        std::make_pair("k3", "v3")};

    return sample;
}
// Helper for converting a blob_property object into a list_blobs_hierarchical_item.
// The item keeps last_modified as text, so the numeric timestamp is rendered as UTC
// using constants::date_format_rfc_1123.
// TODO: Remove this once cpplite unifies these two types.
list_blobs_hierarchical_item blob_property_to_item(std::string name, blob_property prop, bool is_directory)
{
    list_blobs_hierarchical_item result;
    result.name = name;
    result.is_directory = is_directory;

    // Fields that map one-to-one.
    result.cache_control = prop.cache_control;
    result.content_encoding = prop.content_encoding;
    result.content_language = prop.content_language;
    result.content_length = prop.size;
    result.content_md5 = prop.content_md5;
    result.content_type = prop.content_type;
    result.etag = prop.etag;
    result.metadata = prop.metadata;
    result.copy_status = prop.copy_status;

    // Render the timestamp as text.
    char formatted[30];
    const std::time_t modified = prop.last_modified;
    const std::tm *utc = std::gmtime(&modified);
    const size_t length = std::strftime(formatted, sizeof(formatted), constants::date_format_rfc_1123, utc);
    result.last_modified = std::string(formatted, length);

    // Add when implemented:
    // result.content_disposition = prop.content_disposition;
    // Lease status / state / duration
    return result;
}
// Base case - check that GetBlobProperties calls are cached.
// Two lookups of the same blob may reach the backing client only once, and both lookups must
// hand back the properties the client returned.
TEST_F(AttribCacheTest, GetBlobPropertiesSingle)
{
    std::string blob = "blob";
    blob_property prop = create_blob_property("samepleEtag", 4);

    // Exactly one service call is allowed; the second lookup has to be served from the cache.
    EXPECT_CALL(*mockClient, get_blob_property(container_name, blob))
        .Times(1)
        .WillOnce(Return(prop));

    blob_property newprop = attrib_cache_wrapper->get_blob_property(container_name, blob);
    blob_property newprop2 = attrib_cache_wrapper->get_blob_property(container_name, blob);

    // Compare against the value the service returned, not just the two results with each other:
    // two identical but wrong results would otherwise go unnoticed.
    assert_blob_property_objects_equal(prop, newprop);
    assert_blob_property_objects_equal(prop, newprop2);
}
// Tests that regardless of multiple calls to get_property or ordering, each blob makes only one service call.
TEST_F(AttribCacheTest, GetBlobPropertiesMultiple)
{
    std::string first_blob = "blob1";
    std::string second_blob = "blob2";
    std::string third_blob = "blob3";

    blob_property first_props = create_blob_property("etag1", 4);
    blob_property second_props = create_blob_property("etag2", 15);
    blob_property third_props = create_blob_property("etag3", 239817328401234ull); // larger than will fit in an int

    // One service call per blob, no matter how often it is looked up afterwards.
    EXPECT_CALL(*mockClient, get_blob_property(container_name, first_blob))
        .Times(1)
        .WillOnce(Return(first_props));
    EXPECT_CALL(*mockClient, get_blob_property(container_name, second_blob))
        .Times(1)
        .WillOnce(Return(second_props));
    EXPECT_CALL(*mockClient, get_blob_property(container_name, third_blob))
        .Times(1)
        .WillOnce(Return(third_props));

    // Shorthand for a lookup through the caching wrapper.
    auto lookup = [&](const std::string& blob_name)
    {
        return attrib_cache_wrapper->get_blob_property(container_name, blob_name);
    };

    // Interleaved lookups of the three blobs.
    blob_property second_a = lookup(second_blob);
    blob_property first_a = lookup(first_blob);
    blob_property first_b = lookup(first_blob);
    blob_property second_b = lookup(second_blob);
    blob_property second_c = lookup(second_blob);
    blob_property first_c = lookup(first_blob);
    blob_property third_a = lookup(third_blob);
    blob_property first_d = lookup(first_blob);
    blob_property second_d = lookup(second_blob);
    blob_property first_e = lookup(first_blob);

    // Every lookup must have produced the properties the service returned for that blob.
    assert_blob_property_objects_equal(first_props, first_a);
    assert_blob_property_objects_equal(first_props, first_b);
    assert_blob_property_objects_equal(first_props, first_c);
    assert_blob_property_objects_equal(first_props, first_d);
    assert_blob_property_objects_equal(first_props, first_e);

    assert_blob_property_objects_equal(second_props, second_a);
    assert_blob_property_objects_equal(second_props, second_b);
    assert_blob_property_objects_equal(second_props, second_c);
    assert_blob_property_objects_equal(second_props, second_d);

    assert_blob_property_objects_equal(third_props, third_a);
}
// Check that listing operations cache the returned blob properties
// Entries listed as blobs must afterwards be answered without a service call, while the entry
// listed as a directory still needs one get_blob_property call.
TEST_F(AttribCacheTest, GetBlobPropertiesListSimple)
{
    std::string first_blob = "blob1";
    std::string directory_blob = "blob2";
    std::string last_blob = "blob3";

    blob_property first_props = create_blob_property("etag1", 4);
    blob_property directory_props = create_blob_property("etag2", 15);
    blob_property last_props = create_blob_property("etag3", 239817328401234ull); // larger than will fit in an int

    list_blobs_hierarchical_response service_listing;
    service_listing.next_marker = "marker";
    service_listing.blobs.push_back(blob_property_to_item(first_blob, first_props, false));
    service_listing.blobs.push_back(blob_property_to_item(directory_blob, directory_props, true)); // Ensure that directories don't get cached
    service_listing.blobs.push_back(blob_property_to_item(last_blob, last_props, false));

    EXPECT_CALL(*mockClient, list_blobs_hierarchical(container_name, "/", "token", "prefix", 10000))
        .Times(1)
        .WillOnce(Return(service_listing));
    // Only the directory entry is expected to reach the service on lookup.
    EXPECT_CALL(*mockClient, get_blob_property(container_name, directory_blob))
        .Times(1)
        .WillOnce(Return(directory_props));

    list_blobs_hierarchical_response cached_listing =
        attrib_cache_wrapper->list_blobs_hierarchical(container_name, "/", "token", "prefix", 10000);
    blob_property first_fetched = attrib_cache_wrapper->get_blob_property(container_name, first_blob);
    blob_property directory_fetched = attrib_cache_wrapper->get_blob_property(container_name, directory_blob);
    blob_property last_fetched = attrib_cache_wrapper->get_blob_property(container_name, last_blob);

    assert_list_response_objects_equal(service_listing, cached_listing);
    assert_blob_property_objects_equal(first_props, first_fetched);
    assert_blob_property_objects_equal(directory_props, directory_fetched);
    assert_blob_property_objects_equal(last_props, last_fetched);
}
// Exercises the cache across two listings with property lookups and deletions in between.
// The expectations are registered in one strict sequence, so any lookup that is not listed
// there must be answered without calling the mocked client.
TEST_F(AttribCacheTest, GetBlobPropertiesListRepeated)
{
// Here we will test the interaction of multiple get_blob_property and list_blobs calls.
// We'll make two list_blobs calls, with get_blob_property calls before, between, and after.
// Different blobs will return different values at the various calls, to test different behavior.
//
// blob1 is the base case, it's not modified between calls.
// blob2 will change between calls, ensuring that the cache is updated properly.
// blob3 will not be included in the listing results, ensuring that the data in the cache is still valid even in this case.
// blob4 will be invalidated between calls, to make sure that the cache data is re-applied in a list call in this case.
// blob5 will be invalidated between calls, to make sure that the cache data is re-applied in a get_properties call in this case.
std::string blob1 = "blob1";
std::string blob2 = "blob2";
std::string blob3 = "blob3";
std::string blob4 = "blob4";
std::string blob5 = "blob5";
// Property versions: _v0 is returned by the initial lookup, _v1 by the first listing, _v2 after that.
blob_property prop1 = create_blob_property("etag1", 4);
blob_property prop2_v0 = create_blob_property("etag2_0", 29);
blob_property prop2_v1 = create_blob_property("etag2_1", 15);
blob_property prop2_v2 = create_blob_property("etag2_2", 43);
blob_property prop3 = create_blob_property("etag3", 239817328401234ull); // larger than will fit in an int
blob_property prop4_v1 = create_blob_property("etag4_1", 0);
blob_property prop4_v2 = create_blob_property("etag4_2", 1);
blob_property prop5_v1 = create_blob_property("etag5_1", 2);
blob_property prop5_v2 = create_blob_property("etag5_2", 3);
// First listing: blob1, blob2 (v1), blob4 (v1), blob5 (v1). blob3 is deliberately absent.
list_blobs_hierarchical_response list_response_1;
list_response_1.next_marker = "marker";
list_response_1.blobs.push_back(blob_property_to_item(blob1, prop1, false));
list_response_1.blobs.push_back(blob_property_to_item(blob2, prop2_v1, false));
list_response_1.blobs.push_back(blob_property_to_item(blob4, prop4_v1, false));
list_response_1.blobs.push_back(blob_property_to_item(blob5, prop5_v1, false));
// Second listing: same blobs, with blob2, blob4 and blob5 at their v2 values.
list_blobs_hierarchical_response list_response_2;
list_response_2.next_marker = "marker";
list_response_2.blobs.push_back(blob_property_to_item(blob1, prop1, false));
list_response_2.blobs.push_back(blob_property_to_item(blob2, prop2_v2, false));
list_response_2.blobs.push_back(blob_property_to_item(blob4, prop4_v2, false));
list_response_2.blobs.push_back(blob_property_to_item(blob5, prop5_v2, false));
{
::testing::InSequence seq; // Expectations defined until this goes out-of-scope are validated in order.
// 1. Initial lookup of blob2, before anything has been listed.
EXPECT_CALL(*mockClient, get_blob_property(container_name, blob2))
.Times(1)
.WillOnce(Return(prop2_v0));
// 2. First listing.
EXPECT_CALL(*mockClient, list_blobs_hierarchical(container_name, "/", "token", "prefix", 10000))
.Times(1)
.WillOnce(Return(list_response_1));
// 3. blob3 was not part of the listing, so its lookup is the only one that reaches the client here.
EXPECT_CALL(*mockClient, get_blob_property(container_name, blob3))
.Times(1)
.WillOnce(Return(prop3));
// 4. Deleting blob4 and blob5.
EXPECT_CALL(*mockClient, delete_blob(container_name, blob4))
.Times(1);
EXPECT_CALL(*mockClient, delete_blob(container_name, blob5))
.Times(1);
// 5. blob5 is looked up again after its deletion and before the second listing.
EXPECT_CALL(*mockClient, get_blob_property(container_name, blob5))
.Times(1)
.WillOnce(Return(prop5_v2));
// 6. Second listing.
EXPECT_CALL(*mockClient, list_blobs_hierarchical(container_name, "/", "token", "prefix", 10000))
.Times(1)
.WillOnce(Return(list_response_2));
}
// Drive the wrapper in the same order the expectations were registered in.
blob_property propcache2_0 = attrib_cache_wrapper->get_blob_property(container_name, blob2);
list_blobs_hierarchical_response list_response_cache_1 = attrib_cache_wrapper->list_blobs_hierarchical(container_name, "/", "token", "prefix", 10000);
blob_property propcache1_1 = attrib_cache_wrapper->get_blob_property(container_name, blob1);
blob_property propcache2_1 = attrib_cache_wrapper->get_blob_property(container_name, blob2);
blob_property propcache3_1 = attrib_cache_wrapper->get_blob_property(container_name, blob3);
blob_property propcache4_1 = attrib_cache_wrapper->get_blob_property(container_name, blob4);
blob_property propcache5_1 = attrib_cache_wrapper->get_blob_property(container_name, blob5);
attrib_cache_wrapper->delete_blob(container_name, blob4); // Invalidate blobs 4 and 5
attrib_cache_wrapper->delete_blob(container_name, blob5);
blob_property propcache5_post_invalidate = attrib_cache_wrapper->get_blob_property(container_name, blob5);
list_blobs_hierarchical_response list_response_cache_2 = attrib_cache_wrapper->list_blobs_hierarchical(container_name, "/", "token", "prefix", 10000);
// No further client calls are expected: all five lookups below must be answered from the cache.
blob_property propcache1_2 = attrib_cache_wrapper->get_blob_property(container_name, blob1);
blob_property propcache2_2 = attrib_cache_wrapper->get_blob_property(container_name, blob2);
blob_property propcache3_2 = attrib_cache_wrapper->get_blob_property(container_name, blob3);
blob_property propcache4_2 = attrib_cache_wrapper->get_blob_property(container_name, blob4);
blob_property propcache5_2 = attrib_cache_wrapper->get_blob_property(container_name, blob5);
// The listings pass through unchanged.
assert_list_response_objects_equal(list_response_1, list_response_cache_1);
assert_list_response_objects_equal(list_response_2, list_response_cache_2);
// Values seen before and after the first listing.
assert_blob_property_objects_equal(prop2_v0, propcache2_0);
assert_blob_property_objects_equal(prop1, propcache1_1);
assert_blob_property_objects_equal(prop2_v1, propcache2_1);
assert_blob_property_objects_equal(prop3, propcache3_1);
assert_blob_property_objects_equal(prop4_v1, propcache4_1);
assert_blob_property_objects_equal(prop5_v1, propcache5_1);
// Values seen after the deletions and the second listing.
assert_blob_property_objects_equal(prop5_v2, propcache5_post_invalidate);
assert_blob_property_objects_equal(prop1, propcache1_2);
assert_blob_property_objects_equal(prop2_v2, propcache2_2);
assert_blob_property_objects_equal(prop3, propcache3_2);
assert_blob_property_objects_equal(prop4_v2, propcache4_2);
assert_blob_property_objects_equal(prop5_v2, propcache5_2);
}
// These tests ensure that methods other than get_blob_properties and list_blobs invalidate the cache when called.
// We use GoogleTest's parameterized testing to generate one test per method; the parameter
// is the name of the operation under test.
class AttribCacheInvalidateCacheTest
    : public AttribCacheTest,
      public ::testing::WithParamInterface<std::string>
{
};
// Maps the name of an operation to the code needed to call the operation under test.
// Each callable receives the caching wrapper, the container name and the blob name, and
// invokes exactly one wrapper method on that blob with placeholder arguments.
std::map<std::string, std::function<void(std::shared_ptr<blob_client_attr_cache_wrapper>, std::string, std::string)>> operationMap =
{
    {"Put", [](std::shared_ptr<blob_client_attr_cache_wrapper> attrib_cache_wrapper, std::string container_name, std::string blob)
    {
        std::vector<std::pair<std::string, std::string>> metadata;
        attrib_cache_wrapper->put_blob("source_path", container_name, blob, metadata);
    }},
    {"UploadFromStream", [](std::shared_ptr<blob_client_attr_cache_wrapper> attrib_cache_wrapper, std::string container_name, std::string blob)
    {
        std::stringstream is;
        std::vector<std::pair<std::string, std::string>> metadata;
        attrib_cache_wrapper->upload_block_blob_from_stream(container_name, blob, is, metadata);
    }},
    {"UploadFromFile", [](std::shared_ptr<blob_client_attr_cache_wrapper> attrib_cache_wrapper, std::string container_name, std::string blob)
    {
        std::vector<std::pair<std::string, std::string>> metadata;
        attrib_cache_wrapper->upload_file_to_blob("source_path", container_name, blob, metadata, 10);
    }},
    {"DownloadToStream", [](std::shared_ptr<blob_client_attr_cache_wrapper> attrib_cache_wrapper, std::string container_name, std::string blob)
    {
        std::stringstream os;
        attrib_cache_wrapper->download_blob_to_stream(container_name, blob, 0, 10, os);
    }},
    {"DownloadToFile", [](std::shared_ptr<blob_client_attr_cache_wrapper> attrib_cache_wrapper, std::string container_name, std::string blob)
    {
        // Initialized so that nothing downstream (the wrapper, or the mock when it prints the
        // arguments of a call) ever reads an indeterminate value.
        time_t lmt = 0;
        attrib_cache_wrapper->download_blob_to_file(container_name, blob, "dest_path", lmt, 10);
    }},
    {"Exists", [](std::shared_ptr<blob_client_attr_cache_wrapper> attrib_cache_wrapper, std::string container_name, std::string blob)
    {
        attrib_cache_wrapper->blob_exists(container_name, blob);
    }},
    {"Delete", [](std::shared_ptr<blob_client_attr_cache_wrapper> attrib_cache_wrapper, std::string container_name, std::string blob)
    {
        attrib_cache_wrapper->delete_blob(container_name, blob);
    }},
    {"Copy", [](std::shared_ptr<blob_client_attr_cache_wrapper> attrib_cache_wrapper, std::string container_name, std::string blob)
    {
        // The blob under test is the destination of the copy.
        attrib_cache_wrapper->start_copy(container_name, "src", container_name, blob);
    }},
};
// Maps the name of an operation to the code needed to set up the expectation for that operation on the mock.
// Needed because we are using a StrictMock, and we want to validate the exact call sequence.
// Each callable registers a single expected call on the mock as the next step of the given sequence.
std::map<std::string, std::function<void(std::shared_ptr<::testing::StrictMock<MockBlobClient>>, std::string, std::string, ::testing::Sequence)>> expectationMap =
{
    {"Put",
     [](std::shared_ptr<::testing::StrictMock<MockBlobClient>> client, std::string container, std::string blob, ::testing::Sequence order)
     {
         EXPECT_CALL(*client, put_blob(_, container, blob, _))
             .Times(1)
             .InSequence(order);
     }},
    {"UploadFromStream",
     [](std::shared_ptr<::testing::StrictMock<MockBlobClient>> client, std::string container, std::string blob, ::testing::Sequence order)
     {
         EXPECT_CALL(*client, upload_block_blob_from_stream(container, blob, _, _))
             .Times(1)
             .InSequence(order);
     }},
    {"UploadFromFile",
     [](std::shared_ptr<::testing::StrictMock<MockBlobClient>> client, std::string container, std::string blob, ::testing::Sequence order)
     {
         EXPECT_CALL(*client, upload_file_to_blob(_, container, blob, _, _))
             .Times(1)
             .InSequence(order);
     }},
    {"DownloadToStream",
     [](std::shared_ptr<::testing::StrictMock<MockBlobClient>> client, std::string container, std::string blob, ::testing::Sequence order)
     {
         EXPECT_CALL(*client, download_blob_to_stream(container, blob, _, _, _))
             .Times(1)
             .InSequence(order);
     }},
    {"DownloadToFile",
     [](std::shared_ptr<::testing::StrictMock<MockBlobClient>> client, std::string container, std::string blob, ::testing::Sequence order)
     {
         EXPECT_CALL(*client, download_blob_to_file(container, blob, _, _, _))
             .Times(1)
             .InSequence(order);
     }},
    {"Exists",
     [](std::shared_ptr<::testing::StrictMock<MockBlobClient>>, std::string, std::string, ::testing::Sequence)
     {
         // Exists is a special case - it calls this.get_blob_property internally, which (in this case) should be cached, so we don't expect anything.
     }},
    {"Delete",
     [](std::shared_ptr<::testing::StrictMock<MockBlobClient>> client, std::string container, std::string blob, ::testing::Sequence order)
     {
         EXPECT_CALL(*client, delete_blob(container, blob))
             .Times(1)
             .InSequence(order);
     }},
    {"Copy",
     [](std::shared_ptr<::testing::StrictMock<MockBlobClient>> client, std::string container, std::string blob, ::testing::Sequence order)
     {
         EXPECT_CALL(*client, start_copy(container, _, container, blob))
             .Times(1)
             .InSequence(order);
     }},
};
// For each operation, whether or not the test should expect the operation to invalidate the cache.
std::map<std::string, bool> expectInvalidate =
{
    // Operations after which the next property lookup must reach the service again.
    {"Put", true},
    {"UploadFromStream", true},
    {"UploadFromFile", true},
    {"Delete", true},
    {"Copy", true},

    // Operations after which the cached properties must still be served.
    {"DownloadToStream", false},
    {"DownloadToFile", false},
    {"Exists", false},
};
// Parameterized over the operation name. Looks a blob up twice (the second lookup must be
// served from the cache), runs the operation, then looks the blob up a third time and checks
// whether that lookup reached the service again, as dictated by expectInvalidate.
TEST_P(AttribCacheInvalidateCacheTest, Run)
{
std::string operation_name = GetParam();
std::string blob_name = "blob";
// prop1 is what the service returns first; prop2 is what it returns after an invalidation.
blob_property prop1 = create_blob_property("etag1", 4);
blob_property prop2 = create_blob_property("etag2", 17);
// This is a way to "checkpoint" calls - used with a Sequence, it helps ensure that the expectations are being called from the correct point.
// Otherwise, it might be unclear exactly which get_blob_property call was being matched in the mock.
::testing::MockFunction<void(std::string check_point_name)> check;
::testing::Sequence seq;
// Expected order: service lookup, checkpoint 1, checkpoint 2, the operation's own call (if any),
// checkpoint 3, and - only for invalidating operations - a second service lookup.
// Nothing is expected between checkpoints 1 and 2, so the second lookup must not reach the mock.
EXPECT_CALL(*mockClient, get_blob_property(container_name, blob_name)).Times(1).InSequence(seq).WillOnce(Return(prop1));
EXPECT_CALL(check, Call("1")).Times(1).InSequence(seq);
EXPECT_CALL(check, Call("2")).Times(1).InSequence(seq);
expectationMap[operation_name](mockClient, container_name, blob_name, seq);
EXPECT_CALL(check, Call("3")).Times(1).InSequence(seq);
if (expectInvalidate[operation_name])
{
EXPECT_CALL(*mockClient, get_blob_property(container_name, blob_name)).Times(1).InSequence(seq)
.WillOnce(Return(prop2));
}
// Drive the wrapper, marking each stage with its checkpoint.
blob_property prop_cache1 = attrib_cache_wrapper->get_blob_property(container_name, blob_name);
check.Call("1");
blob_property prop_cache2 = attrib_cache_wrapper->get_blob_property(container_name, blob_name);
check.Call("2");
operationMap[operation_name](attrib_cache_wrapper, container_name, blob_name);
check.Call("3");
blob_property prop_cache3 = attrib_cache_wrapper->get_blob_property(container_name, blob_name);
// The first two lookups see prop1; the third sees prop2 only if the operation invalidated the entry.
assert_blob_property_objects_equal(prop1, prop_cache1);
assert_blob_property_objects_equal(prop1, prop_cache2);
assert_blob_property_objects_equal(expectInvalidate[operation_name] ? prop2 : prop1, prop_cache3);
}
// Helpers for instantiating the parameterized tests
// Collects the operation names, i.e. the keys of operationMap, in the map's iteration order.
std::vector<std::string> getKeys2()
{
    std::vector<std::string> names;
    for (const auto& entry : operationMap)
    {
        names.push_back(entry.first);
    }
    return names;
}
// Helper to generate a more-informative test name: the test is named after its parameter,
// which is the operation name.
std::string getTestName(::testing::TestParamInfo<std::string> info)
{
    const std::string& operation = info.param;
    return operation;
}
// Instantiates AttribCacheInvalidateCacheTest once per key of operationMap (via getKeys2);
// getTestName names each instance after its operation.
INSTANTIATE_TEST_CASE_P(AttribCacheTests, AttribCacheInvalidateCacheTest, ::testing::ValuesIn(getKeys2()), getTestName);
// TODO: move main() into a separate file; it should exist only once for the 'blobfusetests' application.
// Initializes GoogleMock from the command line, runs every registered test, and returns
// the runner's result as the process exit code.
int main(int argc, char** argv)
{
    ::testing::InitGoogleMock(&argc, argv);
    const int exit_code = RUN_ALL_TESTS();
    return exit_code;
}

Просмотреть файл

@ -1,5 +1,6 @@
#include <uuid/uuid.h>
#include <ftw.h>
#include <random>
#include "gtest/gtest.h"
//#include "gmock/gmock.h"

148
test/hierarchical_test.cpp Normal file
Просмотреть файл

@ -0,0 +1,148 @@
#include <iostream>
#include <uuid/uuid.h>
#include "blobfuse.h"
using namespace std;
// Client wrapper used by the helpers in this file; created by init_test().
static std::shared_ptr<blob_client_wrapper> test_blob_client_wrapper;
// Name of the container the test works in; assigned by SetUp().
std::string container_name;
// Local scratch directory for temporary files; assigned by SetUp().
std::string tmp_dir;
// Reads the connection settings from "../connection.cfg" and creates the blob client wrapper
// the test uses. Prints a message and returns false when the config cannot be read;
// returns true once the wrapper has been created.
bool init_test()
{
    if (read_config("../connection.cfg") != 0)
    {
        cout << "Could not read config file";
        return false;
    }

    // Neither a SAS token nor a custom endpoint is supplied: both are passed as empty strings.
    std::string empty_sas_token;
    std::string empty_endpoint;
    test_blob_client_wrapper = std::make_shared<blob_client_wrapper>(
        blob_client_wrapper::blob_client_wrapper_init(
            str_options.accountName,
            str_options.accountKey,
            empty_sas_token,
            20,
            str_options.use_https,
            empty_endpoint));
    return true;
}
// This runs before each test. We create a container with a unique name, and create a test directory to be a sandbox.
// Returns true when the container and the directory are ready. On any failure the reason is
// printed and false is returned.
bool SetUp()
{
    // Build "container" + <uuid without dashes> as the container name.
    uuid_t container_uuid;
    uuid_generate(container_uuid);
    char container_name_uuid[37]; // 36 characters of textual UUID plus the terminating NUL.
    uuid_unparse_lower(container_uuid, container_name_uuid);
    container_name = std::string("container") + container_name_uuid;
    container_name.erase(std::remove(container_name.begin(), container_name.end(), '-'), container_name.end());

    // create_container reports failure through errno.
    errno = 0;
    test_blob_client_wrapper->create_container(container_name);
    if (errno != 0)
    {
        cout << "SetUp - CreateContainer failed with errno = " << errno;
        return false;
    }

    // Start from an empty sandbox: remove what a previous run left behind, then recreate it.
    tmp_dir = "/tmp/blobfuseteststmp";
    struct stat buf;
    if (stat(tmp_dir.c_str(), &buf) == 0)
    {
        errno = 0;
        destroy_path(tmp_dir);
        if (errno != 0)
        {
            cout << "SetUp - cleanup of old tmp directory failed with errno " << errno;
            return false;
        }
    }

    // mkdir signals failure through its return value; errno is only meaningful in that case.
    errno = 0;
    if (mkdir(tmp_dir.c_str(), 0777) != 0)
    {
        cout << "SetUp - tmp dir creation failed with errno " << errno;
        return false;
    }
    return true;
}
// Writes `text` to the file at `path` in binary mode, replacing whatever the file held before.
// Nothing is reported if the file cannot be opened.
void write_to_file(std::string path, std::string text)
{
    std::ofstream file(path, std::ios::binary);
    file.write(text.data(), static_cast<std::streamsize>(text.size()));
}
// Uploads one small file under ten blob names laid out like a directory tree, then checks
// hierarchical listing with the "/" delimiter: the entries returned for the empty prefix,
// for "dira/" and for "dira/dirc/", errno 404 for the container name with an extra 'x'
// appended, and an empty result for a prefix that matches nothing.
// Returns true when the end of the test body is reached.
// NOTE(review): ASSERT_* and CHECK_STRINGS are not defined in this file; if they are the
// GoogleTest macros they expect a void-returning function - confirm how a failed check exits here.
bool tests_list_blob_hierarchical()
{
    // Setup a series of blobs in a pretend file structure
    std::string file_path = tmp_dir + "/tmpfile";
    std::string file_text = "some file text here.";
    write_to_file(file_path, file_text);

    // The checks below refer to these names by index, so the order matters.
    std::vector<std::string> blob_names = {
        "bloba",           // 0
        "blobb",           // 1
        "blobc",           // 2
        "zblob",           // 3
        "dira/blobd",      // 4
        "dira/blobe",      // 5
        "dirb/blobf",      // 6
        "dirb/blobg",      // 7
        "dira/dirc/blobd", // 8
        "dira/dirc/blobe", // 9
    };
    for (size_t i = 0; i < blob_names.size(); i++)
    {
        errno = 0;
        test_blob_client_wrapper->put_blob(file_path, container_name, blob_names[i]);
        ASSERT_EQ(0, errno) << "put_blob failed for blob " << blob_names[i];
    }

    // Validate that all blobs and blob "directories" are correctly found for given prefixes

    // Empty prefix: the four top-level blobs followed by the two top-level "directories".
    errno = 0;
    std::vector<list_blobs_hierarchical_item> blob_list_results = list_all_blobs(container_name, "/", "");
    ASSERT_EQ(0, errno) << "list_all_blobs failed for empty prefix";
    ASSERT_EQ(6, blob_list_results.size()) << "Incorrect number of blob entries found.";
    CHECK_STRINGS(blob_list_results[0].name, blob_names[0]);
    ASSERT_FALSE(blob_list_results[0].is_directory);
    CHECK_STRINGS(blob_list_results[1].name, blob_names[1]);
    ASSERT_FALSE(blob_list_results[1].is_directory);
    CHECK_STRINGS(blob_list_results[2].name, blob_names[2]);
    ASSERT_FALSE(blob_list_results[2].is_directory);
    CHECK_STRINGS(blob_list_results[3].name, blob_names[3]);
    ASSERT_FALSE(blob_list_results[3].is_directory);
    CHECK_STRINGS(blob_list_results[4].name, "dira/");
    ASSERT_TRUE(blob_list_results[4].is_directory);
    CHECK_STRINGS(blob_list_results[5].name, "dirb/");
    ASSERT_TRUE(blob_list_results[5].is_directory);

    // Prefix "dira/": its two blobs plus the nested "directory".
    errno = 0;
    blob_list_results = list_all_blobs(container_name, "/", "dira/");
    ASSERT_EQ(0, errno) << "list_all_blobs failed for prefix dira/";
    ASSERT_EQ(3, blob_list_results.size()) << "Incorrect number of blob entries found.";
    CHECK_STRINGS(blob_list_results[0].name, blob_names[4]);
    ASSERT_FALSE(blob_list_results[0].is_directory);
    CHECK_STRINGS(blob_list_results[1].name, blob_names[5]);
    ASSERT_FALSE(blob_list_results[1].is_directory);
    CHECK_STRINGS(blob_list_results[2].name, "dira/dirc/");
    ASSERT_TRUE(blob_list_results[2].is_directory);

    // Prefix "dira/dirc/": only its two blobs.
    errno = 0;
    blob_list_results = list_all_blobs(container_name, "/", "dira/dirc/");
    ASSERT_EQ(0, errno) << "list_all_blobs failed for prefix dira/dirc/";
    ASSERT_EQ(2, blob_list_results.size()) << "Incorrect number of blob entries found.";
    CHECK_STRINGS(blob_list_results[0].name, blob_names[8]);
    ASSERT_FALSE(blob_list_results[0].is_directory);
    CHECK_STRINGS(blob_list_results[1].name, blob_names[9]);
    ASSERT_FALSE(blob_list_results[1].is_directory);

    // A different container name is expected to fail with errno 404.
    errno = 0;
    list_all_blobs(container_name + 'x', "/", "");
    ASSERT_EQ(404, errno) << "Listing did not fail as expected.";

    // A prefix that matches nothing succeeds with an empty result.
    errno = 0;
    blob_list_results = list_all_blobs(container_name, "/", "notaprefix");
    ASSERT_EQ(0, errno) << "Listing failed for zero results";
    ASSERT_EQ(0, blob_list_results.size()) << "Incorrect number of blobs found.";

    return true;
}
int main()
{
tests_list_blob_hierarchical();
return 0;
}