Perf tool: support binary data (#4771)

This commit is contained in:
Maik Riechert 2022-12-19 13:53:12 +00:00 коммит произвёл GitHub
Родитель f10e40e96c
Коммит 9d5a42ef02
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
5 изменённых файлов: 99 добавлений и 113 удалений

Просмотреть файл

@ -30,7 +30,7 @@ class Analyze:
self.ms_latency_list = []
def get_req_type(self, df_responses: pd.DataFrame) -> str:
    """
    Return the first token of the first raw response (e.g. the HTTP
    version / verb field), decoded to a str.

    :param df_responses: DataFrame with a "rawResponse" column holding
        raw HTTP responses as bytes.
    :return: first space-separated token of row 0, ASCII-decoded.
    """
    # rawResponse is stored as bytes since the tool supports binary data,
    # so split on a bytes separator and decode only the token we return.
    # (The stale str-based variant of this line was removed.)
    return df_responses.iloc[0]["rawResponse"].split(b" ")[0].decode("ascii")
def get_latency_at_i(
self, df_sends: pd.DataFrame, df_responses: pd.DataFrame, req_id: int
@ -41,10 +41,10 @@ class Analyze:
)
def check_success(self, df_responses: pd.DataFrame, req_id: int) -> int:
    """
    Check whether the response at ``req_id`` carries a 2xx status code.

    :param df_responses: DataFrame with a "rawResponse" column holding
        raw HTTP responses as bytes.
    :param req_id: row index of the response to inspect.
    :return: 1 if the status line has a 2xx code, otherwise 0.
    """
    # Responses are bytes, so split on bytes separators and match with a
    # bytes regex. (The stale str-based lines from before the binary-data
    # change were removed — they shadowed/duplicated these.)
    req_resp = df_responses.iloc[req_id]["rawResponse"].split(b"\n")
    status_list = req_resp[0].split(b" ")
    # If we got a full status line and it says 2xx, count it as a success.
    if len(status_list) > 1 and re.search(b"^2[0-9][0-9]$", status_list[1]):
        return 1
    return 0
@ -142,7 +142,7 @@ class Analyze:
# Get the first write Tx in parser and then the id
raw_0 = Parser().parsestr(
df_responses.iloc[0]["rawResponse"].split("\r\n", 1)[1]
df_responses.iloc[0]["rawResponse"].split(b"\r\n", 1)[1].decode("ascii")
)
init_tx_id = int(raw_0[custom_tx_header].split(".")[1]) - 1
init_time = float(df_responses.iloc[0]["receiveTime"])
@ -151,28 +151,30 @@ class Analyze:
committed_ids = []
time_units = []
for row in range(len(df_generator.index)):
if df_generator.iloc[row]["request"].split(" ")[
if df_generator.iloc[row]["request"].split(b" ")[
0
] == "GET" and df_generator.iloc[row]["request"].split(" ")[1].endswith(
"commit"
] == b"GET" and df_generator.iloc[row]["request"].split(b" ")[1].endswith(
b"commit"
):
# Break when the first of the aggressive
# commits (consecutive GETs) reached the posts
if (
len(tx_ids) > 1
and tx_ids[-1] == committed_ids[-1] - 1
and df_generator.iloc[row - 1]["request"].split(" ")[0] == "GET"
and df_generator.iloc[row - 1]["request"].split(b" ")[0] == b"GET"
):
break
commit_tx = df_responses.iloc[row]["rawResponse"].split("\r\n\r\n")[-1]
commit_tx = df_responses.iloc[row]["rawResponse"].split(b"\r\n\r\n")[-1]
headers_alone = df_responses.iloc[row - 1]["rawResponse"].split(
"\r\n", 1
)[1]
headers_alone = (
df_responses.iloc[row - 1]["rawResponse"]
.split(b"\r\n", 1)[1]
.decode("ascii")
)
raw = Parser().parsestr(headers_alone)
if df_generator.iloc[row - 1]["request"].split(" ")[0] != "GET":
if df_generator.iloc[row - 1]["request"].split(b" ")[0] != b"GET":
tx_ids.append(int(raw[custom_tx_header].split(".")[1]) - init_tx_id)
else:
tx_ids.append(tx_ids[-1])

Просмотреть файл

@ -30,30 +30,36 @@ class Messages:
append it to self.df and return the new df
"""
batch_df = pd.DataFrame(columns=["messageID", "request"])
data_headers = "\r\n"
data_headers = b"\r\n"
if len(additional_headers) > 0:
additional_headers += "\r\n"
if len(data) > 0:
data_headers = "content-length: " + str(len(data)) + "\r\n\r\n" + data
if isinstance(data, str):
data = data.encode("ascii")
data_headers = (f"content-length: {len(data)}\r\n\r\n").encode(
"ascii"
) + data
df_size = len(self.df.index)
for ind in range(iterations):
batch_df.loc[ind] = [
str(ind + df_size),
verb.upper()
+ " "
+ path
+ " "
+ request_type
+ "\r\n"
+ "host: "
+ host
+ "\r\n"
+ additional_headers
+ "content-type: "
+ content_type.lower()
+ "\r\n"
(
verb.upper()
+ " "
+ path
+ " "
+ request_type
+ "\r\n"
+ "host: "
+ host
+ "\r\n"
+ additional_headers
+ "content-type: "
+ content_type.lower()
+ "\r\n"
).encode("ascii")
+ data_headers,
]

Просмотреть файл

@ -32,21 +32,21 @@ public:
cert,
"Use the provided certificate file when working with a SSL-based "
"protocol.")
->required(false)
->required(true)
->check(CLI::ExistingFile);
app
.add_option(
"-k,--key",
key,
"Specify the path to the file containing the private key.")
->required(false)
->required(true)
->check(CLI::ExistingFile);
app
.add_option(
"--cacert",
rootCa,
"Use the specified file for certificate verification.")
->required(false)
->required(true)
->check(CLI::ExistingFile);
app
.add_option(

Просмотреть файл

@ -3,14 +3,17 @@
#pragma once
#include <string>
#include <vector>
// Plain data holder for one perf run: per-request IDs, raw request and
// response payloads, and send/receive timestamps. Requests and responses
// are kept as raw byte vectors so binary (non-UTF8) payloads round-trip
// through the parquet files unchanged.
//
// NOTE: the original span declared `request`/`raw_response` twice — once as
// std::vector<std::string> and once as std::vector<std::vector<uint8_t>>
// (pre- and post-change diff lines interleaved), which does not compile.
// Only the binary (uint8_t) declarations are kept here.
class ParquetData
{
public:
  ParquetData() {}

  // One entry per request, all indexed in lockstep.
  std::vector<std::string> ids; // message IDs (stringified indices)
  std::vector<std::vector<uint8_t>> request; // raw request bytes
  std::vector<std::vector<uint8_t>> raw_response; // raw response bytes
  std::vector<double> send_time; // seconds (epoch) when sent
  std::vector<double> response_time; // seconds (epoch) when received
};

Просмотреть файл

@ -14,8 +14,10 @@
#include <arrow/array/array_binary.h>
#include <arrow/filesystem/localfs.h>
#include <arrow/io/file.h>
#include <arrow/builder.h>
#include <arrow/table.h>
#include <parquet/arrow/reader.h>
#include <parquet/stream_writer.h>
#include <parquet/arrow/writer.h>
#include <sys/time.h>
using namespace std;
@ -68,49 +70,17 @@ void read_parquet_file(string generator_filepath, ParquetData& data_handler)
0)); // ASSIGN there is only one chunk with col->num_chunks();
::arrow::Status column2Status = arrow_reader->ReadColumn(2, &column);
std::shared_ptr<arrow::StringArray> col2Vals =
std::dynamic_pointer_cast<arrow::StringArray>(column->chunk(
std::shared_ptr<arrow::BinaryArray> col2Vals =
std::dynamic_pointer_cast<arrow::BinaryArray>(column->chunk(
0)); // ASSIGN there is only one chunk with col->num_chunks();
for (int row = 0; row < col1Vals->length(); row++)
{
data_handler.ids.push_back(col1Vals->GetString(row));
data_handler.request.push_back(col2Vals->GetString(row));
data_handler.request.push_back({col2Vals->Value(row).begin(),
col2Vals->Value(row).end()});
}
}
// Open `filepath` for writing and return a parquet::StreamWriter whose
// schema is a REQUIRED group built from `columns`; each tuple supplies
// (column name, physical type, converted/logical type).
// `data_handler` is part of the signature but is not read here.
parquet::StreamWriter init_parquet_columns(
  std::string filepath,
  ParquetData& data_handler,
  std::vector<
    std::tuple<std::string, parquet::Type::type, parquet::ConvertedType::type>>
    columns)
{
  // Open the destination file; PARQUET_ASSIGN_OR_THROW raises on failure.
  std::shared_ptr<arrow::io::FileOutputStream> sink;
  PARQUET_ASSIGN_OR_THROW(sink, arrow::io::FileOutputStream::Open(filepath));

  // Translate each column descriptor into a required primitive schema node.
  parquet::schema::NodeVector schema_fields;
  for (const auto& [name, physical_type, converted_type] : columns)
  {
    schema_fields.push_back(parquet::schema::PrimitiveNode::Make(
      name, parquet::Repetition::REQUIRED, physical_type, converted_type));
  }

  auto root = std::static_pointer_cast<parquet::schema::GroupNode>(
    parquet::schema::GroupNode::Make(
      "schema", parquet::Repetition::REQUIRED, schema_fields));

  // Default writer properties; the StreamWriter takes ownership of the
  // underlying file writer.
  parquet::WriterProperties::Builder props;
  return parquet::StreamWriter{
    parquet::ParquetFileWriter::Open(sink, root, props.build())};
}
std::shared_ptr<RpcTlsClient> create_connection(
std::vector<string> certificates, std::string server_address)
{
@ -151,42 +121,52 @@ void store_parquet_results(ArgumentParser args, ParquetData data_handler)
{
LOG_INFO_FMT("Start storing results");
// Initialize Send Columns
std::vector<
std::tuple<std::string, parquet::Type::type, parquet::ConvertedType::type>>
send_cols{
std::make_tuple(
"messageID", parquet::Type::BYTE_ARRAY, parquet::ConvertedType::UTF8),
std::make_tuple(
"sendTime", parquet::Type::DOUBLE, parquet::ConvertedType::NONE)};
// Initialize Response Columns
std::vector<
std::tuple<std::string, parquet::Type::type, parquet::ConvertedType::type>>
response_cols{
std::make_tuple(
"messageID", parquet::Type::BYTE_ARRAY, parquet::ConvertedType::UTF8),
std::make_tuple(
"receiveTime", parquet::Type::DOUBLE, parquet::ConvertedType::NONE),
std::make_tuple(
"rawResponse",
parquet::Type::BYTE_ARRAY,
parquet::ConvertedType::UTF8)};
// Write Send Parquet
auto os = init_parquet_columns(args.send_filepath, data_handler, send_cols);
for (size_t i = 0; i < data_handler.send_time.size(); i++)
{
os << to_string(i) << data_handler.send_time[i] << parquet::EndRow;
arrow::StringBuilder message_id_builder;
PARQUET_THROW_NOT_OK(message_id_builder.AppendValues(data_handler.ids));
arrow::NumericBuilder<arrow::DoubleType> send_time_builder;
PARQUET_THROW_NOT_OK(send_time_builder.AppendValues(data_handler.send_time));
auto table = arrow::Table::Make(
arrow::schema({arrow::field("messageID", arrow::utf8()),
arrow::field("sendTime", arrow::float64())}),
{message_id_builder.Finish().ValueOrDie(), send_time_builder.Finish().ValueOrDie()});
std::shared_ptr<arrow::io::FileOutputStream> outfile;
PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(args.send_filepath));
PARQUET_THROW_NOT_OK(
parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), outfile, 1));
}
// Write Response Parquet
os =
init_parquet_columns(args.response_filepath, data_handler, response_cols);
for (size_t i = 0; i < data_handler.response_time.size(); i++)
{
os << to_string(i) << data_handler.response_time[i]
<< data_handler.raw_response[i] << parquet::EndRow;
arrow::StringBuilder message_id_builder;
PARQUET_THROW_NOT_OK(message_id_builder.AppendValues(data_handler.ids));
arrow::NumericBuilder<arrow::DoubleType> receive_time_builder;
PARQUET_THROW_NOT_OK(receive_time_builder.AppendValues(data_handler.response_time));
arrow::BinaryBuilder raw_response_builder;
for (auto& raw_response : data_handler.raw_response)
{
PARQUET_THROW_NOT_OK(
raw_response_builder.Append(raw_response.data(), raw_response.size()));
}
auto table = arrow::Table::Make(
arrow::schema({arrow::field("messageID", arrow::utf8()),
arrow::field("receiveTime", arrow::float64()),
arrow::field("rawResponse", arrow::binary()),
}),
{message_id_builder.Finish().ValueOrDie(), receive_time_builder.Finish().ValueOrDie(),
raw_response_builder.Finish().ValueOrDie()});
std::shared_ptr<arrow::io::FileOutputStream> outfile;
PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(args.response_filepath));
PARQUET_THROW_NOT_OK(
parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), outfile, 1));
}
LOG_INFO_FMT("Finished storing results");
@ -217,16 +197,9 @@ int main(int argc, char** argv)
std::vector<timeval> start(requests_size);
std::vector<timeval> end(requests_size);
std::vector<std::vector<uint8_t>> raw_reqs(requests_size);
// Store responses until they are processed to be written in parquet
std::vector<std::vector<uint8_t>> resp_text(data_handler.ids.size());
// Add raw requests straight as uint8_t inside a vector
for (size_t req = 0; req < requests_size; req++)
{
raw_reqs[req] = std::vector<uint8_t>(
data_handler.request[req].begin(), data_handler.request[req].end());
}
LOG_INFO_FMT("Start Request Submission");
@ -237,7 +210,8 @@ int main(int argc, char** argv)
for (size_t req = 0; req < requests_size; req++)
{
gettimeofday(&start[req], NULL);
connection->write(raw_reqs[req]);
auto request = data_handler.request[req];
connection->write({request.data(), request.size()});
resp_text[req] = connection->read_raw_response();
gettimeofday(&end[req], NULL);
}
@ -254,7 +228,8 @@ int main(int argc, char** argv)
for (size_t req = 0; req < requests_size; req++)
{
gettimeofday(&start[req], NULL);
connection->write(raw_reqs[req]);
auto request = data_handler.request[req];
connection->write({request.data(), request.size()});
if (connection->bytes_available())
{
resp_text[read_reqs] = connection->read_raw_response();
@ -269,7 +244,8 @@ int main(int argc, char** argv)
for (size_t req = 0; req < requests_size; req++)
{
gettimeofday(&start[req], NULL);
connection->write(raw_reqs[req]);
auto request = data_handler.request[req];
connection->write({request.data(), request.size()});
if (
connection->bytes_available() or
req - read_reqs >= args.max_inflight_requests)
@ -294,8 +270,7 @@ int main(int argc, char** argv)
for (size_t req = 0; req < requests_size; req++)
{
data_handler.raw_response.push_back(
std::string(reinterpret_cast<char*>(resp_text[req].data())));
data_handler.raw_response.push_back(resp_text[req]);
double send_time = start[req].tv_sec + start[req].tv_usec / 1000000.0;
double response_time = end[req].tv_sec + end[req].tv_usec / 1000000.0;
data_handler.send_time.push_back(send_time);