This commit is contained in:
Jinming Hu 2019-11-27 16:57:31 +08:00 коммит произвёл Vincent Jiang (LEI)
Родитель 3c87b389f4
Коммит 811704d729
30 изменённых файлов: 24562 добавлений и 42 удалений

Просмотреть файл

@ -3,10 +3,47 @@ cmake_minimum_required(VERSION 2.8)
project(azure_storage_adls)
set(AZURE_STORAGE_ADLS_SOURCES
include/json.hpp
include/nlohmann_json_parser.h
include/adls_client.h
include/adls_request_base.h
include/create_directory_request.h
include/delete_directory_request.h
include/set_access_control_request.h
include/get_access_control_request.h
include/list_paths_request.h
include/create_file_request.h
include/append_data_request.h
include/flush_data_request.h
include/rename_file_request.h
src/nlohmann_json_parser.cpp
src/adls_client.cpp
src/create_directory_request.cpp
src/delete_directory_request.cpp
src/set_access_control_request.cpp
src/get_access_control_request.cpp
src/list_paths_request.cpp
src/create_file_request.cpp
src/append_data_request.cpp
src/flush_data_request.cpp
src/rename_file_request.cpp
)
add_library(azure_storage_adls ${AZURE_STORAGE_ADLS_SOURCES})
add_library(azure-storage-adls ${AZURE_STORAGE_ADLS_SOURCES})
target_link_libraries(azure_storage_adls azure-storage-lite)
target_include_directories(azure-storage-adls PUBLIC include)
target_link_libraries(azure-storage-adls azure-storage-lite)
if(BUILD_TESTS)
set(AZURE_STORAGE_ADLS_TEST_SOURCES
test/adls_test_base.h
test/client_test.cpp
test/filesystem_test.cpp
test/directory_test.cpp
test/file_test.cpp
)
string(REGEX REPLACE "([^;]+)" "${CMAKE_CURRENT_SOURCE_DIR}/\\1" AZURE_STORAGE_ADLS_TEST_SOURCES "${AZURE_STORAGE_ADLS_TEST_SOURCES}")
set(AZURE_STORAGE_ADLS_TEST_SOURCES ${AZURE_STORAGE_ADLS_TEST_SOURCES} PARENT_SCOPE)
endif()

Просмотреть файл

@ -1,73 +1,102 @@
#pragma once
#ifdef _WIN32
#ifdef azure_storage_adls_EXPORTS
#define AZURE_STORAGE_ADLS_API __declspec(dllexport)
#else
#define AZURE_STORAGE_ADLS_API __declspec(dllimport)
#endif
#else /* ifdef _WIN32 */
#define AZURE_STORAGE_ADLS_API
#endif
#include <stdexcept>
#include "blob/blob_client.h"
#include "set_access_control_request.h"
#include "list_paths_request.h"
namespace azure { namespace storage_adls {
using storage_account = azure::storage_lite::storage_account;
using executor_context = azure::storage_lite::executor_context;
struct list_filesystem_item
struct list_filesystems_item
{
std::string name;
// Other properties
};
struct list_filesystem_result
struct list_filesystems_result
{
std::vector<list_filesystem_item> filesystems;
std::vector<list_filesystems_item> filesystems;
std::string continuation_token;
};
struct list_path_item
struct storage_exception : public std::exception
{
std::string name;
// Other properties
};
storage_exception(int code, std::string code_name, std::string message) : code(code), code_name(std::move(code_name)), message(std::move(message)) {}
struct list_path_result
{
std::vector<list_path_item> paths;
std::string continuation_token;
int code;
std::string code_name;
std::string message;
};
class adls_client final
{
public:
adls_client(std::shared_ptr<storage_account> account, int max_concurrency) : m_account(account) {}
AZURE_STORAGE_ADLS_API adls_client(std::shared_ptr<storage_account> account, int max_concurrency, bool exception_enabled = true);
void create_filesystem(const std::string& filesystem);
void delete_filesystem(const std::string& filesystem);
bool filesystem_exists(const std::string& filesystem);
void set_filesystem_properties(const std::string& filesystem, const std::vector<std::pair<std::string, std::string>>& properties);
std::vector<std::pair<std::string, std::string>> get_filesystem_properties(const std::string& filesystem);
list_filesystem_result list_filesystems_segmented(const std::string& prefix, const std::string& continuation_token = std::string(), const int max_results = 5);
AZURE_STORAGE_ADLS_API void create_filesystem(const std::string& filesystem);
AZURE_STORAGE_ADLS_API void delete_filesystem(const std::string& filesystem);
AZURE_STORAGE_ADLS_API bool filesystem_exists(const std::string& filesystem);
AZURE_STORAGE_ADLS_API void set_filesystem_properties(const std::string& filesystem, const std::vector<std::pair<std::string, std::string>>& properties);
AZURE_STORAGE_ADLS_API std::vector<std::pair<std::string, std::string>> get_filesystem_properties(const std::string& filesystem);
AZURE_STORAGE_ADLS_API list_filesystems_result list_filesystems_segmented(const std::string& prefix, const std::string& continuation_token = std::string(), const int max_results = 0);
void create_directory(const std::string& filesystem, const std::string& directory);
void delete_directory(const std::string& filesystem, const std::string& directory);
bool directory_exists(const std::string& filesystem, const std::string& directory);
void set_directory_properties(const std::string& filesystem, const std::string& directory, const std::vector<std::pair<std::string, std::string>>& properties);
std::vector<std::pair<std::string, std::string>> get_directory_properties(const std::string& filesystem, const std::string& directory);
void set_directory_access_control(const std::string& filesystem, const std::string& directory, const std::string& acl);
std::string get_directory_access_control(const std::string& filesystem, const std::string& directory);
list_path_result list_paths_segmented(const std::string& filesystem, const std::string& directory, bool recursive = false, const std::string& continuation_token = std::string(), const int max_results = 5);
AZURE_STORAGE_ADLS_API void create_directory(const std::string& filesystem, const std::string& directory);
AZURE_STORAGE_ADLS_API void delete_directory(const std::string& filesystem, const std::string& directory);
AZURE_STORAGE_ADLS_API bool directory_exists(const std::string& filesystem, const std::string& directory);
AZURE_STORAGE_ADLS_API void move_directory(const std::string& filesystem, const std::string& source_path, const std::string& destination_path);
AZURE_STORAGE_ADLS_API void move_directory(const std::string& source_filesystem, const std::string& source_path, const std::string& destination_filesystem, const std::string& destination_path);
AZURE_STORAGE_ADLS_API void set_directory_properties(const std::string& filesystem, const std::string& directory, const std::vector<std::pair<std::string, std::string>>& properties);
AZURE_STORAGE_ADLS_API std::vector<std::pair<std::string, std::string>> get_directory_properties(const std::string& filesystem, const std::string& directory);
AZURE_STORAGE_ADLS_API void set_directory_access_control(const std::string& filesystem, const std::string& directory, const access_control& acl);
AZURE_STORAGE_ADLS_API access_control get_directory_access_control(const std::string& filesystem, const std::string& directory);
AZURE_STORAGE_ADLS_API list_paths_result list_paths_segmented(const std::string& filesystem, const std::string& directory, bool recursive = false, const std::string& continuation_token = std::string(), const int max_results = 0);
void append_data_from_stream(const std::string& filesystem, const std::string& file, size_t offset, std::istream& istream);
void flush_data(const std::string& filesystem, const std::string& file, size_t offset);
void upload_file_from_stream(const std::string& filesystem, const std::string& file, std::istream& in_stream, const std::vector<std::pair<std::string, std::string>>& properties = std::vector<std::pair<std::string, std::string>>());
void download_file_to_stream(const std::string& filesystem, const std::string& file, std::ostream& out_stream);
void download_file_to_stream(const std::string& filesystem, const std::string& file, size_t offset, size_t size, std::ostream& out_stream);
void delete_file(const std::string& filesystem, const std::string& file);
bool file_exists(const std::string& filesystem, const std::string& file);
void set_file_properties(const std::string& filesystem, const std::string& file, const std::vector<std::pair<std::string, std::string>>& properties);
std::vector<std::pair<std::string, std::string>> get_file_properties(const std::string& filesystem, const std::string& file);
void set_file_access_control(const std::string& filesystem, const std::string& file, const std::string& acl);
std::string get_file_access_control(const std::string& filesystem, const std::string& file);
AZURE_STORAGE_ADLS_API void create_file(const std::string& filename, const std::string& file);
AZURE_STORAGE_ADLS_API void append_data_from_stream(const std::string& filesystem, const std::string& file, uint64_t offset, std::istream& in_stream, uint64_t stream_len = 0);
AZURE_STORAGE_ADLS_API void flush_data(const std::string& filesystem, const std::string& file, uint64_t offset);
AZURE_STORAGE_ADLS_API void upload_file_from_stream(const std::string& filesystem, const std::string& file, std::istream& in_stream, const std::vector<std::pair<std::string, std::string>>& properties = std::vector<std::pair<std::string, std::string>>());
AZURE_STORAGE_ADLS_API void download_file_to_stream(const std::string& filesystem, const std::string& file, std::ostream& out_stream);
AZURE_STORAGE_ADLS_API void download_file_to_stream(const std::string& filesystem, const std::string& file, uint64_t offset, uint64_t size, std::ostream& out_stream);
AZURE_STORAGE_ADLS_API void delete_file(const std::string& filesystem, const std::string& file);
AZURE_STORAGE_ADLS_API bool file_exists(const std::string& filesystem, const std::string& file);
AZURE_STORAGE_ADLS_API void move_file(const std::string& filesystem, const std::string& source_path, const std::string& destination_path);
AZURE_STORAGE_ADLS_API void move_file(const std::string& source_filesystem, const std::string& source_path, const std::string& destination_filesystem, const std::string& destination_path);
AZURE_STORAGE_ADLS_API void set_file_properties(const std::string& filesystem, const std::string& file, const std::vector<std::pair<std::string, std::string>>& properties);
AZURE_STORAGE_ADLS_API std::vector<std::pair<std::string, std::string>> get_file_properties(const std::string& filesystem, const std::string& file);
AZURE_STORAGE_ADLS_API void set_file_access_control(const std::string& filesystem, const std::string& file, const access_control& acl);
AZURE_STORAGE_ADLS_API access_control get_file_access_control(const std::string& filesystem, const std::string& file);
bool exception_enabled() const
{
return m_exception_enabled;
}
private:
template<class RET, class FUNC>
RET blob_client_adaptor(FUNC func);
bool success() const
{
return !(!m_exception_enabled && errno != 0);
}
private:
std::shared_ptr<azure::storage_lite::blob_client_wrapper> m_blob_client;
std::shared_ptr<azure::storage_lite::blob_client> m_blob_client;
std::shared_ptr<storage_account> m_account;
std::shared_ptr<executor_context> m_context;
const bool m_exception_enabled;
};
}} // azure::storage_adls

