This commit is contained in:
Eddy Ashton 2023-01-05 13:59:20 +00:00 коммит произвёл GitHub
Родитель ea53c63af8
Коммит 49d39614c0
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
12 изменённых файлов: 238 добавлений и 178 удалений

Просмотреть файл

@ -948,36 +948,8 @@ if(BUILD_TESTS)
PYTHON_SCRIPT ${CMAKE_CURRENT_LIST_DIR}/tests/infra/piccolo_driver.py PYTHON_SCRIPT ${CMAKE_CURRENT_LIST_DIR}/tests/infra/piccolo_driver.py
CONSENSUS cft CONSENSUS cft
CLIENT_BIN ./submit CLIENT_BIN ./submit
ADDITIONAL_ARGS ADDITIONAL_ARGS --package "samples/apps/logging/liblogging"
-p --max-writes-ahead 1000 --repetitions 10000
"samples/apps/logging/liblogging"
-m
1000
--cert
"user1_cert.pem"
--key
"user1_privk.pem"
--cacert
"service_cert.pem"
)
add_perf_test(
NAME pi_ls_jwt
PYTHON_SCRIPT ${CMAKE_CURRENT_LIST_DIR}/tests/infra/piccolo_driver.py
CONSENSUS cft
CLIENT_BIN ./submit
ADDITIONAL_ARGS
-p
"samples/apps/logging/liblogging"
-m
1000
--cert
"user1_cert.pem"
--key
"user1_privk.pem"
--cacert
"service_cert.pem"
--use-jwt
) )
add_perf_test( add_perf_test(
@ -999,6 +971,21 @@ if(BUILD_TESTS)
msgpack msgpack
) )
add_perf_test(
NAME pi_ls_jwt
PYTHON_SCRIPT ${CMAKE_CURRENT_LIST_DIR}/tests/infra/piccolo_driver.py
CONSENSUS cft
CLIENT_BIN ./submit
ADDITIONAL_ARGS
--package
"samples/apps/logging/liblogging"
--max-writes-ahead
1000
--repetitions
1000
--use-jwt
)
add_perf_test( add_perf_test(
NAME ls_js NAME ls_js
PYTHON_SCRIPT ${CMAKE_CURRENT_LIST_DIR}/tests/infra/perfclient.py PYTHON_SCRIPT ${CMAKE_CURRENT_LIST_DIR}/tests/infra/perfclient.py

Просмотреть файл

@ -9,7 +9,7 @@ that could be submitted to the server. The user can declare the requests by leve
the functions inside the library in :ccf_repo:`tests/perf-system/generator/generator.py`. the functions inside the library in :ccf_repo:`tests/perf-system/generator/generator.py`.
The user can generate requests from the library by either calling the command line tool The user can generate requests from the library by either calling the command line tool
in :ccf_repo:`tests/perf-system/generator/generate_packages.py` or by creating a script in :ccf_repo:`tests/perf-system/generator/generate_packages.py` or by creating a script
calling the functions of the library, such as the :ccf_repo:`tests/perf-system/generator/loggin_generator.py` calling the functions of the library, such as the :ccf_repo:`tests/perf-system/generator/logging_generator.py`
which contains a sample generation of requests for the logging CCF application. which contains a sample generation of requests for the logging CCF application.
Prior running any of these files you first need to install the requirements Prior running any of these files you first need to install the requirements
@ -70,7 +70,7 @@ can run your script as you would run any python file:
.. code-block:: bash .. code-block:: bash
$ python3 loggin_generator.py $ python3 logging_generator.py
Parquet files are an easy and well-compressed way of storing requests generated from this component Parquet files are an easy and well-compressed way of storing requests generated from this component

Просмотреть файл

@ -36,7 +36,7 @@ the following arguments
-s,--send-filepath TEXT REQUIRED Path to parquet file to store the submitted requests. -s,--send-filepath TEXT REQUIRED Path to parquet file to store the submitted requests.
-r,--response-filepath TEXT REQUIRED Path to parquet file to store the responses from the submitted requests. -r,--response-filepath TEXT REQUIRED Path to parquet file to store the responses from the submitted requests.
-g,--generator-filepath TEXT REQUIRED Path to parquet file with the generated requests to be submitted. -g,--generator-filepath TEXT REQUIRED Path to parquet file with the generated requests to be submitted.
-m,--max-inflight-requests INT=0 Specifies the number of outstanding requests sent to the server while waiting for response. When this options is set to 0 there will be no pipelining. Any other value will enable pipelining. A positive value will specify a window of outstanding requests on the server while waiting for a response. -1 or a negative value will set the window of outstanding requests to maximum i.e. submit requests without waiting for a response -m,--max-writes-ahead INT=0 Specifies the number of outstanding requests sent to the server while waiting for response. When this options is set to 0 there will be no pipelining. Any other value will enable pipelining. A positive value will specify a window of outstanding requests on the server while waiting for a response. -1 or a negative value will set the window of outstanding requests to maximum i.e. submit requests without waiting for a response
Once the component finishes submitting and receiving responses for all the requests it Once the component finishes submitting and receiving responses for all the requests it
will then store the results into two ``.parquet`` files. Hence, the path to file with the will then store the results into two ``.parquet`` files. Hence, the path to file with the

Просмотреть файл

@ -93,6 +93,7 @@ class UserInfo:
local_id: int local_id: int
service_id: str service_id: str
cert_path: str cert_path: str
key_path: str
class Network: class Network:
@ -860,7 +861,8 @@ class Network:
cert_path = os.path.join(self.common_dir, f"{local_user_id}_cert.pem") cert_path = os.path.join(self.common_dir, f"{local_user_id}_cert.pem")
with open(cert_path, encoding="utf-8") as c: with open(cert_path, encoding="utf-8") as c:
service_user_id = infra.crypto.compute_cert_der_hash_hex_from_pem(c.read()) service_user_id = infra.crypto.compute_cert_der_hash_hex_from_pem(c.read())
new_user = UserInfo(local_user_id, service_user_id, cert_path) key_path = os.path.join(self.common_dir, f"{local_user_id}_privk.pem")
new_user = UserInfo(local_user_id, service_user_id, cert_path, key_path)
if record: if record:
self.users.append(new_user) self.users.append(new_user)

Просмотреть файл

@ -13,6 +13,8 @@ import cimetrics.upload
import time import time
import http import http
import sys import sys
import hashlib
import json
sys.path.insert(0, "../tests/perf-system/generator") sys.path.insert(0, "../tests/perf-system/generator")
import generator import generator
@ -21,8 +23,16 @@ sys.path.insert(0, "../tests/perf-system/analyzer")
import analyzer import analyzer
def get_command_args(args, get_command): def get_command_args(args, network, get_command):
command_args = [] client_ident = network.users[0]
command_args = [
"--cert",
client_ident.cert_path,
"--key",
client_ident.key_path,
"--cacert",
network.cert_path,
]
return get_command(*command_args) return get_command(*command_args)
@ -92,48 +102,48 @@ def run(get_command, args):
primary, backups = network.find_nodes() primary, backups = network.find_nodes()
command_args = get_command_args(args, get_command) command_args = get_command_args(args, network, get_command)
jwt_header = "" additional_headers = {}
if args.use_jwt: if args.use_jwt:
jwt_issuer = infra.jwt_issuer.JwtIssuer("https://example.issuer") jwt_issuer = infra.jwt_issuer.JwtIssuer("https://example.issuer")
jwt_issuer.register(network) jwt_issuer.register(network)
jwt = jwt_issuer.issue_jwt() jwt = jwt_issuer.issue_jwt()
jwt_header = "Authorization: Bearer " + jwt additional_headers["Authorization"] = f"Bearer {jwt}"
logging_filename = "piccolo_logging_100ktxs" LOG.info(f"Generating {args.repetitions} parquet requests")
LOG.info("Starting parquet requests generation")
msgs = generator.Messages() msgs = generator.Messages()
for i in range(100000): for i in range(args.repetitions):
body = {
"id": i % 100,
"msg": f"Unique message: {hashlib.md5(str(i).encode()).hexdigest()}",
}
msgs.append( msgs.append(
"127.0.0.1:8000",
"/app/log/private", "/app/log/private",
"POST", "POST",
additional_headers=jwt_header, additional_headers=additional_headers,
data='{"id": ' body=json.dumps(body),
+ str(i % 100)
+ ', "msg": "Unique message: 93b885adfe0da089cdf634904fd59f7'
+ str(i)
+ '"}',
) )
path_to_generator_file = os.path.join( filename_prefix = "piccolo_driver"
network.common_dir, f"{logging_filename}.parquet" path_to_requests_file = os.path.join(
network.common_dir, f"{filename_prefix}_requests.parquet"
) )
msgs.to_parquet_file(path_to_generator_file) LOG.info(f"Writing generated requests to {path_to_requests_file}")
msgs.to_parquet_file(path_to_requests_file)
path_to_send_file = os.path.join( path_to_send_file = os.path.join(
network.common_dir, f"{logging_filename}_send.parquet" network.common_dir, f"{filename_prefix}_send.parquet"
) )
path_to_response_file = os.path.join( path_to_response_file = os.path.join(
network.common_dir, f"{logging_filename}_response.parquet" network.common_dir, f"{filename_prefix}_response.parquet"
) )
# Add filepaths in commands # Add filepaths in commands
command_args += ["-s", path_to_send_file] command_args += ["--send-filepath", path_to_send_file]
command_args += ["-r", path_to_response_file] command_args += ["--response-filepath", path_to_response_file]
command_args += ["--generator-filepath", path_to_generator_file] command_args += ["--generator-filepath", path_to_requests_file]
nodes_to_send_to = filter_nodes(primary, backups, args.send_tx_to) nodes_to_send_to = filter_nodes(primary, backups, args.send_tx_to)
clients = [] clients = []
@ -198,6 +208,9 @@ def run(get_command, args):
for remote_client in clients: for remote_client in clients:
analysis = analyzer.Analyze() analysis = analyzer.Analyze()
LOG.info(
f"Analyzing results from {path_to_send_file} and {path_to_response_file}"
)
df_sends = analyzer.get_df_from_parquet_file(path_to_send_file) df_sends = analyzer.get_df_from_parquet_file(path_to_send_file)
df_responses = analyzer.get_df_from_parquet_file( df_responses = analyzer.get_df_from_parquet_file(
path_to_response_file path_to_response_file
@ -237,8 +250,8 @@ def run(get_command, args):
for remote_client in clients: for remote_client in clients:
remote_client.stop() remote_client.stop()
except Exception: except Exception as e:
LOG.error("Stopping clients due to exception") LOG.error(f"Stopping clients due to exception: {e}")
for remote_client in clients: for remote_client in clients:
remote_client.stop() remote_client.stop()
raise raise
@ -293,6 +306,17 @@ def cli_args(add=lambda x: None, accept_unknown=False):
help="Use JWT with a temporary issuer as authentication method.", help="Use JWT with a temporary issuer as authentication method.",
action="store_true", action="store_true",
) )
parser.add_argument(
"--repetitions",
help="Number of requests to send",
type=int,
default=100,
)
parser.add_argument(
"--write-tx-times",
help="Unused, swallowed for compatibility with old args",
action="store_true",
)
parser.add_argument("--config", help="Path to config for client binary", default="") parser.add_argument("--config", help="Path to config for client binary", default="")
return infra.e2e_args.cli_args( return infra.e2e_args.cli_args(
@ -312,11 +336,7 @@ if __name__ == "__main__":
unknown_args = [term for arg in unknown_args for term in arg.split(" ")] unknown_args = [term for arg in unknown_args for term in arg.split(" ")]
write_tx_index = unknown_args.index("--write-tx-times")
def get_command(*args): def get_command(*args):
return ( return [*args] + unknown_args
[*args] + unknown_args[:write_tx_index] + unknown_args[write_tx_index + 1 :]
)
run(get_command, args) run(get_command, args)

Просмотреть файл

@ -316,7 +316,7 @@ class Analyze:
def get_df_from_parquet_file(input_file: str): def get_df_from_parquet_file(input_file: str):
return pd.read_parquet(input_file, engine="fastparquet") return pd.read_parquet(input_file)
def default_analysis(send_file, response_file): def default_analysis(send_file, response_file):

Просмотреть файл

@ -74,7 +74,6 @@ def main():
msg = Messages() msg = Messages()
msg.append( msg.append(
args.host,
args.path, args.path,
args.verb, args.verb,
args.request_type, args.request_type,

Просмотреть файл

@ -12,59 +12,46 @@ import fastparquet as fp # type: ignore
class Messages: class Messages:
def __init__(self): def __init__(self):
self.df = pd.DataFrame(columns=["messageID", "request"]) self.requests = []
def append( def append(
self, self,
host,
path, path,
verb, verb,
request_type="HTTP/1.1", http_version="HTTP/1.1",
content_type="application/json", content_type="application/json",
additional_headers="", additional_headers=None,
data="", body=bytes(),
iterations=1,
): ):
""" """
Create a new df with the contents specified by the arguments, Serialise HTTP request specified by the arguments, and
append it to self.df and return the new df append it to self.requests
""" """
batch_df = pd.DataFrame(columns=["messageID", "request"])
data_headers = b"\r\n"
if len(additional_headers) > 0:
additional_headers += "\r\n"
if len(data) > 0:
if isinstance(data, str):
data = data.encode("ascii")
data_headers = (f"content-length: {len(data)}\r\n\r\n").encode(
"ascii"
) + data
df_size = len(self.df.index) headers = {}
if additional_headers is not None:
headers.update({k.lower(): v for k, v in additional_headers.items()})
for ind in range(iterations): # Insert content-length, and content-type headers, if they're not already present
batch_df.loc[ind] = [ if "content-length" not in headers:
str(ind + df_size), headers["content-length"] = str(len(body))
( if "content-type" not in headers and content_type is not None:
verb.upper() headers["content-type"] = content_type
+ " "
+ path
+ " "
+ request_type
+ "\r\n"
+ "host: "
+ host
+ "\r\n"
+ additional_headers
+ "content-type: "
+ content_type.lower()
+ "\r\n"
).encode("ascii")
+ data_headers,
]
self.df = pd.concat([self.df, batch_df]) # Convert body to bytes if we were given a string
return batch_df if type(body) == str:
body = body.encode("utf-8")
request_line = f"{verb.upper()} {path} {http_version}"
headers_string = "\r\n".join(f"{k}: {v}" for k, v in headers.items())
serialised_request = (
f"{request_line}\r\n{headers_string}\r\n\r\n".encode("ascii") + body
)
self.requests.append(
{"messageID": str(len(self.requests)), "request": serialised_request}
)
def to_parquet_file(self, path): def to_parquet_file(self, path):
fp.write(path, self.df) df = pd.DataFrame(self.requests)
fp.write(path, df, write_index=True)

Просмотреть файл

@ -1,31 +0,0 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the Apache 2.0 License.
from generator import Messages
HOST = "127.0.0.1:8000"
REQUEST_CONTENT_TYPE = "content-type: application/json"
msgs = Messages()
inputs = msgs.append(HOST, "/app/log/private/count", "GET")
for i in range(14):
msgs.append(
HOST,
"/app/log/private",
"POST",
data='{"id": ' + str(i) + ', "msg": "Logged ' + str(i) + ' to private table"}',
)
inputs = msgs.append(HOST, "/app/log/private/count", "GET")
for i in range(14):
msgs.append(HOST, "/app/log/private?id=" + str(i), "GET")
inputs = msgs.append(HOST, "/app/log/private/count", "GET")
for i in range(14):
msgs.append(HOST, "/app/log/private?id=" + str(i), "DELETE")
inputs = msgs.append(HOST, "/app/log/private/count", "GET")
msgs.to_parquet_file("new_raw.parquet")

Просмотреть файл

@ -0,0 +1,30 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the Apache 2.0 License.
from generator import Messages
common_headers = {"host": "127.0.0.1:8000"}
msgs = Messages()
msgs.append("/app/log/private/count", "GET")
msg_count = 14
for i in range(msg_count):
msgs.append(
"/app/log/private",
"POST",
additional_headers=common_headers,
body=f'{{"id": {i}, "msg": "Logged {i} to private table"}}',
)
msgs.append("/app/log/private/count", "GET", additional_headers=common_headers)
for i in range(msg_count):
msgs.append(f"/app/log/private?id={i}", "GET", additional_headers=common_headers)
msgs.append("/app/log/private/count", "GET", additional_headers=common_headers)
for i in range(msg_count):
msgs.append(f"/app/log/private?id={i}", "DELETE", additional_headers=common_headers)
msgs.append("/app/log/private/count", "GET", additional_headers=common_headers)
msgs.to_parquet_file("new_raw.parquet")

Просмотреть файл

@ -75,7 +75,7 @@ public:
->required(); ->required();
app app
.add_option( .add_option(
"-m,--max-inflight-requests", "-m,--max-writes-ahead",
max_inflight_requests, max_inflight_requests,
"Specifies the number of outstanding requests sent to the server while " "Specifies the number of outstanding requests sent to the server while "
"waiting for response. When this options is set to 0 there will be no " "waiting for response. When this options is set to 0 there will be no "

Просмотреть файл

@ -12,9 +12,9 @@
#include <CLI11/CLI11.hpp> #include <CLI11/CLI11.hpp>
#include <arrow/array/array_binary.h> #include <arrow/array/array_binary.h>
#include <arrow/builder.h>
#include <arrow/filesystem/localfs.h> #include <arrow/filesystem/localfs.h>
#include <arrow/io/file.h> #include <arrow/io/file.h>
#include <arrow/builder.h>
#include <arrow/table.h> #include <arrow/table.h>
#include <parquet/arrow/reader.h> #include <parquet/arrow/reader.h>
#include <parquet/arrow/writer.h> #include <parquet/arrow/writer.h>
@ -40,8 +40,11 @@ void read_parquet_file(string generator_filepath, ParquetData& data_handler)
st = parquet::arrow::OpenFile(input, pool, &arrow_reader); st = parquet::arrow::OpenFile(input, pool, &arrow_reader);
if (!st.ok()) if (!st.ok())
{ {
LOG_FAIL_FMT("Couldn't find generator file"); LOG_FAIL_FMT(
exit(2); "Couldn't find generator file ({}): {}",
generator_filepath,
st.ToString());
exit(1);
} }
else else
{ {
@ -49,35 +52,90 @@ void read_parquet_file(string generator_filepath, ParquetData& data_handler)
} }
// Read entire file as a single Arrow table // Read entire file as a single Arrow table
auto selected_columns = {0, 1}; std::shared_ptr<arrow::Table> table = nullptr;
std::shared_ptr<arrow::Table> table; st = arrow_reader->ReadTable(&table);
st = arrow_reader->ReadTable(selected_columns, &table); if (!st.ok() || table == nullptr)
if (!st.ok())
{ {
LOG_FAIL_FMT("Couldn't open generator file"); LOG_FAIL_FMT(
exit(2); "Couldn't open generator file ({}): {}",
generator_filepath,
st.ToString());
exit(1);
} }
else else
{ {
LOG_INFO_FMT("Opened generator file"); LOG_INFO_FMT("Opened generator file");
} }
std::shared_ptr<::arrow::ChunkedArray> column; const auto& schema = table->schema();
::arrow::Status column1Status = arrow_reader->ReadColumn(1, &column); std::vector<std::string> column_names = {"messageID", "request"};
std::shared_ptr<arrow::StringArray> col1Vals =
std::dynamic_pointer_cast<arrow::StringArray>(column->chunk(
0)); // ASSIGN there is only one chunk with col->num_chunks();
::arrow::Status column2Status = arrow_reader->ReadColumn(2, &column); st = schema->CanReferenceFieldsByNames(column_names);
std::shared_ptr<arrow::BinaryArray> col2Vals = if (!st.ok())
std::dynamic_pointer_cast<arrow::BinaryArray>(column->chunk(
0)); // ASSIGN there is only one chunk with col->num_chunks();
for (int row = 0; row < col1Vals->length(); row++)
{ {
data_handler.ids.push_back(col1Vals->GetString(row)); LOG_FAIL_FMT(
data_handler.request.push_back({col2Vals->Value(row).begin(), "Input file does not contain unambiguous field names - cannot lookup "
col2Vals->Value(row).end()}); "desired columns: {}",
st.ToString());
exit(1);
}
const auto message_id_idx = schema->GetFieldIndex("messageID");
if (message_id_idx == -1)
{
LOG_FAIL_FMT("No messageID field found in file");
exit(1);
}
std::shared_ptr<::arrow::ChunkedArray> message_id_column =
table->column(message_id_idx);
if (message_id_column->num_chunks() != 1)
{
LOG_FAIL_FMT(
"Expected a single chunk, found {}", message_id_column->num_chunks());
exit(1);
}
auto message_id_values =
std::dynamic_pointer_cast<arrow::StringArray>(message_id_column->chunk(0));
if (message_id_values == nullptr)
{
LOG_FAIL_FMT(
"The messageID column of input file could not be read as string array");
exit(1);
}
const auto request_idx = schema->GetFieldIndex("request");
if (request_idx == -1)
{
LOG_FAIL_FMT("No request field found in file");
exit(1);
}
std::shared_ptr<::arrow::ChunkedArray> request_column =
table->column(request_idx);
if (request_column->num_chunks() != 1)
{
LOG_FAIL_FMT(
"Expected a single chunk, found {}", request_column->num_chunks());
exit(1);
}
auto request_values =
std::dynamic_pointer_cast<arrow::BinaryArray>(request_column->chunk(0));
if (request_values == nullptr)
{
LOG_FAIL_FMT(
"The request column of input file could not be read as binary array");
exit(1);
}
for (int row = 0; row < table->num_rows(); row++)
{
data_handler.ids.push_back(message_id_values->GetString(row));
const auto request = request_values->Value(row);
data_handler.request.push_back({request.begin(), request.end()});
} }
} }
@ -127,17 +185,21 @@ void store_parquet_results(ArgumentParser args, ParquetData data_handler)
PARQUET_THROW_NOT_OK(message_id_builder.AppendValues(data_handler.ids)); PARQUET_THROW_NOT_OK(message_id_builder.AppendValues(data_handler.ids));
arrow::NumericBuilder<arrow::DoubleType> send_time_builder; arrow::NumericBuilder<arrow::DoubleType> send_time_builder;
PARQUET_THROW_NOT_OK(send_time_builder.AppendValues(data_handler.send_time)); PARQUET_THROW_NOT_OK(
send_time_builder.AppendValues(data_handler.send_time));
auto table = arrow::Table::Make( auto table = arrow::Table::Make(
arrow::schema({arrow::field("messageID", arrow::utf8()), arrow::schema(
{arrow::field("messageID", arrow::utf8()),
arrow::field("sendTime", arrow::float64())}), arrow::field("sendTime", arrow::float64())}),
{message_id_builder.Finish().ValueOrDie(), send_time_builder.Finish().ValueOrDie()}); {message_id_builder.Finish().ValueOrDie(),
send_time_builder.Finish().ValueOrDie()});
std::shared_ptr<arrow::io::FileOutputStream> outfile; std::shared_ptr<arrow::io::FileOutputStream> outfile;
PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(args.send_filepath)); PARQUET_ASSIGN_OR_THROW(
PARQUET_THROW_NOT_OK( outfile, arrow::io::FileOutputStream::Open(args.send_filepath));
parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), outfile, 1)); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(
*table, arrow::default_memory_pool(), outfile, 1));
} }
// Write Response Parquet // Write Response Parquet
@ -146,7 +208,8 @@ void store_parquet_results(ArgumentParser args, ParquetData data_handler)
PARQUET_THROW_NOT_OK(message_id_builder.AppendValues(data_handler.ids)); PARQUET_THROW_NOT_OK(message_id_builder.AppendValues(data_handler.ids));
arrow::NumericBuilder<arrow::DoubleType> receive_time_builder; arrow::NumericBuilder<arrow::DoubleType> receive_time_builder;
PARQUET_THROW_NOT_OK(receive_time_builder.AppendValues(data_handler.response_time)); PARQUET_THROW_NOT_OK(
receive_time_builder.AppendValues(data_handler.response_time));
arrow::BinaryBuilder raw_response_builder; arrow::BinaryBuilder raw_response_builder;
for (auto& raw_response : data_handler.raw_response) for (auto& raw_response : data_handler.raw_response)
@ -156,17 +219,20 @@ void store_parquet_results(ArgumentParser args, ParquetData data_handler)
} }
auto table = arrow::Table::Make( auto table = arrow::Table::Make(
arrow::schema({arrow::field("messageID", arrow::utf8()), arrow::schema({
arrow::field("messageID", arrow::utf8()),
arrow::field("receiveTime", arrow::float64()), arrow::field("receiveTime", arrow::float64()),
arrow::field("rawResponse", arrow::binary()), arrow::field("rawResponse", arrow::binary()),
}), }),
{message_id_builder.Finish().ValueOrDie(), receive_time_builder.Finish().ValueOrDie(), {message_id_builder.Finish().ValueOrDie(),
receive_time_builder.Finish().ValueOrDie(),
raw_response_builder.Finish().ValueOrDie()}); raw_response_builder.Finish().ValueOrDie()});
std::shared_ptr<arrow::io::FileOutputStream> outfile; std::shared_ptr<arrow::io::FileOutputStream> outfile;
PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(args.response_filepath)); PARQUET_ASSIGN_OR_THROW(
PARQUET_THROW_NOT_OK( outfile, arrow::io::FileOutputStream::Open(args.response_filepath));
parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), outfile, 1)); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(
*table, arrow::default_memory_pool(), outfile, 1));
} }
LOG_INFO_FMT("Finished storing results"); LOG_INFO_FMT("Finished storing results");