Просмотреть файл

@ -0,0 +1,16 @@
#pragma once
#include "storage_request_base.h"
#include "storage_account.h"
#include "constants.h"
namespace azure { namespace storage_adls {
using storage_account = azure::storage_lite::storage_account;
using http_base = azure::storage_lite::http_base;
class adls_request_base : public azure::storage_lite::storage_request_base
{
};
}} // azure::storage_adls

Просмотреть файл

@ -0,0 +1,19 @@
#pragma once
#include "adls_request_base.h"
namespace azure { namespace storage_adls {
class append_data_request final : public adls_request_base
{
public:
append_data_request(std::string filesystem, std::string file, uint64_t offset, uint64_t length) : m_filesystem(std::move(filesystem)), m_file(std::move(file)), m_offset(offset), m_length(length) {}
void build_request(const storage_account& account, http_base& http) const override;
private:
std::string m_filesystem;
std::string m_file;
uint64_t m_offset;
uint64_t m_length;
};
}} // azure::storage_adls

Просмотреть файл

@ -0,0 +1,18 @@
#pragma once
#include "adls_request_base.h"
namespace azure { namespace storage_adls {
class create_directory_request final : public adls_request_base
{
public:
create_directory_request(std::string filesystem, std::string directory) : m_filesystem(std::move(filesystem)), m_directory(std::move(directory)) {}
void build_request(const storage_account& account, http_base& http) const override;
private:
std::string m_filesystem;
std::string m_directory;
};
}} // azure::storage_adls

Просмотреть файл

@ -0,0 +1,18 @@
#pragma once
#include "adls_request_base.h"
namespace azure { namespace storage_adls {
class create_file_request final : public adls_request_base
{
public:
create_file_request(std::string filesystem, std::string file) : m_filesystem(std::move(filesystem)), m_file(std::move(file)) {}
void build_request(const storage_account& account, http_base& http) const override;
private:
std::string m_filesystem;
std::string m_file;
};
}} // azure::storage_adls

Просмотреть файл

@ -0,0 +1,19 @@
#pragma once
#include "adls_request_base.h"
namespace azure { namespace storage_adls {
class delete_directory_request final : public adls_request_base
{
public:
delete_directory_request(std::string filesystem, std::string directory, std::string continuation) : m_filesystem(std::move(filesystem)), m_directory(std::move(directory)), m_continuation(std::move(continuation)) {}
void build_request(const storage_account& account, http_base& http) const override;
private:
std::string m_filesystem;
std::string m_directory;
std::string m_continuation;
};
}} // azure::storage_adls

Просмотреть файл

@ -0,0 +1,19 @@
#pragma once
#include "adls_request_base.h"
namespace azure { namespace storage_adls {
class flush_data_request final : public adls_request_base
{
public:
flush_data_request(std::string filesystem, std::string file, uint64_t offset) : m_filesystem(std::move(filesystem)), m_file(std::move(file)), m_offset(offset) {}
void build_request(const storage_account& account, http_base& http) const override;
private:
std::string m_filesystem;
std::string m_file;
uint64_t m_offset;
};
}} // azure::storage_adls

Просмотреть файл

@ -0,0 +1,19 @@
#pragma once
#include "adls_request_base.h"
#include "set_access_control_request.h"
namespace azure { namespace storage_adls {
class get_access_control_request final : public adls_request_base
{
public:
get_access_control_request(std::string filesystem, std::string path) : m_filesystem(std::move(filesystem)), m_path(std::move(path)) {}
void build_request(const storage_account& account, http_base& http) const override;
private:
std::string m_filesystem;
std::string m_path;
};
}} // azure::storage_adls

22875
adls/include/json.hpp Normal file

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -0,0 +1,38 @@
#pragma once
#include "adls_request_base.h"
#include "set_access_control_request.h"
namespace azure { namespace storage_adls {
struct list_paths_item
{
std::string name;
std::string etag;
uint64_t content_length;
std::string last_modified;
access_control acl;
bool is_directory;
};
struct list_paths_result
{
std::vector<list_paths_item> paths;
std::string continuation_token;
};
class list_paths_request final : public adls_request_base
{
public:
list_paths_request(std::string filesystem, std::string directory, bool recursive, std::string continuation, int max_results) : m_filesystem(std::move(filesystem)), m_directory(std::move(directory)), m_recursive(recursive), m_continuation(std::move(continuation)), m_max_results(max_results) {}
void build_request(const storage_account& account, http_base& http) const override;
private:
std::string m_filesystem;
std::string m_directory;
bool m_recursive;
std::string m_continuation;
int m_max_results;
};
}} // azure::storage_adls

Просмотреть файл

@ -0,0 +1,27 @@
#pragma once
#include <string>
#include <vector>
#include "json_parser_base.h"
#include "list_paths_request.h"
namespace azure { namespace storage_adls {
class nlohmann_json_parser final : public azure::storage_lite::json_parser_base
{
public:
static std::vector<list_paths_item> parse_list_paths_response(const std::string& json);
};
}} // azure::storage_adls
namespace azure { namespace storage_lite {
template<>
inline std::vector<azure::storage_adls::list_paths_item> json_parser_base::parse_response(const std::string& json) const
{
return azure::storage_adls::nlohmann_json_parser::parse_list_paths_response(json);
}
}} // azure::storage_lite

Просмотреть файл

@ -0,0 +1,20 @@
#pragma once
#include "adls_request_base.h"
namespace azure { namespace storage_adls {
class rename_file_request final : public adls_request_base
{
public:
rename_file_request(std::string source_filesystem, std::string source_path, std::string destination_filesystem, std::string destination_path) : m_source_filesystem(std::move(source_filesystem)), m_source_path(std::move(source_path)), m_destination_filesystem(std::move(destination_filesystem)), m_destination_path(std::move(destination_path)) {}
void build_request(const storage_account& account, http_base& http) const override;
private:
std::string m_source_filesystem;
std::string m_source_path;
std::string m_destination_filesystem;
std::string m_destination_path;
};
}} // azure::storage_adls

Просмотреть файл

@ -0,0 +1,27 @@
#pragma once
#include "adls_request_base.h"
namespace azure { namespace storage_adls {
struct access_control
{
std::string owner;
std::string group;
std::string permissions;
std::string acl;
};
class set_access_control_request final : public adls_request_base
{
public:
set_access_control_request(std::string filesystem, std::string path, access_control acl) : m_filesystem(std::move(filesystem)), m_path(std::move(path)), m_acl(std::move(acl)) {}
void build_request(const storage_account& account, http_base& http) const override;
private:
std::string m_filesystem;
std::string m_path;
access_control m_acl;
};
}} // azure::storage_adls

Просмотреть файл

@ -0,0 +1,391 @@
#include "adls_client.h"
#include "logging.h"
#include "storage_errno.h"
#include "nlohmann_json_parser.h"
#include "create_directory_request.h"
#include "delete_directory_request.h"
#include "get_access_control_request.h"
#include "create_file_request.h"
#include "rename_file_request.h"
#include "append_data_request.h"
#include "flush_data_request.h"
#include <cerrno>
#include <functional>
namespace azure { namespace storage_adls {
using azure::storage_lite::storage_outcome;
namespace constants = azure::storage_lite::constants;
adls_client::adls_client(std::shared_ptr<storage_account> account, int max_concurrency, bool exception_enabled) : m_account(account), m_blob_client(std::make_shared<azure::storage_lite::blob_client>(account, max_concurrency)), m_context(m_blob_client->context()), m_exception_enabled(exception_enabled)
{
m_context->set_json_parser(std::make_shared<nlohmann_json_parser>());
}
template<class RET, class FUNC>
RET adls_client::blob_client_adaptor(FUNC func)
{
try
{
storage_outcome<RET> result = func().get();
if (result.success() && !m_exception_enabled)
{
errno = 0;
}
else if (!result.success())
{
int error_code = std::stoi(result.error().code);
if (m_exception_enabled)
{
throw storage_exception(error_code, result.error().code_name, result.error().message);
}
else
{
azure::storage_lite::logger::error(result.error().code_name + ": " + result.error().message);
errno = error_code;
}
}
return result.response();
}
catch (std::exception& e)
{
if (m_exception_enabled)
{
throw;
}
else
{
azure::storage_lite::logger::error("Unknown failure: %s", e.what());
errno = unknown_error;
}
}
return RET();
}
void adls_client::create_filesystem(const std::string& filesystem)
{
return blob_client_adaptor<void>(std::bind(&azure::storage_lite::blob_client::create_container, m_blob_client, filesystem));
}
void adls_client::delete_filesystem(const std::string& filesystem)
{
return blob_client_adaptor<void>(std::bind(&azure::storage_lite::blob_client::delete_container, m_blob_client, filesystem));
}
bool adls_client::filesystem_exists(const std::string& filesystem)
{
bool container_exists = false;
try
{
auto container_properties = blob_client_adaptor<azure::storage_lite::container_property>(std::bind(&azure::storage_lite::blob_client::get_container_properties, m_blob_client, filesystem));
container_exists = container_properties.valid();
}
catch (const storage_exception& e)
{
if (e.code != 404)
{
throw;
}
}
if (!success() && errno == 404)
{
errno = 0;
}
return container_exists;
}
void adls_client::set_filesystem_properties(const std::string& filesystem, const std::vector<std::pair<std::string, std::string>>& properties)
{
return blob_client_adaptor<void>(std::bind(&azure::storage_lite::blob_client::set_container_metadata, m_blob_client, filesystem, properties));
}
std::vector<std::pair<std::string, std::string>> adls_client::get_filesystem_properties(const std::string& filesystem)
{
auto container_properties = blob_client_adaptor<azure::storage_lite::container_property>(std::bind(&azure::storage_lite::blob_client::get_container_properties, m_blob_client, filesystem));
std::vector<std::pair<std::string, std::string>> filesystem_properties(std::move(container_properties.metadata));
return filesystem_properties;
}
list_filesystems_result adls_client::list_filesystems_segmented(const std::string& prefix, const std::string& continuation_token, const int max_results)
{
auto containers_segment = blob_client_adaptor<azure::storage_lite::list_constainers_segmented_response>(std::bind(&azure::storage_lite::blob_client::list_containers_segmented, m_blob_client, prefix, continuation_token, max_results, false));
list_filesystems_result result;
for (const auto& container_item : containers_segment.containers)
{
list_filesystems_item fs_item;
fs_item.name = container_item.name;
result.filesystems.emplace_back(std::move(fs_item));
}
result.continuation_token = containers_segment.next_marker;
return result;
}
void adls_client::create_directory(const std::string& filesystem, const std::string& directory)
{
auto http = m_blob_client->client()->get_handle();
auto request = std::make_shared<create_directory_request>(filesystem, directory);
auto async_func = std::bind(&azure::storage_lite::async_executor<void>::submit, m_account, request, http, m_context);
return blob_client_adaptor<void>(async_func);
}
void adls_client::delete_directory(const std::string& filesystem, const std::string& directory)
{
auto http = m_blob_client->client()->get_handle();
std::string continuation;
while (true)
{
auto request = std::make_shared<delete_directory_request>(filesystem, directory, continuation);
auto async_func = std::bind(&azure::storage_lite::async_executor<void>::submit, m_account, request, http, m_context);
blob_client_adaptor<void>(async_func);
if (!success())
{
return;
}
continuation = http->get_header(constants::header_ms_continuation);
if (continuation.empty())
{
break;
}
}
}
bool adls_client::directory_exists(const std::string& filesystem, const std::string& directory)
{
bool blob_exists = false;
try
{
auto blob_properties = blob_client_adaptor<azure::storage_lite::blob_property>(std::bind(&azure::storage_lite::blob_client::get_blob_properties, m_blob_client, filesystem, directory));
blob_exists = blob_properties.valid();
}
catch (const storage_exception& e)
{
if (e.code != 404)
{
throw;
}
}
if (!success() && errno == 404)
{
errno = 0;
}
return blob_exists;
}
void adls_client::move_directory(const std::string& filesystem, const std::string& source_path, const std::string& destination_path)
{
return move_directory(filesystem, source_path, filesystem, destination_path);
}
void adls_client::move_directory(const std::string& source_filesystem, const std::string& source_path, const std::string& destination_filesystem, const std::string& destination_path)
{
auto http = m_blob_client->client()->get_handle();
// Currently we haven't seen any difference between moving a directory and a file, so we'll just use rename_file_request.
auto request = std::make_shared<rename_file_request>(source_filesystem, source_path, destination_filesystem, destination_path);
auto async_func = std::bind(&azure::storage_lite::async_executor<void>::submit, m_account, request, http, m_context);
return blob_client_adaptor<void>(async_func);
}
void adls_client::set_directory_properties(const std::string& filesystem, const std::string& directory, const std::vector<std::pair<std::string, std::string>>& properties)
{
return blob_client_adaptor<void>(std::bind(&azure::storage_lite::blob_client::set_blob_metadata, m_blob_client, filesystem, directory, properties));
}
std::vector<std::pair<std::string, std::string>> adls_client::get_directory_properties(const std::string& filesystem, const std::string& directory)
{
auto blob_properties = blob_client_adaptor<azure::storage_lite::blob_property>(std::bind(&azure::storage_lite::blob_client::get_blob_properties, m_blob_client, filesystem, directory));
std::vector<std::pair<std::string, std::string>> directory_properties(std::move(blob_properties.metadata));
auto ite = std::find_if(directory_properties.begin(), directory_properties.end(), [](const std::pair<std::string, std::string>& p)
{
return p.first == constants::header_ms_meta_hdi_isfoler + constants::header_ms_meta_prefix_size;
});
if (ite != directory_properties.end())
{
directory_properties.erase(ite);
}
return directory_properties;
}
void adls_client::set_directory_access_control(const std::string& filesystem, const std::string& directory, const access_control& acl)
{
auto http = m_blob_client->client()->get_handle();
auto request = std::make_shared<set_access_control_request>(filesystem, directory, acl);
auto async_func = std::bind(&azure::storage_lite::async_executor<void>::submit, m_account, request, http, m_context);
return blob_client_adaptor<void>(async_func);
}
access_control adls_client::get_directory_access_control(const std::string& filesystem, const std::string& directory)
{
auto http = m_blob_client->client()->get_handle();
auto request = std::make_shared<get_access_control_request>(filesystem, directory);
auto async_func = std::bind(&azure::storage_lite::async_executor<void>::submit, m_account, request, http, m_context);
blob_client_adaptor<void>(async_func);
access_control acl;
if (success())
{
acl.owner = http->get_header(constants::header_ms_owner);
acl.group = http->get_header(constants::header_ms_group);
acl.permissions = http->get_header(constants::header_ms_permissions);
acl.acl = http->get_header(constants::header_ms_acl);
}
return acl;
}
list_paths_result adls_client::list_paths_segmented(const std::string& filesystem, const std::string& directory, bool recursive, const std::string& continuation_token, const int max_results)
{
auto http = m_blob_client->client()->get_handle();
auto request = std::make_shared<list_paths_request>(filesystem, directory, recursive, continuation_token, max_results);
auto async_func = std::bind(&azure::storage_lite::async_executor<std::vector<list_paths_item>>::submit, m_account, request, http, m_context);
list_paths_result result;
result.paths = blob_client_adaptor<std::vector<list_paths_item>>(async_func);
result.continuation_token = http->get_header(constants::header_ms_continuation);
return result;
}
void adls_client::create_file(const std::string& filesystem, const std::string& file)
{
auto http = m_blob_client->client()->get_handle();
auto request = std::make_shared<create_file_request>(filesystem, file);
auto async_func = std::bind(&azure::storage_lite::async_executor<void>::submit, m_account, request, http, m_context);
return blob_client_adaptor<void>(async_func);
}
void adls_client::append_data_from_stream(const std::string& filesystem, const std::string& file, uint64_t offset, std::istream& in_stream, uint64_t stream_len)
{
if (stream_len == 0)
{
uint64_t cur = in_stream.tellg();
in_stream.seekg(0, std::ios_base::end);
uint64_t end = in_stream.tellg();
in_stream.seekg(cur);
stream_len = end - cur;
}
auto http = m_blob_client->client()->get_handle();
auto request = std::make_shared<append_data_request>(filesystem, file, offset, stream_len);
http->set_input_stream(azure::storage_lite::storage_istream(in_stream));
http->set_is_input_length_known();
http->set_input_content_length(stream_len);
auto async_func = std::bind(&azure::storage_lite::async_executor<void>::submit, m_account, request, http, m_context);
return blob_client_adaptor<void>(async_func);
}
void adls_client::flush_data(const std::string& filesystem, const std::string& file, uint64_t offset)
{
auto http = m_blob_client->client()->get_handle();
auto request = std::make_shared<flush_data_request>(filesystem, file, offset);
auto async_func = std::bind(&azure::storage_lite::async_executor<void>::submit, m_account, request, http, m_context);
return blob_client_adaptor<void>(async_func);
}
void adls_client::upload_file_from_stream(const std::string& filesystem, const std::string& file, std::istream& in_stream, const std::vector<std::pair<std::string, std::string>>& properties)
{
auto blob_client = m_blob_client;
auto async_task = [blob_client, filesystem, file, &in_stream, properties]()
{
return blob_client->upload_block_blob_from_stream(filesystem, file, in_stream, properties);
};
return blob_client_adaptor<void>(async_task);
}
void adls_client::download_file_to_stream(const std::string& filesystem, const std::string& file, std::ostream& out_stream)
{
return download_file_to_stream(filesystem, file, 0, 0, out_stream);
}
void adls_client::download_file_to_stream(const std::string& filesystem, const std::string& file, uint64_t offset, uint64_t size, std::ostream& out_stream)
{
// std::bind doesn't seem to work since download_blob_to_stream is an overloaded function.
auto blob_client = m_blob_client;
auto async_task = [blob_client, filesystem, file, offset, size, &out_stream]()
{
return blob_client->download_blob_to_stream(filesystem, file, offset, size, out_stream);
};
return blob_client_adaptor<void>(async_task);
}
void adls_client::delete_file(const std::string& filesystem, const std::string& file)
{
return blob_client_adaptor<void>(std::bind(&azure::storage_lite::blob_client::delete_blob, m_blob_client, filesystem, file, false));
}
bool adls_client::file_exists(const std::string& filesystem, const std::string& file)
{
bool blob_exists = false;
try
{
auto blob_properties = blob_client_adaptor<azure::storage_lite::blob_property>(std::bind(&azure::storage_lite::blob_client::get_blob_properties, m_blob_client, filesystem, file));
blob_exists = blob_properties.valid();
}
catch (const storage_exception& e)
{
if (e.code != 404)
{
throw;
}
}
if (!success() && errno == 404)
{
errno = 0;
}
return blob_exists;
}
void adls_client::move_file(const std::string& filesystem, const std::string& source_path, const std::string& destination_path)
{
return move_file(filesystem, source_path, filesystem, destination_path);
}
void adls_client::move_file(const std::string& source_filesystem, const std::string& source_path, const std::string& destination_filesystem, const std::string& destination_path)
{
auto http = m_blob_client->client()->get_handle();
auto request = std::make_shared<rename_file_request>(source_filesystem, source_path, destination_filesystem, destination_path);
auto async_func = std::bind(&azure::storage_lite::async_executor<void>::submit, m_account, request, http, m_context);
return blob_client_adaptor<void>(async_func);
}
void adls_client::set_file_properties(const std::string& filesystem, const std::string& file, const std::vector<std::pair<std::string, std::string>>& properties)
{
return blob_client_adaptor<void>(std::bind(&azure::storage_lite::blob_client::set_blob_metadata, m_blob_client, filesystem, file, properties));
}
std::vector<std::pair<std::string, std::string>> adls_client::get_file_properties(const std::string& filesystem, const std::string& file)
{
auto blob_properties = blob_client_adaptor<azure::storage_lite::blob_property>(std::bind(&azure::storage_lite::blob_client::get_blob_properties, m_blob_client, filesystem, file));
std::vector<std::pair<std::string, std::string>> file_properties(std::move(blob_properties.metadata));
return file_properties;
}
void adls_client::set_file_access_control(const std::string& filesystem, const std::string& file, const access_control& acl)
{
auto http = m_blob_client->client()->get_handle();
auto request = std::make_shared<set_access_control_request>(filesystem, file, acl);
auto async_func = std::bind(&azure::storage_lite::async_executor<void>::submit, m_account, request, http, m_context);
return blob_client_adaptor<void>(async_func);
}
access_control adls_client::get_file_access_control(const std::string& filesystem, const std::string& file)
{
auto http = m_blob_client->client()->get_handle();
auto request = std::make_shared<get_access_control_request>(filesystem, file);
auto async_func = std::bind(&azure::storage_lite::async_executor<void>::submit, m_account, request, http, m_context);
blob_client_adaptor<void>(async_func);
access_control acl;
if (success())
{
acl.owner = http->get_header(constants::header_ms_owner);
acl.group = http->get_header(constants::header_ms_group);
acl.permissions = http->get_header(constants::header_ms_permissions);
acl.acl = http->get_header(constants::header_ms_acl);
}
return acl;
}
}} // azure::storage_adls

Просмотреть файл

@ -0,0 +1,30 @@
#include "append_data_request.h"
#include "http_base.h"
#include "storage_url.h"
#include "utility.h"
namespace azure { namespace storage_adls {
void append_data_request::build_request(const storage_account& account, http_base& http) const
{
using namespace azure::storage_lite;
http.set_method(http_base::http_method::patch);
storage_url url = account.get_url(storage_account::service::adls);
url.append_path(m_filesystem).append_path(m_file);
url.add_query(constants::query_action, constants::query_action_append);
url.add_query(constants::query_position, std::to_string(m_offset));
http.set_url(url.to_string());
storage_headers headers;
add_content_length(http, headers, m_length);
http.add_header(constants::header_user_agent, constants::header_value_user_agent);
add_ms_header(http, headers, constants::header_ms_date, get_ms_date(date_format::rfc_1123));
add_ms_header(http, headers, constants::header_ms_version, constants::header_value_storage_version);
account.credential()->sign_request(*this, http, url, headers);
}
}} // azure::storage_adls

Просмотреть файл

@ -0,0 +1,29 @@
#include "create_directory_request.h"
#include "http_base.h"
#include "storage_url.h"
#include "utility.h"
namespace azure { namespace storage_adls {
void create_directory_request::build_request(const storage_account& account, http_base& http) const
{
using namespace azure::storage_lite;
http.set_method(http_base::http_method::put);
storage_url url = account.get_url(storage_account::service::adls);
url.append_path(m_filesystem).append_path(m_directory);
url.add_query(constants::query_resource, constants::query_resource_directory);
http.set_url(url.to_string());
storage_headers headers;
add_content_length(http, headers, 0);
http.add_header(constants::header_user_agent, constants::header_value_user_agent);
add_ms_header(http, headers, constants::header_ms_date, get_ms_date(date_format::rfc_1123));
add_ms_header(http, headers, constants::header_ms_version, constants::header_value_storage_version);
account.credential()->sign_request(*this, http, url, headers);
}
}} // azure::storage_adls

Просмотреть файл

@ -0,0 +1,29 @@
#include "create_file_request.h"
#include "http_base.h"
#include "storage_url.h"
#include "utility.h"
namespace azure { namespace storage_adls {
void create_file_request::build_request(const storage_account& account, http_base& http) const
{
using namespace azure::storage_lite;
http.set_method(http_base::http_method::put);
storage_url url = account.get_url(storage_account::service::adls);
url.append_path(m_filesystem).append_path(m_file);
url.add_query(constants::query_resource, constants::query_resource_file);
http.set_url(url.to_string());
storage_headers headers;
add_content_length(http, headers, 0);
http.add_header(constants::header_user_agent, constants::header_value_user_agent);
add_ms_header(http, headers, constants::header_ms_date, get_ms_date(date_format::rfc_1123));
add_ms_header(http, headers, constants::header_ms_version, constants::header_value_storage_version);
account.credential()->sign_request(*this, http, url, headers);
}
}} // azure::storage_adls

Просмотреть файл

@ -0,0 +1,29 @@
#include "delete_directory_request.h"
#include "http_base.h"
#include "storage_url.h"
#include "utility.h"
namespace azure { namespace storage_adls {
void delete_directory_request::build_request(const storage_account& account, http_base& http) const
{
using namespace azure::storage_lite;
http.set_method(http_base::http_method::del);
storage_url url = account.get_url(storage_account::service::adls);
url.append_path(m_filesystem).append_path(m_directory);
url.add_query(constants::query_recursive, "true");
add_optional_query(url, constants::query_continuation, m_continuation);
http.set_url(url.to_string());
storage_headers headers;
http.add_header(constants::header_user_agent, constants::header_value_user_agent);
add_ms_header(http, headers, constants::header_ms_date, get_ms_date(date_format::rfc_1123));
add_ms_header(http, headers, constants::header_ms_version, constants::header_value_storage_version);
account.credential()->sign_request(*this, http, url, headers);
}
}} // azure::storage_adls

Просмотреть файл

@ -0,0 +1,30 @@
#include "flush_data_request.h"
#include "http_base.h"
#include "storage_url.h"
#include "utility.h"
namespace azure { namespace storage_adls {
void flush_data_request::build_request(const storage_account& account, http_base& http) const
{
using namespace azure::storage_lite;
http.set_method(http_base::http_method::patch);
storage_url url = account.get_url(storage_account::service::adls);
url.append_path(m_filesystem).append_path(m_file);
url.add_query(constants::query_action, constants::query_action_flush);
url.add_query(constants::query_position, std::to_string(m_offset));
http.set_url(url.to_string());
storage_headers headers;
add_content_length(http, headers, 0);
http.add_header(constants::header_user_agent, constants::header_value_user_agent);
add_ms_header(http, headers, constants::header_ms_date, get_ms_date(date_format::rfc_1123));
add_ms_header(http, headers, constants::header_ms_version, constants::header_value_storage_version);
account.credential()->sign_request(*this, http, url, headers);
}
}} // azure::storage_adls

Просмотреть файл

@ -0,0 +1,28 @@
#include "get_access_control_request.h"
#include "http_base.h"
#include "storage_url.h"
#include "utility.h"
namespace azure { namespace storage_adls {
void get_access_control_request::build_request(const storage_account& account, http_base& http) const
{
using namespace azure::storage_lite;
http.set_method(http_base::http_method::head);
storage_url url = account.get_url(storage_account::service::adls);
url.append_path(m_filesystem).append_path(m_path);
url.add_query(constants::query_action, constants::query_action_get_access_control);
http.set_url(url.to_string());
storage_headers headers;
http.add_header(constants::header_user_agent, constants::header_value_user_agent);
add_ms_header(http, headers, constants::header_ms_date, get_ms_date(date_format::rfc_1123));
add_ms_header(http, headers, constants::header_ms_version, constants::header_value_storage_version);
account.credential()->sign_request(*this, http, url, headers);
}
}} // azure::storage_adls

Просмотреть файл

@ -0,0 +1,32 @@
#include "list_paths_request.h"
#include "http_base.h"
#include "storage_url.h"
#include "utility.h"
namespace azure { namespace storage_adls {
void list_paths_request::build_request(const storage_account& account, http_base& http) const
{
using namespace azure::storage_lite;
http.set_method(http_base::http_method::get);
storage_url url = account.get_url(storage_account::service::adls);
url.append_path(m_filesystem);
url.add_query(constants::query_resource, constants::query_resource_filesystem);
url.add_query(constants::query_resource_directory, m_directory);
url.add_query(constants::query_recursive, m_recursive ? "true" : "false");
add_optional_query(url, constants::query_continuation, m_continuation);
add_optional_query(url, constants::query_maxResults, m_max_results);
http.set_url(url.to_string());
storage_headers headers;
http.add_header(constants::header_user_agent, constants::header_value_user_agent);
add_ms_header(http, headers, constants::header_ms_date, get_ms_date(date_format::rfc_1123));
add_ms_header(http, headers, constants::header_ms_version, constants::header_value_storage_version);
account.credential()->sign_request(*this, http, url, headers);
}
}} // azure::storage_adls

Просмотреть файл

@ -0,0 +1,29 @@
#include "json.hpp"
#include "nlohmann_json_parser.h"
#include "set_access_control_request.h"
namespace azure { namespace storage_adls {
std::vector<list_paths_item> nlohmann_json_parser::parse_list_paths_response(const std::string& response)
{
auto json_object = nlohmann::json::parse(response);
std::vector<azure::storage_adls::list_paths_item> paths;
for (const auto& path_element : json_object["paths"])
{
list_paths_item path_item;
path_item.name = path_element["name"].get<std::string>();
path_item.content_length = std::stoull(path_element["contentLength"].get<std::string>());
path_item.etag = path_element["etag"].get<std::string>();
path_item.last_modified = path_element["lastModified"].get<std::string>();
path_item.acl.owner = path_element["owner"].get<std::string>();
path_item.acl.group = path_element["group"].get<std::string>();
path_item.acl.permissions = path_element["permissions"].get<std::string>();
path_item.is_directory = path_element.count("isDirectory") && path_element["isDirectory"].get<std::string>() == "true";
paths.emplace_back(std::move(path_item));
}
return paths;
}
}} // azure::storage_adls

Просмотреть файл

@ -0,0 +1,29 @@
#include "rename_file_request.h"
#include "http_base.h"
#include "storage_url.h"
#include "utility.h"
namespace azure { namespace storage_adls {
void rename_file_request::build_request(const storage_account& account, http_base& http) const
{
using namespace azure::storage_lite;
http.set_method(http_base::http_method::put);
storage_url url = account.get_url(storage_account::service::adls);
url.append_path(m_destination_filesystem).append_path(m_destination_path);
http.set_url(url.to_string());
storage_headers headers;
add_content_length(http, headers, 0);
http.add_header(constants::header_user_agent, constants::header_value_user_agent);
add_ms_header(http, headers, constants::header_ms_date, get_ms_date(date_format::rfc_1123));
add_ms_header(http, headers, constants::header_ms_version, constants::header_value_storage_version);
add_ms_header(http, headers, constants::header_ms_rename_source, encode_url_path("/" + m_source_filesystem + "/" + m_source_path));
account.credential()->sign_request(*this, http, url, headers);
}
}} // azure::storage_adls

Просмотреть файл

@ -0,0 +1,33 @@
#include "set_access_control_request.h"
#include "http_base.h"
#include "storage_url.h"
#include "utility.h"
namespace azure { namespace storage_adls {
void set_access_control_request::build_request(const storage_account& account, http_base& http) const
{
using namespace azure::storage_lite;
http.set_method(http_base::http_method::patch);
storage_url url = account.get_url(storage_account::service::adls);
url.append_path(m_filesystem).append_path(m_path);
url.add_query(constants::query_action, constants::query_action_set_access_control);
http.set_url(url.to_string());
storage_headers headers;
add_content_length(http, headers, 0);
http.add_header(constants::header_user_agent, constants::header_value_user_agent);
add_ms_header(http, headers, constants::header_ms_date, get_ms_date(date_format::rfc_1123));
add_ms_header(http, headers, constants::header_ms_version, constants::header_value_storage_version);
add_ms_header(http, headers, constants::header_ms_owner, m_acl.owner, true);
add_ms_header(http, headers, constants::header_ms_group, m_acl.group, true);
add_ms_header(http, headers, constants::header_ms_permissions, m_acl.permissions, true);
add_ms_header(http, headers, constants::header_ms_acl, m_acl.acl, true);
account.credential()->sign_request(*this, http, url, headers);
}
}} // azure::storage_adls

Просмотреть файл

@ -0,0 +1,37 @@
#pragma once
#include "../test/test_base.h"
#include "adls_client.h"
#include <cerrno>
namespace as_test {
class adls_base : public base
{
public:
static azure::storage_adls::adls_client test_adls_client(bool exception_enabled)
{
azure::storage_adls::adls_client client(init_account(standard_storage_connection_string()), 1, exception_enabled);
return client;
}
static const std::string& standard_storage_connection_string()
{
static std::string sscs = "DefaultEndpointsProtocol=https;";
return sscs;
}
static std::string create_random_filesystem(azure::storage_adls::adls_client& client)
{
std::string fs_name = as_test::get_random_string(10);
client.create_filesystem(fs_name);
if (!client.exception_enabled() && errno != 0)
{
fs_name.clear();
}
return fs_name;
}
};
} // as_test

48
adls/test/client_test.cpp Normal file
Просмотреть файл

@ -0,0 +1,48 @@
#include "catch2/catch.hpp"
#include "adls_client.h"
#include "adls_test_base.h"
TEST_CASE("Client General", "[adls][client]")
{
for (bool exception_enabled : {true, false})
{
azure::storage_adls::adls_client client = as_test::adls_base::test_adls_client(exception_enabled);
REQUIRE(client.exception_enabled() == exception_enabled);
}
}
TEST_CASE("Client Throw Exception", "[adls][client]")
{
azure::storage_adls::adls_client client = as_test::adls_base::test_adls_client(true);
bool exception_caught = false;
try
{
client.delete_filesystem(as_test::get_random_string(10));
}
catch (const std::exception&)
{
exception_caught = true;
}
REQUIRE(exception_caught);
}
TEST_CASE("Client Errno", "[adls][client]")
{
azure::storage_adls::adls_client client = as_test::adls_base::test_adls_client(false);
bool exception_caught = false;
int error_code = 0;
try
{
client.delete_filesystem(as_test::get_random_string(10));
error_code = errno;
}
catch (const std::exception&)
{
exception_caught = true;
}
REQUIRE_FALSE(exception_caught);
REQUIRE(error_code != 0);
}

Просмотреть файл

@ -0,0 +1,220 @@
#include "catch2/catch.hpp"
#include "adls_client.h"
#include "adls_test_base.h"
TEST_CASE("Create Directory", "[adls][directory]")
{
{
azure::storage_adls::adls_client client = as_test::adls_base::test_adls_client(false);
std::string fs_name = as_test::adls_base::create_random_filesystem(client);
std::string dir_name = as_test::get_random_string(10);
std::string dir_name2 = dir_name + "/" + as_test::get_random_string(10);
client.create_directory(fs_name, dir_name);
REQUIRE(errno == 0);
REQUIRE(client.directory_exists(fs_name, dir_name));
client.create_directory(fs_name, dir_name2);
REQUIRE(errno == 0);
REQUIRE(client.directory_exists(fs_name, dir_name2));
client.delete_directory(fs_name, dir_name);
REQUIRE(errno == 0);
REQUIRE_FALSE(client.directory_exists(fs_name, dir_name));
REQUIRE_FALSE(client.directory_exists(fs_name, dir_name2));
client.delete_filesystem(fs_name);
}
{
azure::storage_adls::adls_client client = as_test::adls_base::test_adls_client(true);
std::string fs_name = as_test::adls_base::create_random_filesystem(client);
std::string dir_name = as_test::get_random_string(10);
REQUIRE_THROWS_AS(client.delete_directory(fs_name, dir_name), std::exception);
REQUIRE_FALSE(client.directory_exists(fs_name, dir_name));
REQUIRE(errno == 0);
client.delete_filesystem(fs_name);
}
}
TEST_CASE("Recursive Delete Directory", "[adls][directory]")
{
azure::storage_adls::adls_client client = as_test::adls_base::test_adls_client(false);
std::string fs_name = as_test::adls_base::create_random_filesystem(client);
std::string dir_name = as_test::get_random_string(10);
client.create_directory(fs_name, dir_name);
REQUIRE(client.directory_exists(fs_name, dir_name));
int file_num = 100;
std::vector<std::future<bool>> handles;
for (int i = 0; i < file_num; ++i)
{
auto create_file_func = [fs_name, dir_name]()
{
azure::storage_adls::adls_client client = as_test::adls_base::test_adls_client(false);
client.create_file(fs_name, dir_name + "/" + as_test::get_random_string(10));
return errno == 0;
};
handles.emplace_back(std::async(std::launch::async, create_file_func));
}
for (auto& handle : handles)
{
REQUIRE(handle.get());
}
client.delete_directory(fs_name, dir_name);
REQUIRE_FALSE(client.directory_exists(fs_name, dir_name));
client.delete_filesystem(fs_name);
}
TEST_CASE("Directory Access Control", "[adls][directory][acl]")
{
azure::storage_adls::adls_client client = as_test::adls_base::test_adls_client(false);
std::string fs_name = as_test::adls_base::create_random_filesystem(client);
std::string dir_name = as_test::get_random_string(10);
client.create_directory(fs_name, dir_name);
azure::storage_adls::access_control acl;
acl.acl = "user::rw-,group::rw-,other::r--";
client.set_directory_access_control(fs_name, dir_name, acl);
auto acl2 = client.get_directory_access_control(fs_name, dir_name);
REQUIRE(acl2.owner == "$superuser");
REQUIRE(acl2.group == "$superuser");
REQUIRE(acl2.acl == acl.acl);
acl.acl.clear();
acl.permissions = "0644";
client.set_directory_access_control(fs_name, dir_name, acl);
acl2 = client.get_directory_access_control(fs_name, dir_name);
REQUIRE(acl2.owner == "$superuser");
REQUIRE(acl2.group == "$superuser");
REQUIRE(acl2.permissions == "rw-r--r--");
client.delete_filesystem(fs_name);
}
TEST_CASE("List Paths", "[adls][directory]")
{
azure::storage_adls::adls_client client = as_test::adls_base::test_adls_client(false);
std::string fs_name = as_test::adls_base::create_random_filesystem(client);
std::string dir_name = as_test::get_random_string(10);
client.create_directory(fs_name, dir_name);
std::set<std::string> recursive_paths;
std::set<std::string> non_recursive_paths;
for (int i = 0; i < 5; ++i)
{
std::string dir2_name = dir_name + "/" + as_test::get_random_string(10);
client.create_directory(fs_name, dir2_name);
for (int i = 0; i < 5; ++i)
{
std::string dir3_name = dir2_name + "/" + as_test::get_random_string(10);
client.create_directory(fs_name, dir3_name);
recursive_paths.emplace(dir3_name);
}
non_recursive_paths.emplace(dir2_name);
recursive_paths.emplace(dir2_name);
}
for (auto do_recursive : { false, true })
{
std::string continuation;
std::set<std::string> list_paths;
do
{
auto list_result = client.list_paths_segmented(fs_name, dir_name, do_recursive, continuation, 5);
for (auto& p : list_result.paths)
{
REQUIRE(!p.name.empty());
REQUIRE(!p.etag.empty());
REQUIRE(!p.last_modified.empty());
REQUIRE(!p.acl.owner.empty());
REQUIRE(!p.acl.group.empty());
REQUIRE(!p.acl.permissions.empty());
REQUIRE(p.acl.acl.empty());
list_paths.emplace(p.name);
REQUIRE(p.is_directory);
}
continuation = list_result.continuation_token;
} while (!continuation.empty());
REQUIRE(list_paths == (do_recursive ? recursive_paths : non_recursive_paths));
}
client.delete_filesystem(fs_name);
}
TEST_CASE("Move Directory", "[adls][directory]")
{
azure::storage_adls::adls_client client = as_test::adls_base::test_adls_client(false);
std::string fs1_name = as_test::adls_base::create_random_filesystem(client);
std::string fs2_name = as_test::adls_base::create_random_filesystem(client);
std::string src_dir = as_test::get_random_string(10);
std::string sub_dir1 = as_test::get_random_string(10);
std::string file1 = sub_dir1 + "/" + as_test::get_random_string(10);
std::string file2 = sub_dir1 + "/" + as_test::get_random_string(10) + "/" + as_test::get_random_string(10);
std::string file3 = as_test::get_random_string(10);
client.create_directory(fs1_name, src_dir);
client.create_file(fs1_name, src_dir + "/" + file1);
client.create_file(fs1_name, src_dir + "/" + file2);
client.create_file(fs1_name, src_dir + "/" + file3);
REQUIRE(errno == 0);
std::string dest_dir = as_test::get_random_string(10);
client.create_directory(fs2_name, dest_dir);
// Move a directory into another directory
client.move_directory(fs1_name, src_dir, fs2_name, dest_dir);
REQUIRE(errno == 0);
REQUIRE_FALSE(client.file_exists(fs1_name, src_dir + "/" + file1));
REQUIRE_FALSE(client.file_exists(fs1_name, src_dir + "/" + file2));
REQUIRE_FALSE(client.file_exists(fs1_name, src_dir + "/" + file3));
REQUIRE(client.file_exists(fs2_name, dest_dir + "/" + src_dir + "/" + file1));
REQUIRE(client.file_exists(fs2_name, dest_dir + "/" + src_dir + "/" + file2));
REQUIRE(client.file_exists(fs2_name, dest_dir + "/" + src_dir + "/" + file3));
client.create_directory(fs1_name, src_dir);
file1 = as_test::get_random_string(10);
client.create_file(fs1_name, file1);
// Move directory onto a file
client.move_directory(fs1_name, src_dir, file1);
REQUIRE(errno != 0);
client.delete_filesystem(fs1_name);
client.delete_filesystem(fs2_name);
}
TEST_CASE("Directory Properties", "[adls][directory]")
{
azure::storage_adls::adls_client client = as_test::adls_base::test_adls_client(false);
std::string fs_name = as_test::adls_base::create_random_filesystem(client);
std::string dir_name = as_test::get_random_string(10);
client.create_directory(fs_name, dir_name);
std::vector<std::pair<std::string, std::string>> metadata;
metadata.emplace_back(std::make_pair("mkey1", "mvalue1"));
metadata.emplace_back(std::make_pair("mKey2", "mvAlue1 $%^&#"));
client.set_directory_properties(fs_name, dir_name, metadata);
REQUIRE(errno == 0);
auto metadata2 = client.get_directory_properties(fs_name, dir_name);
REQUIRE(errno == 0);
REQUIRE(metadata == metadata2);
metadata.clear();
client.set_directory_properties(fs_name, dir_name, metadata);
REQUIRE(errno == 0);
metadata2 = client.get_directory_properties(fs_name, dir_name);
REQUIRE(errno == 0);
REQUIRE(metadata == metadata2);
client.delete_filesystem(fs_name);
}

237
adls/test/file_test.cpp Normal file
Просмотреть файл

@ -0,0 +1,237 @@
#include "catch2/catch.hpp"
#include "adls_client.h"
#include "adls_test_base.h"
TEST_CASE("Create File", "[adls][file]")
{
{
azure::storage_adls::adls_client client = as_test::adls_base::test_adls_client(false);
std::string fs_name = as_test::adls_base::create_random_filesystem(client);
std::string file_name = as_test::get_random_string(10) + "/" + as_test::get_random_string(10);
client.create_file(fs_name, file_name);
REQUIRE(errno == 0);
REQUIRE(client.file_exists(fs_name, file_name));
client.delete_file(fs_name, file_name);
REQUIRE(errno == 0);
REQUIRE_FALSE(client.file_exists(fs_name, file_name));
client.delete_file(fs_name, file_name);
REQUIRE(errno != 0);
client.delete_filesystem(fs_name);
}
{
azure::storage_adls::adls_client client = as_test::adls_base::test_adls_client(true);
std::string fs_name = as_test::adls_base::create_random_filesystem(client);
std::string file_name = as_test::get_random_string(10);
REQUIRE_THROWS_AS(client.delete_file(fs_name, file_name), std::exception);
REQUIRE_FALSE(client.file_exists(fs_name, file_name));
REQUIRE(errno == 0);
client.delete_filesystem(fs_name);
}
}
TEST_CASE("Append Data", "[adls][file]")
{
azure::storage_adls::adls_client client = as_test::adls_base::test_adls_client(false);
std::string fs_name = as_test::adls_base::create_random_filesystem(client);
std::string file_name = as_test::get_random_string(10);
client.create_file(fs_name, file_name);
std::mt19937_64 gen(std::random_device{}());
std::uniform_int_distribution<uint64_t> dist(1, 8 * 1024 * 1024);
std::vector<std::pair<uint64_t, uint64_t>> segments;
for (int i = 0; i < 4; ++i)
{
uint64_t segment_size = dist(gen);
segments.emplace_back(std::make_pair(segments.empty() ? uint64_t(0) : segments.back().first + segments.back().second, segment_size));
}
uint64_t file_size = segments.back().first + segments.back().second;
std::shuffle(segments.begin(), segments.end(), gen);
std::string file_content;
file_content.resize(file_size);
for (auto p : segments)
{
uint64_t offset = p.first;
uint64_t segment_length = p.second;
auto in_stream = as_test::get_istringstream_with_random_buffer(segment_length);
in_stream.read(&file_content[offset], segment_length);
in_stream.seekg(0);
client.append_data_from_stream(fs_name, file_name, offset, in_stream);
REQUIRE(errno == 0);
}
client.flush_data(fs_name, file_name, file_size);
REQUIRE(errno == 0);
// total download
std::ostringstream out_stream;
client.download_file_to_stream(fs_name, file_name, out_stream);
REQUIRE(file_content == out_stream.str());
// range download
for (int i = 0; i < 10; ++i)
{
uint64_t random_offset = std::uniform_int_distribution<uint64_t>(0, file_size - 1)(gen);
uint64_t random_length = std::uniform_int_distribution<uint64_t>(1, file_size - random_offset + 1)(gen);
std::ostringstream out_stream;
client.download_file_to_stream(fs_name, file_name, random_offset, random_length, out_stream);
REQUIRE(file_content.substr(random_offset, random_length) == out_stream.str());
}
// download till end
out_stream.str("");
uint64_t random_offset = std::uniform_int_distribution<uint64_t>(0, file_size - 1)(gen);
uint64_t random_length = file_size - random_offset;
client.download_file_to_stream(fs_name, file_name, random_offset, 0, out_stream);
REQUIRE(random_length == out_stream.str().length());
REQUIRE(file_content.substr(random_offset, random_length) == out_stream.str());
// download nothing
out_stream.str("");
client.download_file_to_stream(fs_name, file_name, file_size, 0, out_stream);
REQUIRE(out_stream.str().length() == 0);
client.delete_filesystem(fs_name);
}
TEST_CASE("Upload File", "[adls][file]")
{
azure::storage_adls::adls_client client = as_test::adls_base::test_adls_client(false);
std::string fs_name = as_test::adls_base::create_random_filesystem(client);
std::string file_name = as_test::get_random_string(10);
std::string file_content;
size_t file_size = 4 * 1024 * 1024 + 1234;
file_content.resize(file_size);
auto in_stream = as_test::get_istringstream_with_random_buffer(file_size);
in_stream.read(&file_content[0], file_size);
in_stream.seekg(0);
std::vector<std::pair<std::string, std::string>> metadata;
metadata.emplace_back(std::make_pair("key1", "value1"));
metadata.emplace_back(std::make_pair("keY2", "ValUe2"));
client.upload_file_from_stream(fs_name, file_name, in_stream, metadata);
REQUIRE(errno == 0);
std::ostringstream out_stream;
client.download_file_to_stream(fs_name, file_name, out_stream);
REQUIRE(file_content == out_stream.str());
auto metadata2 = client.get_file_properties(fs_name, file_name);
REQUIRE(metadata == metadata2);
client.delete_filesystem(fs_name);
}
TEST_CASE("File Access Control", "[adls][file][acl]")
{
azure::storage_adls::adls_client client = as_test::adls_base::test_adls_client(false);
std::string fs_name = as_test::adls_base::create_random_filesystem(client);
std::string file_name = as_test::get_random_string(10);
client.create_file(fs_name, file_name);
azure::storage_adls::access_control acl;
acl.acl = "user::rw-,group::rw-,other::r--";
client.set_file_access_control(fs_name, file_name, acl);
auto acl2 = client.get_file_access_control(fs_name, file_name);
REQUIRE(acl2.owner == "$superuser");
REQUIRE(acl2.group == "$superuser");
REQUIRE(acl2.acl == acl.acl);
acl.acl.clear();
acl.permissions = "0644";
client.set_file_access_control(fs_name, file_name, acl);
acl2 = client.get_file_access_control(fs_name, file_name);
REQUIRE(acl2.owner == "$superuser");
REQUIRE(acl2.group == "$superuser");
REQUIRE(acl2.permissions == "rw-r--r--");
client.delete_filesystem(fs_name);
}
TEST_CASE("Move File", "[adls][file]")
{
azure::storage_adls::adls_client client = as_test::adls_base::test_adls_client(false);
std::string fs1_name = as_test::adls_base::create_random_filesystem(client);
std::string fs2_name = as_test::adls_base::create_random_filesystem(client);
std::string file1_name = as_test::get_random_string(10) + "/" + as_test::get_random_string(10);
std::string file2_dir = as_test::get_random_string(10);
std::string file2_basename = as_test::get_random_string(10);
std::string file2_name = file2_dir + "/" + file2_basename;
std::string file_content;
size_t file_size = 512 * 1024;
file_content.resize(file_size);
auto in_stream = as_test::get_istringstream_with_random_buffer(file_size);
in_stream.read(&file_content[0], file_size);
in_stream.seekg(0);
client.upload_file_from_stream(fs1_name, file1_name, in_stream);
REQUIRE(errno == 0);
client.create_directory(fs2_name, file2_dir);
// Move file
client.move_file(fs1_name, file1_name, fs2_name, file2_name);
REQUIRE(errno == 0);
REQUIRE(client.file_exists(fs2_name, file2_name));
REQUIRE_FALSE(client.file_exists(fs1_name, file1_name));
std::ostringstream out_stream;
client.download_file_to_stream(fs2_name, file2_name, out_stream);
REQUIRE(file_content == out_stream.str());
std::string dest_dir = as_test::get_random_string(10);
client.create_directory(fs2_name, dest_dir);
// Move file into a directory
client.move_file(fs2_name, file2_name, dest_dir);
REQUIRE(errno == 0);
REQUIRE_FALSE(client.file_exists(fs2_name, file2_name));
REQUIRE(client.file_exists(fs2_name, dest_dir + "/" + file2_basename));
client.delete_filesystem(fs1_name);
client.delete_filesystem(fs2_name);
}
TEST_CASE("File Properties", "[adls][file]")
{
azure::storage_adls::adls_client client = as_test::adls_base::test_adls_client(false);
std::string fs_name = as_test::adls_base::create_random_filesystem(client);
std::string file_name = as_test::get_random_string(10);
client.create_file(fs_name, file_name);
std::vector<std::pair<std::string, std::string>> metadata;
metadata.emplace_back(std::make_pair("mkey1", "mvalue1"));
metadata.emplace_back(std::make_pair("mKey2", "mvAlue1 $%^&#"));
client.set_file_properties(fs_name, file_name, metadata);
REQUIRE(errno == 0);
auto metadata2 = client.get_file_properties(fs_name, file_name);
REQUIRE(errno == 0);
REQUIRE(metadata == metadata2);
metadata.clear();
client.set_file_properties(fs_name, file_name, metadata);
REQUIRE(errno == 0);
metadata2 = client.get_file_properties(fs_name, file_name);
REQUIRE(errno == 0);
REQUIRE(metadata == metadata2);
client.delete_filesystem(fs_name);
}

Просмотреть файл

@ -0,0 +1,108 @@
#include "catch2/catch.hpp"
#include "adls_client.h"
#include "adls_test_base.h"
TEST_CASE("Create Filesystem", "[adls][filesystem]")
{
{
azure::storage_adls::adls_client client = as_test::adls_base::test_adls_client(false);
std::string fs_name = as_test::get_random_string(10);
client.create_filesystem(fs_name);
REQUIRE(errno == 0);
REQUIRE(client.filesystem_exists(fs_name));
client.delete_filesystem(fs_name);
REQUIRE(errno == 0);
REQUIRE_FALSE(client.filesystem_exists(fs_name));
client.delete_filesystem(fs_name);
REQUIRE(errno != 0);
}
{
azure::storage_adls::adls_client client = as_test::adls_base::test_adls_client(true);
std::string fs_name = as_test::get_random_string(10);
client.create_filesystem(fs_name);
REQUIRE(client.filesystem_exists(fs_name));
REQUIRE_THROWS_AS(client.create_filesystem(fs_name), std::exception);
client.delete_filesystem(fs_name);
REQUIRE_FALSE(client.filesystem_exists(fs_name));
REQUIRE(errno == 0);
}
}
TEST_CASE("List Filesystem", "[adls][filesystem]")
{
azure::storage_adls::adls_client client = as_test::adls_base::test_adls_client(false);
std::string prefix1 = as_test::get_random_string(5);
std::string prefix2 = as_test::get_random_string(5);
REQUIRE(prefix1 != prefix2);
std::set<std::string> fss1;
std::set<std::string> fss2;
for (int i = 0; i < 10; ++i)
{
std::string fs_name = prefix1 + as_test::get_random_string(10);
client.create_filesystem(fs_name);
fss1.insert(fs_name);
fs_name = prefix2 + as_test::get_random_string(10);
client.create_filesystem(fs_name);
fss2.insert(fs_name);
}
std::string continuation;
do
{
const size_t segment_size = 5;
auto list_result = client.list_filesystems_segmented(prefix1, continuation, segment_size);
REQUIRE(list_result.filesystems.size() <= segment_size);
continuation = list_result.continuation_token;
for (const auto& fs : list_result.filesystems)
{
auto ite = fss1.find(fs.name);
REQUIRE(ite != fss1.end());
fss1.erase(ite);
}
} while (!continuation.empty());
REQUIRE(fss1.empty());
for (const auto& fs_name : fss1)
{
client.delete_filesystem(fs_name);
}
for (const auto& fs_name : fss2)
{
client.delete_filesystem(fs_name);
}
}
TEST_CASE("Filesystem Properties", "[adls][filesystem]")
{
azure::storage_adls::adls_client client = as_test::adls_base::test_adls_client(false);
std::string fs_name = as_test::adls_base::create_random_filesystem(client);
std::vector<std::pair<std::string, std::string>> metadata;
metadata.emplace_back(std::make_pair("mkey1", "mvalue1"));
metadata.emplace_back(std::make_pair("mKey2", "mvAlue1 $%^&#"));
client.set_filesystem_properties(fs_name, metadata);
REQUIRE(errno == 0);
auto metadata2 = client.get_filesystem_properties(fs_name);
REQUIRE(errno == 0);
REQUIRE(metadata == metadata2);
metadata.clear();
client.set_filesystem_properties(fs_name, metadata);
REQUIRE(errno == 0);
metadata2 = client.get_filesystem_properties(fs_name);
REQUIRE(errno == 0);
REQUIRE(metadata == metadata2);
client.delete_filesystem(fs_name);
}