This commit is contained in:
Michael Spector 2024-06-25 17:22:44 +03:00
Parent 316f4eddfe
Commit ca4eba989f
16 changed files with 2741 additions and 0 deletions

View file

@@ -0,0 +1,351 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::{sync::Arc, time::Duration};
use anyhow::{bail, Context, Result};
use arrow_array::{ArrayRef, Datum, RecordBatch, StringArray};
use arrow_cast::{cast_with_options, pretty::pretty_format_batches, CastOptions};
use arrow_flight::{sql::client::FlightSqlServiceClient, FlightInfo};
use arrow_schema::Schema;
use clap::{Parser, Subcommand};
use futures::TryStreamExt;
use tonic::{
metadata::MetadataMap,
transport::{Channel, ClientTlsConfig, Endpoint},
};
use tracing_log::log::info;
/// Logging CLI config.
#[derive(Debug, Parser)]
pub struct LoggingArgs {
/// Log verbosity.
///
/// Defaults to "warn".
///
/// Use `-v` for "info", `-vv` for "debug", `-vvv` for "trace".
///
/// Note that you can also set the logging level using the `RUST_LOG` environment
/// variable, e.g. `RUST_LOG=debug`.
#[clap(
short = 'v',
long = "verbose",
action = clap::ArgAction::Count,
)]
log_verbose_count: u8,
}
#[derive(Debug, Parser)]
struct ClientArgs {
/// Additional headers.
///
/// Can be given multiple times. Headers and values are separated by '='.
///
/// Example: `-H foo=bar -H baz=42`
#[clap(long = "header", short = 'H', value_parser = parse_key_val)]
headers: Vec<(String, String)>,
/// Username.
///
/// Optional. If given, `password` must also be set.
#[clap(long, requires = "password")]
username: Option<String>,
/// Password.
///
/// Optional. If given, `username` must also be set.
#[clap(long, requires = "username")]
password: Option<String>,
/// Auth token.
#[clap(long)]
token: Option<String>,
/// Use TLS.
///
/// If not provided, use cleartext connection.
#[clap(long)]
tls: bool,
/// Server host.
///
/// Required.
#[clap(long)]
host: String,
/// Server port.
///
/// Defaults to `443` if `tls` is set, otherwise defaults to `80`.
#[clap(long)]
port: Option<u16>,
}
#[derive(Debug, Parser)]
struct Args {
/// Logging args.
#[clap(flatten)]
logging_args: LoggingArgs,
/// Client args.
#[clap(flatten)]
client_args: ClientArgs,
#[clap(subcommand)]
cmd: Command,
}
/// Different available commands.
#[derive(Debug, Subcommand)]
enum Command {
/// Execute given statement.
StatementQuery {
/// SQL query.
///
/// Required.
query: String,
},
/// Prepare given statement and then execute it.
PreparedStatementQuery {
/// SQL query.
///
/// Required.
///
/// Can contain placeholders like `$1`.
///
/// Example: `SELECT * FROM t WHERE x = $1`
query: String,
/// Additional parameters.
///
/// Can be given multiple times. Names and values are separated by '='. Values will be
/// converted to the type that the server reported for the prepared statement.
///
/// Example: `-p $1=42`
#[clap(short, value_parser = parse_key_val)]
params: Vec<(String, String)>,
},
}
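// Illustrative invocations (the binary name `flight_sql_client` and the endpoint
// values are assumptions, not taken from this file):
//
//   flight_sql_client --host localhost --port 50051 statement-query "SELECT 1"
//   flight_sql_client --host localhost --port 50051 prepared-statement-query \
//       "SELECT * FROM t WHERE x = $1" -p '$1=42'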
#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();
setup_logging(args.logging_args)?;
let mut client = setup_client(args.client_args)
.await
.context("setup client")?;
let flight_info = match args.cmd {
Command::StatementQuery { query } => client
.execute(query, None)
.await
.context("execute statement")?,
Command::PreparedStatementQuery { query, params } => {
let mut prepared_stmt = client
.prepare(query, None)
.await
.context("prepare statement")?;
if !params.is_empty() {
prepared_stmt
.set_parameters(
construct_record_batch_from_params(
&params,
prepared_stmt
.parameter_schema()
.context("get parameter schema")?,
)
.context("construct parameters")?,
)
.context("bind parameters")?;
}
prepared_stmt
.execute()
.await
.context("execute prepared statement")?
}
};
let batches = execute_flight(&mut client, flight_info)
.await
.context("read flight data")?;
let res = pretty_format_batches(batches.as_slice()).context("format results")?;
println!("{res}");
Ok(())
}
async fn execute_flight(
client: &mut FlightSqlServiceClient<Channel>,
info: FlightInfo,
) -> Result<Vec<RecordBatch>> {
let schema = Arc::new(Schema::try_from(info.clone()).context("valid schema")?);
let mut batches = Vec::with_capacity(info.endpoint.len() + 1);
batches.push(RecordBatch::new_empty(schema));
info!("decoded schema");
for endpoint in info.endpoint {
let Some(ticket) = &endpoint.ticket else {
bail!("did not get ticket");
};
let mut flight_data = client.do_get(ticket.clone()).await.context("do get")?;
log_metadata(flight_data.headers(), "header");
let mut endpoint_batches: Vec<_> = (&mut flight_data)
.try_collect()
.await
.context("collect data stream")?;
batches.append(&mut endpoint_batches);
if let Some(trailers) = flight_data.trailers() {
log_metadata(&trailers, "trailer");
}
}
info!("received data");
Ok(batches)
}
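/// Builds a single-row `RecordBatch` from `name=value` pairs, casting each string
/// value to the type reported in the prepared statement's parameter schema.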
fn construct_record_batch_from_params(
params: &[(String, String)],
parameter_schema: &Schema,
) -> Result<RecordBatch> {
let mut items = Vec::<(&String, ArrayRef)>::new();
for (name, value) in params {
let field = parameter_schema.field_with_name(name)?;
let value_as_array = StringArray::new_scalar(value);
let casted = cast_with_options(
value_as_array.get().0,
field.data_type(),
&CastOptions::default(),
)?;
items.push((name, casted))
}
Ok(RecordBatch::try_from_iter(items)?)
}
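/// Initializes `tracing` logging, mapping the `-v` count to a level filter
/// (warn/info/debug/trace).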
fn setup_logging(args: LoggingArgs) -> Result<()> {
use tracing_subscriber::{util::SubscriberInitExt, EnvFilter, FmtSubscriber};
tracing_log::LogTracer::init().context("tracing log init")?;
let filter = match args.log_verbose_count {
0 => "warn",
1 => "info",
2 => "debug",
_ => "trace",
};
let filter = EnvFilter::try_new(filter).context("set up log env filter")?;
let subscriber = FmtSubscriber::builder().with_env_filter(filter).finish();
subscriber.try_init().context("init logging subscriber")?;
Ok(())
}
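/// Creates a `FlightSqlServiceClient` from the CLI arguments: picks the port and
/// scheme based on `--tls`, applies any extra headers, and either sets a bearer
/// token or performs a username/password handshake when credentials are given.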
async fn setup_client(args: ClientArgs) -> Result<FlightSqlServiceClient<Channel>> {
let port = args.port.unwrap_or(if args.tls { 443 } else { 80 });
let protocol = if args.tls { "https" } else { "http" };
let mut endpoint = Endpoint::new(format!("{}://{}:{}", protocol, args.host, port))
.context("create endpoint")?
.connect_timeout(Duration::from_secs(20))
.timeout(Duration::from_secs(20))
.tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait
.tcp_keepalive(Option::Some(Duration::from_secs(3600)))
.http2_keep_alive_interval(Duration::from_secs(300))
.keep_alive_timeout(Duration::from_secs(20))
.keep_alive_while_idle(true);
if args.tls {
let tls_config = ClientTlsConfig::new();
endpoint = endpoint
.tls_config(tls_config)
.context("create TLS endpoint")?;
}
let channel = endpoint.connect().await.context("connect to endpoint")?;
let mut client = FlightSqlServiceClient::new(channel);
info!("connected");
for (k, v) in args.headers {
client.set_header(k, v);
}
if let Some(token) = args.token {
client.set_token(token);
info!("token set");
}
match (args.username, args.password) {
(None, None) => {}
(Some(username), Some(password)) => {
client
.handshake(&username, &password)
.await
.context("handshake")?;
info!("performed handshake");
}
(Some(_), None) => {
bail!("when username is set, you also need to set a password")
}
(None, Some(_)) => {
bail!("when password is set, you also need to set a username")
}
}
Ok(client)
}
/// Parse a single key-value pair
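///
/// Splits on the first `=` only, e.g. `foo=bar=baz` becomes `("foo", "bar=baz")`.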
fn parse_key_val(s: &str) -> Result<(String, String), String> {
let pos = s
.find('=')
.ok_or_else(|| format!("invalid KEY=value: no `=` found in `{s}`"))?;
Ok((s[..pos].to_owned(), s[pos + 1..].to_owned()))
}
/// Log headers/trailers.
fn log_metadata(map: &MetadataMap, what: &'static str) {
for k_v in map.iter() {
match k_v {
tonic::metadata::KeyAndValueRef::Ascii(k, v) => {
info!(
"{}: {}={}",
what,
k.as_str(),
v.to_str().unwrap_or("<invalid>"),
);
}
tonic::metadata::KeyAndValueRef::Binary(k, v) => {
info!(
"{}: {}={}",
what,
k.as_str(),
String::from_utf8_lossy(v.as_ref()),
);
}
}
}
}

View file

@@ -0,0 +1,47 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use arrow::error::Result;
use arrow::ipc::reader::FileReader;
use arrow::ipc::writer::StreamWriter;
use clap::Parser;
use std::fs::File;
use std::io::{self, BufReader};
#[derive(Debug, Parser)]
#[clap(author, version, about("Read an arrow file and stream to stdout"), long_about = None)]
struct Args {
file_name: String,
}
fn main() -> Result<()> {
let args = Args::parse();
let f = File::open(args.file_name)?;
let reader = BufReader::new(f);
let mut reader = FileReader::try_new(reader, None)?;
let schema = reader.schema();
let mut writer = StreamWriter::try_new(io::stdout(), &schema)?;
reader.try_for_each(|batch| {
let batch = batch?;
writer.write(&batch)
})?;
writer.finish()?;
Ok(())
}

View file

@@ -0,0 +1,183 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use arrow::error::{ArrowError, Result};
use arrow::ipc::reader::FileReader;
use arrow::ipc::writer::FileWriter;
use arrow_integration_test::*;
use arrow_integration_testing::{canonicalize_schema, open_json_file};
use clap::Parser;
use std::fs::File;
#[derive(clap::ValueEnum, Debug, Clone)]
#[clap(rename_all = "SCREAMING_SNAKE_CASE")]
enum Mode {
ArrowToJson,
JsonToArrow,
Validate,
}
#[derive(Debug, Parser)]
#[clap(author, version, about("rust arrow-json-integration-test"), long_about = None)]
struct Args {
#[clap(short, long)]
integration: bool,
#[clap(short, long, help("Path to ARROW file"))]
arrow: String,
#[clap(short, long, help("Path to JSON file"))]
json: String,
#[clap(
value_enum,
short,
long,
default_value = "VALIDATE",
help = "Mode of integration testing tool"
)]
mode: Mode,
#[clap(short, long)]
verbose: bool,
}
fn main() -> Result<()> {
let args = Args::parse();
let arrow_file = args.arrow;
let json_file = args.json;
let verbose = args.verbose;
match args.mode {
Mode::JsonToArrow => json_to_arrow(&json_file, &arrow_file, verbose),
Mode::ArrowToJson => arrow_to_json(&arrow_file, &json_file, verbose),
Mode::Validate => validate(&arrow_file, &json_file, verbose),
}
}
fn json_to_arrow(json_name: &str, arrow_name: &str, verbose: bool) -> Result<()> {
if verbose {
eprintln!("Converting {json_name} to {arrow_name}");
}
let json_file = open_json_file(json_name)?;
let arrow_file = File::create(arrow_name)?;
let mut writer = FileWriter::try_new(arrow_file, &json_file.schema)?;
for b in json_file.read_batches()? {
writer.write(&b)?;
}
writer.finish()?;
Ok(())
}
fn arrow_to_json(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> {
if verbose {
eprintln!("Converting {arrow_name} to {json_name}");
}
let arrow_file = File::open(arrow_name)?;
let reader = FileReader::try_new(arrow_file, None)?;
let mut fields: Vec<ArrowJsonField> = vec![];
for f in reader.schema().fields() {
fields.push(ArrowJsonField::from(f));
}
let schema = ArrowJsonSchema {
fields,
metadata: None,
};
let batches = reader
.map(|batch| Ok(ArrowJsonBatch::from_batch(&batch?)))
.collect::<Result<Vec<_>>>()?;
let arrow_json = ArrowJson {
schema,
batches,
dictionaries: None,
};
let json_file = File::create(json_name)?;
serde_json::to_writer(&json_file, &arrow_json).unwrap();
Ok(())
}
fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> {
if verbose {
eprintln!("Validating {arrow_name} and {json_name}");
}
// open JSON file
let json_file = open_json_file(json_name)?;
// open Arrow file
let arrow_file = File::open(arrow_name)?;
let mut arrow_reader = FileReader::try_new(arrow_file, None)?;
let arrow_schema = arrow_reader.schema().as_ref().to_owned();
// compare schemas
if canonicalize_schema(&json_file.schema) != canonicalize_schema(&arrow_schema) {
return Err(ArrowError::ComputeError(format!(
"Schemas do not match. JSON: {:?}. Arrow: {:?}",
json_file.schema, arrow_schema
)));
}
let json_batches = json_file.read_batches()?;
// compare number of batches
assert!(
json_batches.len() == arrow_reader.num_batches(),
"JSON batches and Arrow batches are unequal"
);
if verbose {
eprintln!(
"Schemas match. JSON file has {} batches.",
json_batches.len()
);
}
for json_batch in json_batches {
if let Some(Ok(arrow_batch)) = arrow_reader.next() {
// compare batches
let num_columns = arrow_batch.num_columns();
assert!(num_columns == json_batch.num_columns());
assert!(arrow_batch.num_rows() == json_batch.num_rows());
for i in 0..num_columns {
assert_eq!(
arrow_batch.column(i).as_ref(),
json_batch.column(i).as_ref(),
"Arrow and JSON batch columns not the same"
);
}
} else {
return Err(ArrowError::ComputeError(
"no more arrow batches left".to_owned(),
));
}
}
if arrow_reader.next().is_some() {
return Err(ArrowError::ComputeError(
"no more json batches left".to_owned(),
));
}
Ok(())
}

View file

@@ -0,0 +1,34 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::io;
use arrow::error::Result;
use arrow::ipc::reader::StreamReader;
use arrow::ipc::writer::FileWriter;
fn main() -> Result<()> {
let mut arrow_stream_reader = StreamReader::try_new(io::stdin(), None)?;
let schema = arrow_stream_reader.schema();
let mut writer = FileWriter::try_new(io::stdout(), &schema)?;
arrow_stream_reader.try_for_each(|batch| writer.write(&batch?))?;
writer.finish()?;
Ok(())
}

View file

@@ -0,0 +1,70 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use arrow_integration_testing::flight_client_scenarios;
use clap::Parser;
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T = (), E = Error> = std::result::Result<T, E>;
#[derive(clap::ValueEnum, Debug, Clone)]
enum Scenario {
Middleware,
#[clap(name = "auth:basic_proto")]
AuthBasicProto,
}
#[derive(Debug, Parser)]
#[clap(author, version, about("rust flight-test-integration-client"), long_about = None)]
struct Args {
#[clap(long, help = "host of flight server")]
host: String,
#[clap(long, help = "port of flight server")]
port: u16,
#[clap(
short,
long,
help = "path to the descriptor file, only used when scenario is not provided. See https://arrow.apache.org/docs/format/Integration.html#json-test-data-format"
)]
path: Option<String>,
#[clap(long, value_enum)]
scenario: Option<Scenario>,
}
#[tokio::main]
async fn main() -> Result {
#[cfg(feature = "logging")]
tracing_subscriber::fmt::init();
let args = Args::parse();
let host = args.host;
let port = args.port;
match args.scenario {
Some(Scenario::Middleware) => {
flight_client_scenarios::middleware::run_scenario(&host, port).await?
}
Some(Scenario::AuthBasicProto) => {
flight_client_scenarios::auth_basic_proto::run_scenario(&host, port).await?
}
None => {
let path = args.path.expect("No path is given");
flight_client_scenarios::integration_test::run_scenario(&host, port, &path).await?;
}
}
Ok(())
}

View file

@@ -0,0 +1,60 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use arrow_integration_testing::flight_server_scenarios;
use clap::Parser;
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T = (), E = Error> = std::result::Result<T, E>;
#[derive(clap::ValueEnum, Debug, Clone)]
enum Scenario {
Middleware,
#[clap(name = "auth:basic_proto")]
AuthBasicProto,
}
#[derive(Debug, Parser)]
#[clap(author, version, about("rust flight-test-integration-server"), long_about = None)]
struct Args {
#[clap(long)]
port: u16,
#[clap(long, value_enum)]
scenario: Option<Scenario>,
}
#[tokio::main]
async fn main() -> Result {
#[cfg(feature = "logging")]
tracing_subscriber::fmt::init();
let args = Args::parse();
let port = args.port;
match args.scenario {
Some(Scenario::Middleware) => {
flight_server_scenarios::middleware::scenario_setup(port).await?
}
Some(Scenario::AuthBasicProto) => {
flight_server_scenarios::auth_basic_proto::scenario_setup(port).await?
}
None => {
flight_server_scenarios::integration_test::scenario_setup(port).await?;
}
}
Ok(())
}

View file

@@ -0,0 +1,118 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Binary that concatenates the column data of one or more parquet files
//!
//! # Install
//!
//! `parquet-concat` can be installed using `cargo`:
//! ```
//! cargo install parquet --features=cli
//! ```
//! After this `parquet-concat` should be available:
//! ```
//! parquet-concat out.parquet a.parquet b.parquet
//! ```
//!
//! The binary can also be built from the source code and run as follows:
//! ```
//! cargo run --features=cli --bin parquet-concat out.parquet a.parquet b.parquet
//! ```
//!
//! Note: this does not currently support preserving the page index or bloom filters
//!
use clap::Parser;
use parquet::column::writer::ColumnCloseResult;
use parquet::errors::{ParquetError, Result};
use parquet::file::properties::WriterProperties;
use parquet::file::writer::SerializedFileWriter;
use std::fs::File;
use std::sync::Arc;
#[derive(Debug, Parser)]
#[clap(author, version)]
/// Concatenates one or more parquet files
struct Args {
/// Path to output
output: String,
/// Path to input files
input: Vec<String>,
}
impl Args {
fn run(&self) -> Result<()> {
if self.input.is_empty() {
return Err(ParquetError::General(
"Must provide at least one input file".into(),
));
}
let output = File::create(&self.output)?;
let inputs = self
.input
.iter()
.map(|x| {
let reader = File::open(x)?;
let metadata = parquet::file::footer::parse_metadata(&reader)?;
Ok((reader, metadata))
})
.collect::<Result<Vec<_>>>()?;
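// All inputs must share an identical Parquet schema; their row groups are then
// appended column chunk by column chunk without decoding or re-encoding the data.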
let expected = inputs[0].1.file_metadata().schema();
for (_, metadata) in inputs.iter().skip(1) {
let actual = metadata.file_metadata().schema();
if expected != actual {
return Err(ParquetError::General(format!(
"inputs must have the same schema, {expected:#?} vs {actual:#?}"
)));
}
}
let props = Arc::new(WriterProperties::builder().build());
let schema = inputs[0].1.file_metadata().schema_descr().root_schema_ptr();
let mut writer = SerializedFileWriter::new(output, schema, props)?;
for (input, metadata) in inputs {
for rg in metadata.row_groups() {
let mut rg_out = writer.next_row_group()?;
for column in rg.columns() {
let result = ColumnCloseResult {
bytes_written: column.compressed_size() as _,
rows_written: rg.num_rows() as _,
metadata: column.clone(),
bloom_filter: None,
column_index: None,
offset_index: None,
};
rg_out.append_column(&input, result)?;
}
rg_out.close()?;
}
}
writer.close()?;
Ok(())
}
}
fn main() -> Result<()> {
Args::parse().run()
}

View file

@@ -0,0 +1,75 @@
Usage: parquet [OPTIONS] --schema <SCHEMA> --input-file <INPUT_FILE> --output-file <OUTPUT_FILE>
Options:
-s, --schema <SCHEMA>
message schema for output Parquet
-i, --input-file <INPUT_FILE>
input CSV file
-o, --output-file <OUTPUT_FILE>
output Parquet file
-f, --input-format <INPUT_FORMAT>
input file format
[default: csv]
[possible values: csv, tsv]
-b, --batch-size <BATCH_SIZE>
batch size
[env: PARQUET_FROM_CSV_BATCHSIZE=]
[default: 1000]
-h, --has-header
has header
-d, --delimiter <DELIMITER>
field delimiter
default value: when input_format==CSV: ',' when input_format==TSV: 'TAB'
-r, --record-terminator <RECORD_TERMINATOR>
record terminator
[possible values: lf, crlf, cr]
-e, --escape-char <ESCAPE_CHAR>
escape character
-q, --quote-char <QUOTE_CHAR>
quote character
-D, --double-quote <DOUBLE_QUOTE>
double quote
[possible values: true, false]
-C, --csv-compression <CSV_COMPRESSION>
compression mode of csv
[default: UNCOMPRESSED]
-c, --parquet-compression <PARQUET_COMPRESSION>
compression mode of parquet
[default: SNAPPY]
-w, --writer-version <WRITER_VERSION>
writer version
-m, --max-row-group-size <MAX_ROW_GROUP_SIZE>
max row group size
--enable-bloom-filter <ENABLE_BLOOM_FILTER>
whether to enable bloom filter writing
[possible values: true, false]
--help
display usage help
-V, --version
Print version

View file

@@ -0,0 +1,748 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Binary to convert a CSV file to a Parquet file
//!
//! # Install
//!
//! `parquet-fromcsv` can be installed using `cargo`:
//!
//! ```text
//! cargo install parquet --features=cli
//! ```
//!
//! After this `parquet-fromcsv` should be available:
//!
//! ```text
//! parquet-fromcsv --schema message_schema_for_parquet.txt input.csv output.parquet
//! ```
//!
//! The binary can also be built from the source code and run as follows:
//!
//! ```text
//! cargo run --features=cli --bin parquet-fromcsv --schema message_schema_for_parquet.txt \
//!     input.csv output.parquet
//! ```
//!
//! # Options
//!
//! ```text
#![doc = include_str!("./parquet-fromcsv-help.txt")] // To update this file, run the test `test_command_help`
//! ```
//!
//! ## Parquet file options
//!
//! ```text
//! - `-b`, `--batch-size` : Batch size for Parquet
//! - `-c`, `--parquet-compression` : Compression option for Parquet, default is SNAPPY
//! - `-s`, `--schema` : Path to message schema for generated Parquet file
//! - `-o`, `--output-file` : Path to output Parquet file
//! - `-w`, `--writer-version` : Writer version
//! - `-m`, `--max-row-group-size` : Max row group size
//! - `--enable-bloom-filter` : Enable bloom filter during writing
//! ```
//!
//! ## Input file options
//!
//! ```text
//! - `-i`, `--input-file` : Path to input CSV file
//! - `-f`, `--input-format` : Dialect for input file, `csv` or `tsv`.
//! - `-C`, `--csv-compression` : Compression option for csv, default is UNCOMPRESSED
//! - `-d`, `--delimiter` : Field delimiter for CSV file, default depends on `--input-format`
//! - `-e`, `--escape-char` : Escape character for input file
//! - `-h`, `--has-header` : Input has header
//! - `-r`, `--record-terminator` : Record terminator character for input, default is CRLF
//! - `-q`, `--quote-char` : Input quoting character
//! ```
//!
use std::{
fmt::Display,
fs::{read_to_string, File},
io::Read,
path::{Path, PathBuf},
sync::Arc,
};
use arrow_csv::ReaderBuilder;
use arrow_schema::{ArrowError, Schema};
use clap::{Parser, ValueEnum};
use parquet::arrow::arrow_writer::ArrowWriterOptions;
use parquet::{
arrow::{parquet_to_arrow_schema, ArrowWriter},
basic::Compression,
errors::ParquetError,
file::properties::{WriterProperties, WriterVersion},
schema::{parser::parse_message_type, types::SchemaDescriptor},
};
#[derive(Debug)]
enum ParquetFromCsvError {
CommandLineParseError(clap::Error),
IoError(std::io::Error),
ArrowError(ArrowError),
ParquetError(ParquetError),
WithContext(String, Box<Self>),
}
impl From<std::io::Error> for ParquetFromCsvError {
fn from(e: std::io::Error) -> Self {
Self::IoError(e)
}
}
impl From<ArrowError> for ParquetFromCsvError {
fn from(e: ArrowError) -> Self {
Self::ArrowError(e)
}
}
impl From<ParquetError> for ParquetFromCsvError {
fn from(e: ParquetError) -> Self {
Self::ParquetError(e)
}
}
impl From<clap::Error> for ParquetFromCsvError {
fn from(e: clap::Error) -> Self {
Self::CommandLineParseError(e)
}
}
impl ParquetFromCsvError {
pub fn with_context<E: Into<ParquetFromCsvError>>(
inner_error: E,
context: &str,
) -> ParquetFromCsvError {
let inner = inner_error.into();
ParquetFromCsvError::WithContext(context.to_string(), Box::new(inner))
}
}
impl Display for ParquetFromCsvError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ParquetFromCsvError::CommandLineParseError(e) => write!(f, "{e}"),
ParquetFromCsvError::IoError(e) => write!(f, "{e}"),
ParquetFromCsvError::ArrowError(e) => write!(f, "{e}"),
ParquetFromCsvError::ParquetError(e) => write!(f, "{e}"),
ParquetFromCsvError::WithContext(c, e) => {
writeln!(f, "{e}")?;
write!(f, "context: {c}")
}
}
}
}
#[derive(Debug, Parser)]
#[clap(author, version, disable_help_flag=true, about("Binary to convert csv to Parquet"), long_about=None)]
struct Args {
/// Path to a text file containing a parquet schema definition
#[clap(short, long, help("message schema for output Parquet"))]
schema: PathBuf,
/// input CSV file path
#[clap(short, long, help("input CSV file"))]
input_file: PathBuf,
/// output Parquet file path
#[clap(short, long, help("output Parquet file"))]
output_file: PathBuf,
/// input file format
#[clap(
value_enum,
short('f'),
long,
help("input file format"),
default_value_t=CsvDialect::Csv
)]
input_format: CsvDialect,
/// batch size
#[clap(
short,
long,
help("batch size"),
default_value_t = 1000,
env = "PARQUET_FROM_CSV_BATCHSIZE"
)]
batch_size: usize,
/// has header line
#[clap(short, long, help("has header"))]
has_header: bool,
/// field delimiter
///
/// default value:
/// when input_format==CSV: ','
/// when input_format==TSV: 'TAB'
#[clap(short, long, help("field delimiter"))]
delimiter: Option<char>,
#[clap(value_enum, short, long, help("record terminator"))]
record_terminator: Option<RecordTerminator>,
#[clap(short, long, help("escape character"))]
escape_char: Option<char>,
#[clap(short, long, help("quote character"))]
quote_char: Option<char>,
#[clap(short('D'), long, help("double quote"))]
double_quote: Option<bool>,
#[clap(short('C'), long, help("compression mode of csv"), default_value_t=Compression::UNCOMPRESSED)]
#[clap(value_parser=compression_from_str)]
csv_compression: Compression,
#[clap(short('c'), long, help("compression mode of parquet"), default_value_t=Compression::SNAPPY)]
#[clap(value_parser=compression_from_str)]
parquet_compression: Compression,
#[clap(short, long, help("writer version"))]
#[clap(value_parser=writer_version_from_str)]
writer_version: Option<WriterVersion>,
#[clap(short, long, help("max row group size"))]
max_row_group_size: Option<usize>,
#[clap(long, help("whether to enable bloom filter writing"))]
enable_bloom_filter: Option<bool>,
#[clap(long, action=clap::ArgAction::Help, help("display usage help"))]
help: Option<bool>,
}
fn compression_from_str(cmp: &str) -> Result<Compression, String> {
match cmp.to_uppercase().as_str() {
"UNCOMPRESSED" => Ok(Compression::UNCOMPRESSED),
"SNAPPY" => Ok(Compression::SNAPPY),
"GZIP" => Ok(Compression::GZIP(Default::default())),
"LZO" => Ok(Compression::LZO),
"BROTLI" => Ok(Compression::BROTLI(Default::default())),
"LZ4" => Ok(Compression::LZ4),
"ZSTD" => Ok(Compression::ZSTD(Default::default())),
v => Err(
format!("Unknown compression {v} : possible values UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD \n\nFor more information try --help")
)
}
}
fn writer_version_from_str(cmp: &str) -> Result<WriterVersion, String> {
match cmp.to_uppercase().as_str() {
"1" => Ok(WriterVersion::PARQUET_1_0),
"2" => Ok(WriterVersion::PARQUET_2_0),
v => Err(format!("Unknown writer version {v} : possible values 1, 2")),
}
}
impl Args {
fn schema_path(&self) -> &Path {
self.schema.as_path()
}
fn get_delimiter(&self) -> u8 {
match self.delimiter {
Some(ch) => ch as u8,
None => match self.input_format {
CsvDialect::Csv => b',',
CsvDialect::Tsv => b'\t',
},
}
}
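// `None` keeps the CSV reader's default terminator handling (CRLF); TSV input
// defaults to a bare LF terminator.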
fn get_terminator(&self) -> Option<u8> {
match self.record_terminator {
Some(RecordTerminator::LF) => Some(0x0a),
Some(RecordTerminator::CR) => Some(0x0d),
Some(RecordTerminator::Crlf) => None,
None => match self.input_format {
CsvDialect::Csv => None,
CsvDialect::Tsv => Some(0x0a),
},
}
}
fn get_escape(&self) -> Option<u8> {
self.escape_char.map(|ch| ch as u8)
}
fn get_quote(&self) -> Option<u8> {
if self.quote_char.is_none() {
match self.input_format {
CsvDialect::Csv => Some(b'\"'),
CsvDialect::Tsv => None,
}
} else {
self.quote_char.map(|c| c as u8)
}
}
}
#[derive(Debug, Clone, Copy, ValueEnum, PartialEq)]
enum CsvDialect {
Csv,
Tsv,
}
#[derive(Debug, Clone, Copy, ValueEnum, PartialEq)]
enum RecordTerminator {
LF,
Crlf,
CR,
}
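/// Builds Parquet `WriterProperties` from the CLI arguments (compression,
/// writer version, max row group size and bloom filter flag).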
fn configure_writer_properties(args: &Args) -> WriterProperties {
let mut properties_builder =
WriterProperties::builder().set_compression(args.parquet_compression);
if let Some(writer_version) = args.writer_version {
properties_builder = properties_builder.set_writer_version(writer_version);
}
if let Some(max_row_group_size) = args.max_row_group_size {
properties_builder = properties_builder.set_max_row_group_size(max_row_group_size);
}
if let Some(enable_bloom_filter) = args.enable_bloom_filter {
properties_builder = properties_builder.set_bloom_filter_enabled(enable_bloom_filter);
}
properties_builder.build()
}
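/// Builds an Arrow CSV `ReaderBuilder` from the CLI arguments, applying the
/// optional terminator, escape and quote settings only when they resolve to a value.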
fn configure_reader_builder(args: &Args, arrow_schema: Arc<Schema>) -> ReaderBuilder {
fn configure_reader<T, F: Fn(ReaderBuilder, T) -> ReaderBuilder>(
builder: ReaderBuilder,
value: Option<T>,
fun: F,
) -> ReaderBuilder {
if let Some(val) = value {
fun(builder, val)
} else {
builder
}
}
let mut builder = ReaderBuilder::new(arrow_schema)
.with_batch_size(args.batch_size)
.with_header(args.has_header)
.with_delimiter(args.get_delimiter());
builder = configure_reader(
builder,
args.get_terminator(),
ReaderBuilder::with_terminator,
);
builder = configure_reader(builder, args.get_escape(), ReaderBuilder::with_escape);
builder = configure_reader(builder, args.get_quote(), ReaderBuilder::with_quote);
builder
}
fn convert_csv_to_parquet(args: &Args) -> Result<(), ParquetFromCsvError> {
let schema = read_to_string(args.schema_path()).map_err(|e| {
ParquetFromCsvError::with_context(
e,
&format!("Failed to open schema file {:#?}", args.schema_path()),
)
})?;
let parquet_schema = Arc::new(parse_message_type(&schema)?);
let desc = SchemaDescriptor::new(parquet_schema);
let arrow_schema = Arc::new(parquet_to_arrow_schema(&desc, None)?);
// create output parquet writer
let parquet_file = File::create(&args.output_file).map_err(|e| {
ParquetFromCsvError::with_context(
e,
&format!("Failed to create output file {:#?}", &args.output_file),
)
})?;
let options = ArrowWriterOptions::new()
.with_properties(configure_writer_properties(args))
.with_schema_root(desc.name().to_string());
let mut arrow_writer =
ArrowWriter::try_new_with_options(parquet_file, arrow_schema.clone(), options)
.map_err(|e| ParquetFromCsvError::with_context(e, "Failed to create ArrowWriter"))?;
// open input file
let input_file = File::open(&args.input_file).map_err(|e| {
ParquetFromCsvError::with_context(
e,
&format!("Failed to open input file {:#?}", &args.input_file),
)
})?;
// open input file decoder
let input_file_decoder = match args.csv_compression {
Compression::UNCOMPRESSED => Box::new(input_file) as Box<dyn Read>,
Compression::SNAPPY => Box::new(snap::read::FrameDecoder::new(input_file)) as Box<dyn Read>,
Compression::GZIP(_) => {
Box::new(flate2::read::MultiGzDecoder::new(input_file)) as Box<dyn Read>
}
Compression::BROTLI(_) => {
Box::new(brotli::Decompressor::new(input_file, 0)) as Box<dyn Read>
}
Compression::LZ4 => {
Box::new(lz4_flex::frame::FrameDecoder::new(input_file)) as Box<dyn Read>
}
Compression::ZSTD(_) => {
Box::new(zstd::Decoder::new(input_file).map_err(|e| {
ParquetFromCsvError::with_context(e, "Failed to create zstd::Decoder")
})?) as Box<dyn Read>
}
d => unimplemented!("compression type {d}"),
};
// create input csv reader
let builder = configure_reader_builder(args, arrow_schema);
let reader = builder.build(input_file_decoder)?;
for batch_result in reader {
let batch = batch_result.map_err(|e| {
ParquetFromCsvError::with_context(e, "Failed to read RecordBatch from CSV")
})?;
arrow_writer.write(&batch).map_err(|e| {
ParquetFromCsvError::with_context(e, "Failed to write RecordBatch to parquet")
})?;
}
arrow_writer
.close()
.map_err(|e| ParquetFromCsvError::with_context(e, "Failed to close parquet"))?;
Ok(())
}
fn main() -> Result<(), ParquetFromCsvError> {
let args = Args::parse();
convert_csv_to_parquet(&args)
}
#[cfg(test)]
mod tests {
use std::{
io::Write,
path::{Path, PathBuf},
};
use super::*;
use arrow::datatypes::{DataType, Field};
use brotli::CompressorWriter;
use clap::{CommandFactory, Parser};
use flate2::write::GzEncoder;
use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel};
use parquet::file::reader::{FileReader, SerializedFileReader};
use snap::write::FrameEncoder;
use tempfile::NamedTempFile;
#[test]
fn test_command_help() {
let mut cmd = Args::command();
let dir = std::env::var("CARGO_MANIFEST_DIR").unwrap();
let mut path_buf = PathBuf::from(dir);
path_buf.push("src");
path_buf.push("bin");
path_buf.push("parquet-fromcsv-help.txt");
let expected = std::fs::read_to_string(path_buf).unwrap();
let mut buffer_vec = Vec::new();
let mut buffer = std::io::Cursor::new(&mut buffer_vec);
cmd.write_long_help(&mut buffer).unwrap();
// Remove Parquet version string from the help text
let mut actual = String::from_utf8(buffer_vec).unwrap();
let pos = actual.find('\n').unwrap() + 1;
actual = actual[pos..].to_string();
assert_eq!(
expected, actual,
"help text not match. please update to \n---\n{actual}\n---\n"
)
}
fn parse_args(mut extra_args: Vec<&str>) -> Result<Args, ParquetFromCsvError> {
let mut args = vec![
"test",
"--schema",
"test.schema",
"--input-file",
"infile.csv",
"--output-file",
"out.parquet",
];
args.append(&mut extra_args);
let args = Args::try_parse_from(args.iter())?;
Ok(args)
}
#[test]
fn test_parse_arg_minimum() -> Result<(), ParquetFromCsvError> {
let args = parse_args(vec![])?;
assert_eq!(args.schema, PathBuf::from(Path::new("test.schema")));
assert_eq!(args.input_file, PathBuf::from(Path::new("infile.csv")));
assert_eq!(args.output_file, PathBuf::from(Path::new("out.parquet")));
// test default values
assert_eq!(args.input_format, CsvDialect::Csv);
assert_eq!(args.batch_size, 1000);
assert!(!args.has_header);
assert_eq!(args.delimiter, None);
assert_eq!(args.get_delimiter(), b',');
assert_eq!(args.record_terminator, None);
assert_eq!(args.get_terminator(), None); // CRLF
assert_eq!(args.quote_char, None);
assert_eq!(args.get_quote(), Some(b'\"'));
assert_eq!(args.double_quote, None);
assert_eq!(args.parquet_compression, Compression::SNAPPY);
Ok(())
}
#[test]
fn test_parse_arg_format_variants() -> Result<(), ParquetFromCsvError> {
let args = parse_args(vec!["--input-format", "csv"])?;
assert_eq!(args.input_format, CsvDialect::Csv);
assert_eq!(args.get_delimiter(), b',');
assert_eq!(args.get_terminator(), None); // CRLF
assert_eq!(args.get_quote(), Some(b'\"'));
assert_eq!(args.get_escape(), None);
let args = parse_args(vec!["--input-format", "tsv"])?;
assert_eq!(args.input_format, CsvDialect::Tsv);
assert_eq!(args.get_delimiter(), b'\t');
assert_eq!(args.get_terminator(), Some(b'\x0a')); // LF
assert_eq!(args.get_quote(), None); // quote none
assert_eq!(args.get_escape(), None);
let args = parse_args(vec!["--input-format", "csv", "--escape-char", "\\"])?;
assert_eq!(args.input_format, CsvDialect::Csv);
assert_eq!(args.get_delimiter(), b',');
assert_eq!(args.get_terminator(), None); // CRLF
assert_eq!(args.get_quote(), Some(b'\"'));
assert_eq!(args.get_escape(), Some(b'\\'));
let args = parse_args(vec!["--input-format", "tsv", "--delimiter", ":"])?;
assert_eq!(args.input_format, CsvDialect::Tsv);
assert_eq!(args.get_delimiter(), b':');
assert_eq!(args.get_terminator(), Some(b'\x0a')); // LF
assert_eq!(args.get_quote(), None); // quote none
assert_eq!(args.get_escape(), None);
Ok(())
}
#[test]
#[should_panic]
fn test_parse_arg_format_error() {
parse_args(vec!["--input-format", "excel"]).unwrap();
}
#[test]
fn test_parse_arg_compression_format() {
let args = parse_args(vec!["--parquet-compression", "uncompressed"]).unwrap();
assert_eq!(args.parquet_compression, Compression::UNCOMPRESSED);
let args = parse_args(vec!["--parquet-compression", "snappy"]).unwrap();
assert_eq!(args.parquet_compression, Compression::SNAPPY);
let args = parse_args(vec!["--parquet-compression", "gzip"]).unwrap();
assert_eq!(
args.parquet_compression,
Compression::GZIP(Default::default())
);
let args = parse_args(vec!["--parquet-compression", "lzo"]).unwrap();
assert_eq!(args.parquet_compression, Compression::LZO);
let args = parse_args(vec!["--parquet-compression", "lz4"]).unwrap();
assert_eq!(args.parquet_compression, Compression::LZ4);
let args = parse_args(vec!["--parquet-compression", "brotli"]).unwrap();
assert_eq!(
args.parquet_compression,
Compression::BROTLI(Default::default())
);
let args = parse_args(vec!["--parquet-compression", "zstd"]).unwrap();
assert_eq!(
args.parquet_compression,
Compression::ZSTD(Default::default())
);
}
#[test]
fn test_parse_arg_compression_format_fail() {
match parse_args(vec!["--parquet-compression", "zip"]) {
Ok(_) => panic!("unexpected success"),
Err(e) => assert_eq!(
format!("{e}"),
"error: invalid value 'zip' for '--parquet-compression <PARQUET_COMPRESSION>': Unknown compression ZIP : possible values UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD \n\nFor more information try --help\n"),
}
}
fn assert_debug_text(debug_text: &str, name: &str, value: &str) {
let pattern = format!(" {name}: {value}");
assert!(
debug_text.contains(&pattern),
"\"{debug_text}\" not contains \"{pattern}\""
)
}
#[test]
fn test_configure_reader_builder() {
let args = Args {
schema: PathBuf::from(Path::new("schema.arvo")),
input_file: PathBuf::from(Path::new("test.csv")),
output_file: PathBuf::from(Path::new("out.parquet")),
batch_size: 1000,
input_format: CsvDialect::Csv,
has_header: false,
delimiter: None,
record_terminator: None,
escape_char: None,
quote_char: None,
double_quote: None,
csv_compression: Compression::UNCOMPRESSED,
parquet_compression: Compression::SNAPPY,
writer_version: None,
max_row_group_size: None,
enable_bloom_filter: None,
help: None,
};
let arrow_schema = Arc::new(Schema::new(vec![
Field::new("field1", DataType::Utf8, false),
Field::new("field2", DataType::Utf8, false),
Field::new("field3", DataType::Utf8, false),
Field::new("field4", DataType::Utf8, false),
Field::new("field5", DataType::Utf8, false),
]));
let reader_builder = configure_reader_builder(&args, arrow_schema);
let builder_debug = format!("{reader_builder:?}");
assert_debug_text(&builder_debug, "header", "false");
assert_debug_text(&builder_debug, "delimiter", "Some(44)");
assert_debug_text(&builder_debug, "quote", "Some(34)");
assert_debug_text(&builder_debug, "terminator", "None");
assert_debug_text(&builder_debug, "batch_size", "1000");
assert_debug_text(&builder_debug, "escape", "None");
let args = Args {
schema: PathBuf::from(Path::new("schema.arvo")),
input_file: PathBuf::from(Path::new("test.csv")),
output_file: PathBuf::from(Path::new("out.parquet")),
batch_size: 2000,
input_format: CsvDialect::Tsv,
has_header: true,
delimiter: None,
record_terminator: None,
escape_char: Some('\\'),
quote_char: None,
double_quote: None,
csv_compression: Compression::UNCOMPRESSED,
parquet_compression: Compression::SNAPPY,
writer_version: None,
max_row_group_size: None,
enable_bloom_filter: None,
help: None,
};
let arrow_schema = Arc::new(Schema::new(vec![
Field::new("field1", DataType::Utf8, false),
Field::new("field2", DataType::Utf8, false),
Field::new("field3", DataType::Utf8, false),
Field::new("field4", DataType::Utf8, false),
Field::new("field5", DataType::Utf8, false),
]));
let reader_builder = configure_reader_builder(&args, arrow_schema);
let builder_debug = format!("{reader_builder:?}");
assert_debug_text(&builder_debug, "header", "true");
assert_debug_text(&builder_debug, "delimiter", "Some(9)");
assert_debug_text(&builder_debug, "quote", "None");
assert_debug_text(&builder_debug, "terminator", "Some(10)");
assert_debug_text(&builder_debug, "batch_size", "2000");
assert_debug_text(&builder_debug, "escape", "Some(92)");
}
fn test_convert_compressed_csv_to_parquet(csv_compression: Compression) {
let schema = NamedTempFile::new().unwrap();
let schema_text = r"message my_amazing_schema {
optional int32 id;
optional binary name (STRING);
}";
schema.as_file().write_all(schema_text.as_bytes()).unwrap();
let mut input_file = NamedTempFile::new().unwrap();
fn write_tmp_file<T: Write>(w: &mut T) {
for index in 1..2000 {
write!(w, "{index},\"name_{index}\"\r\n").unwrap();
}
w.flush().unwrap();
}
// make sure the input_file lives long enough
input_file = match csv_compression {
Compression::UNCOMPRESSED => {
write_tmp_file(&mut input_file);
input_file
}
Compression::SNAPPY => {
let mut encoder = FrameEncoder::new(input_file);
write_tmp_file(&mut encoder);
encoder.into_inner().unwrap()
}
Compression::GZIP(level) => {
let mut encoder = GzEncoder::new(
input_file,
flate2::Compression::new(level.compression_level()),
);
write_tmp_file(&mut encoder);
encoder.finish().unwrap()
}
Compression::BROTLI(level) => {
let mut encoder =
CompressorWriter::new(input_file, 0, level.compression_level(), 0);
write_tmp_file(&mut encoder);
encoder.into_inner()
}
Compression::LZ4 => {
let mut encoder = lz4_flex::frame::FrameEncoder::new(input_file);
write_tmp_file(&mut encoder);
encoder.finish().unwrap()
}
Compression::ZSTD(level) => {
let mut encoder = zstd::Encoder::new(input_file, level.compression_level())
.map_err(|e| {
ParquetFromCsvError::with_context(e, "Failed to create zstd::Encoder")
})
.unwrap();
write_tmp_file(&mut encoder);
encoder.finish().unwrap()
}
d => unimplemented!("compression type {d}"),
};
let output_parquet = NamedTempFile::new().unwrap();
let args = Args {
schema: PathBuf::from(schema.path()),
input_file: PathBuf::from(input_file.path()),
output_file: PathBuf::from(output_parquet.path()),
batch_size: 1000,
input_format: CsvDialect::Csv,
has_header: false,
delimiter: None,
record_terminator: None,
escape_char: None,
quote_char: None,
double_quote: None,
csv_compression,
parquet_compression: Compression::SNAPPY,
writer_version: None,
max_row_group_size: None,
// by default we shall test bloom filter writing
enable_bloom_filter: Some(true),
help: None,
};
convert_csv_to_parquet(&args).unwrap();
let file = SerializedFileReader::new(output_parquet.into_file()).unwrap();
let schema_name = file.metadata().file_metadata().schema().name();
assert_eq!(schema_name, "my_amazing_schema");
}
#[test]
fn test_convert_csv_to_parquet() {
test_convert_compressed_csv_to_parquet(Compression::UNCOMPRESSED);
test_convert_compressed_csv_to_parquet(Compression::SNAPPY);
test_convert_compressed_csv_to_parquet(Compression::GZIP(GzipLevel::try_new(1).unwrap()));
test_convert_compressed_csv_to_parquet(Compression::BROTLI(
BrotliLevel::try_new(2).unwrap(),
));
test_convert_compressed_csv_to_parquet(Compression::LZ4);
test_convert_compressed_csv_to_parquet(Compression::ZSTD(ZstdLevel::try_new(1).unwrap()));
}
}

View file

@@ -0,0 +1,172 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Binary that prints the [page index] of a parquet file
//!
//! # Install
//!
//! `parquet-index` can be installed using `cargo`:
//! ```
//! cargo install parquet --features=cli
//! ```
//! After this `parquet-index` should be available:
//! ```
//! parquet-index XYZ.parquet COLUMN_NAME
//! ```
//!
//! The binary can also be built from the source code and run as follows:
//! ```
//! cargo run --features=cli --bin parquet-index XYZ.parquet COLUMN_NAME
//! ```
//!
//! [page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
use clap::Parser;
use parquet::errors::{ParquetError, Result};
use parquet::file::page_index::index::{Index, PageIndex};
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::file::serialized_reader::ReadOptionsBuilder;
use parquet::format::PageLocation;
use std::fs::File;
#[derive(Debug, Parser)]
#[clap(author, version, about("Prints the page index of a parquet file"), long_about = None)]
struct Args {
#[clap(help("Path to a parquet file"))]
file: String,
#[clap(help("Column name to print"))]
column: String,
}
impl Args {
fn run(&self) -> Result<()> {
let file = File::open(&self.file)?;
let options = ReadOptionsBuilder::new().with_page_index().build();
let reader = SerializedFileReader::new_with_options(file, options)?;
let schema = reader.metadata().file_metadata().schema_descr();
let column_idx = schema
.columns()
.iter()
.position(|x| x.name() == self.column.as_str())
.ok_or_else(|| {
ParquetError::General(format!("Failed to find column {}", self.column))
})?;
// Column index data for all row groups and columns
let column_index = reader
.metadata()
.column_index()
.ok_or_else(|| ParquetError::General("Column index not found".to_string()))?;
// Offset index data for all row groups and columns
let offset_index = reader
.metadata()
.offset_index()
.ok_or_else(|| ParquetError::General("Offset index not found".to_string()))?;
// Iterate through each row group
for (row_group_idx, ((column_indices, offset_indices), row_group)) in column_index
.iter()
.zip(offset_index)
.zip(reader.metadata().row_groups())
.enumerate()
{
println!("Row Group: {row_group_idx}");
let offset_index = offset_indices.get(column_idx).ok_or_else(|| {
ParquetError::General(format!(
"No offset index for row group {row_group_idx} column chunk {column_idx}"
))
})?;
let row_counts = compute_row_counts(offset_index, row_group.num_rows());
match &column_indices[column_idx] {
Index::NONE => println!("NO INDEX"),
Index::BOOLEAN(v) => print_index(&v.indexes, offset_index, &row_counts)?,
Index::INT32(v) => print_index(&v.indexes, offset_index, &row_counts)?,
Index::INT64(v) => print_index(&v.indexes, offset_index, &row_counts)?,
Index::INT96(v) => print_index(&v.indexes, offset_index, &row_counts)?,
Index::FLOAT(v) => print_index(&v.indexes, offset_index, &row_counts)?,
Index::DOUBLE(v) => print_index(&v.indexes, offset_index, &row_counts)?,
Index::BYTE_ARRAY(v) => print_index(&v.indexes, offset_index, &row_counts)?,
Index::FIXED_LEN_BYTE_ARRAY(v) => {
print_index(&v.indexes, offset_index, &row_counts)?
}
}
}
Ok(())
}
}
/// Computes the number of rows in each page within a column chunk
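///
/// For example (illustrative values), pages starting at rows `[0, 100, 250]` in a
/// 300-row row group yield row counts `[100, 150, 50]`.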
fn compute_row_counts(offset_index: &[PageLocation], rows: i64) -> Vec<i64> {
if offset_index.is_empty() {
return vec![];
}
let mut last = offset_index[0].first_row_index;
let mut out = Vec::with_capacity(offset_index.len());
for o in offset_index.iter().skip(1) {
out.push(o.first_row_index - last);
last = o.first_row_index;
}
out.push(rows - last);
out
}
/// Prints index information for a single column chunk
fn print_index<T: std::fmt::Display>(
column_index: &[PageIndex<T>],
offset_index: &[PageLocation],
row_counts: &[i64],
) -> Result<()> {
if column_index.len() != offset_index.len() {
return Err(ParquetError::General(format!(
"Index length mismatch, got {} and {}",
column_index.len(),
offset_index.len()
)));
}
for (idx, ((c, o), row_count)) in column_index
.iter()
.zip(offset_index)
.zip(row_counts)
.enumerate()
{
print!(
"Page {:>5} at offset {:#010x} with length {:>10} and row count {:>10}",
idx, o.offset, o.compressed_page_size, row_count
);
match &c.min {
Some(m) => print!(", min {m:>10}"),
None => print!(", min {:>10}", "NONE"),
}
match &c.max {
Some(m) => print!(", max {m:>10}"),
None => print!(", max {:>10}", "NONE"),
}
println!()
}
Ok(())
}
fn main() -> Result<()> {
Args::parse().run()
}

View file

@@ -0,0 +1,236 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Binary that prints the physical layout of a parquet file
//!
//! # Install
//!
//! `parquet-layout` can be installed using `cargo`:
//! ```
//! cargo install parquet --features=cli
//! ```
//! After this `parquet-layout` should be available:
//! ```
//! parquet-layout XYZ.parquet
//! ```
//!
//! The binary can also be built from the source code and run as follows:
//! ```
//! cargo run --features=cli --bin parquet-layout XYZ.parquet
//! ```
use std::fs::File;
use std::io::Read;
use clap::Parser;
use serde::Serialize;
use thrift::protocol::TCompactInputProtocol;
use parquet::basic::{Compression, Encoding};
use parquet::errors::Result;
use parquet::file::reader::ChunkReader;
use parquet::format::PageHeader;
use parquet::thrift::TSerializable;
#[derive(Serialize, Debug)]
struct ParquetFile {
row_groups: Vec<RowGroup>,
}
#[derive(Serialize, Debug)]
struct RowGroup {
columns: Vec<ColumnChunk>,
row_count: i64,
}
#[derive(Serialize, Debug)]
struct ColumnChunk {
path: String,
has_offset_index: bool,
has_column_index: bool,
has_bloom_filter: bool,
pages: Vec<Page>,
}
#[derive(Serialize, Debug)]
struct Page {
compression: Option<&'static str>,
encoding: &'static str,
page_type: &'static str,
offset: u64,
compressed_bytes: i32,
uncompressed_bytes: i32,
header_bytes: i32,
num_values: i32,
}
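/// Parses the file footer and walks every column chunk, decoding each page header
/// with Thrift's compact protocol to recover the physical page layout.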
fn do_layout<C: ChunkReader>(reader: &C) -> Result<ParquetFile> {
let metadata = parquet::file::footer::parse_metadata(reader)?;
let schema = metadata.file_metadata().schema_descr();
let row_groups = (0..metadata.num_row_groups())
.map(|row_group_idx| {
let row_group = metadata.row_group(row_group_idx);
let columns = row_group
.columns()
.iter()
.zip(schema.columns())
.map(|(column, column_schema)| {
let compression = compression(column.compression());
let mut pages = vec![];
let mut start = column
.dictionary_page_offset()
.unwrap_or_else(|| column.data_page_offset())
as u64;
let end = start + column.compressed_size() as u64;
while start != end {
let (header_len, header) = read_page_header(reader, start)?;
if let Some(dictionary) = header.dictionary_page_header {
pages.push(Page {
compression,
encoding: encoding(dictionary.encoding),
page_type: "dictionary",
offset: start,
compressed_bytes: header.compressed_page_size,
uncompressed_bytes: header.uncompressed_page_size,
header_bytes: header_len as _,
num_values: dictionary.num_values,
})
} else if let Some(data_page) = header.data_page_header {
pages.push(Page {
compression,
encoding: encoding(data_page.encoding),
page_type: "data_page_v1",
offset: start,
compressed_bytes: header.compressed_page_size,
uncompressed_bytes: header.uncompressed_page_size,
header_bytes: header_len as _,
num_values: data_page.num_values,
})
} else if let Some(data_page) = header.data_page_header_v2 {
let is_compressed = data_page.is_compressed.unwrap_or(true);
pages.push(Page {
compression: compression.filter(|_| is_compressed),
encoding: encoding(data_page.encoding),
page_type: "data_page_v2",
offset: start,
compressed_bytes: header.compressed_page_size,
uncompressed_bytes: header.uncompressed_page_size,
header_bytes: header_len as _,
num_values: data_page.num_values,
})
}
start += header.compressed_page_size as u64 + header_len as u64;
}
Ok(ColumnChunk {
path: column_schema.path().parts().join("."),
has_offset_index: column.offset_index_offset().is_some(),
has_column_index: column.column_index_offset().is_some(),
has_bloom_filter: column.bloom_filter_offset().is_some(),
pages,
})
})
.collect::<Result<Vec<_>>>()?;
Ok(RowGroup {
columns,
row_count: row_group.num_rows(),
})
})
.collect::<Result<Vec<_>>>()?;
Ok(ParquetFile { row_groups })
}
/// Reads the page header at `offset` from `reader`, returning
/// both the `PageHeader` and its length in bytes
fn read_page_header<C: ChunkReader>(reader: &C, offset: u64) -> Result<(usize, PageHeader)> {
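    // Wrap the reader so we can count how many bytes the Thrift decoder consumes;
    // that count is the header length reported to the caller.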
struct TrackedRead<R>(R, usize);
impl<R: Read> Read for TrackedRead<R> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let v = self.0.read(buf)?;
self.1 += v;
Ok(v)
}
}
let input = reader.get_read(offset)?;
let mut tracked = TrackedRead(input, 0);
let mut prot = TCompactInputProtocol::new(&mut tracked);
let header = PageHeader::read_from_in_protocol(&mut prot)?;
Ok((tracked.1, header))
}
/// Returns a string representation for a given compression
fn compression(compression: Compression) -> Option<&'static str> {
match compression {
Compression::UNCOMPRESSED => None,
Compression::SNAPPY => Some("snappy"),
Compression::GZIP(_) => Some("gzip"),
Compression::LZO => Some("lzo"),
Compression::BROTLI(_) => Some("brotli"),
Compression::LZ4 => Some("lz4"),
Compression::ZSTD(_) => Some("zstd"),
Compression::LZ4_RAW => Some("lz4_raw"),
}
}
/// Returns a string representation for a given encoding
fn encoding(encoding: parquet::format::Encoding) -> &'static str {
match Encoding::try_from(encoding) {
Ok(Encoding::PLAIN) => "plain",
Ok(Encoding::PLAIN_DICTIONARY) => "plain_dictionary",
Ok(Encoding::RLE) => "rle",
#[allow(deprecated)]
Ok(Encoding::BIT_PACKED) => "bit_packed",
Ok(Encoding::DELTA_BINARY_PACKED) => "delta_binary_packed",
Ok(Encoding::DELTA_LENGTH_BYTE_ARRAY) => "delta_length_byte_array",
Ok(Encoding::DELTA_BYTE_ARRAY) => "delta_byte_array",
Ok(Encoding::RLE_DICTIONARY) => "rle_dictionary",
Ok(Encoding::BYTE_STREAM_SPLIT) => "byte_stream_split",
Err(_) => "unknown",
}
}
#[derive(Debug, Parser)]
#[clap(author, version, about("Prints the physical layout of a parquet file"), long_about = None)]
struct Args {
#[clap(help("Path to a parquet file"))]
file: String,
}
impl Args {
fn run(&self) -> Result<()> {
let file = File::open(&self.file)?;
let layout = do_layout(&file)?;
let out = std::io::stdout();
let writer = out.lock();
serde_json::to_writer_pretty(writer, &layout).unwrap();
Ok(())
}
}
fn main() -> Result<()> {
Args::parse().run()
}


View file

@ -0,0 +1,106 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Binary file to read data from a Parquet file.
//!
//! # Install
//!
//! `parquet-read` can be installed using `cargo`:
//! ```
//! cargo install parquet --features=cli
//! ```
//! After this `parquet-read` should be available:
//! ```
//! parquet-read XYZ.parquet
//! ```
//!
//! The binary can also be built from the source code and run as follows:
//! ```
//! cargo run --features=cli --bin parquet-read XYZ.parquet
//! ```
//!
//! Note that `parquet-read` reads the full file schema; no projection or filtering is
//! applied.
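//!
//! For example, to print only the first 10 records as JSON lines:
//! ```
//! parquet-read --json --num-records 10 XYZ.parquet
//! ```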
use clap::Parser;
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::record::Row;
use std::io::{self, Read};
use std::{fs::File, path::Path};
#[derive(Debug, Parser)]
#[clap(author, version, about("Binary file to read data from a Parquet file"), long_about = None)]
struct Args {
#[clap(help("Path to a parquet file, or - for stdin"))]
file_name: String,
#[clap(
short,
long,
default_value_t = 0_usize,
help("Number of records to read. When not provided or 0, all records are read")
)]
num_records: usize,
#[clap(short, long, help("Print Parquet file in JSON lines format"))]
json: bool,
}
fn main() {
let args = Args::parse();
let filename = args.file_name;
let num_records = args.num_records;
let json = args.json;
let parquet_reader: Box<dyn FileReader> = if filename == "-" {
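        // stdin is not seekable, so buffer it fully in memory; the reader needs
        // random access to locate and parse the footer.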
let mut buf = Vec::new();
io::stdin()
.read_to_end(&mut buf)
.expect("Failed to read stdin into a buffer");
Box::new(
SerializedFileReader::new(bytes::Bytes::from(buf)).expect("Failed to create reader"),
)
} else {
let path = Path::new(&filename);
let file = File::open(path).expect("Unable to open file");
Box::new(SerializedFileReader::new(file).expect("Failed to create reader"))
};
// Use full schema as projected schema
let mut iter = parquet_reader
.get_row_iter(None)
.expect("Failed to create row iterator");
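    // A requested record count of 0 means "read everything".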
let mut start = 0;
let end = num_records;
let all_records = end == 0;
while all_records || start < end {
match iter.next() {
Some(row) => print_row(&row.unwrap(), json),
None => break,
};
start += 1;
}
}
fn print_row(row: &Row, json: bool) {
if json {
println!("{}", row.to_json_value())
} else {
println!("{row}");
}
}

View file

@ -0,0 +1,279 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Binary file to rewrite Parquet files.
//!
//! # Install
//!
//! `parquet-rewrite` can be installed using `cargo`:
//! ```
//! cargo install parquet --features=cli
//! ```
//! After this `parquet-rewrite` should be available:
//! ```
//! parquet-rewrite -i XYZ.parquet -o XYZ2.parquet
//! ```
//!
//! The binary can also be built from the source code and run as follows:
//! ```
//! cargo run --features=cli --bin parquet-rewrite -- -i XYZ.parquet -o XYZ2.parquet
//! ```
use std::fs::File;
use arrow_array::RecordBatchReader;
use clap::{builder::PossibleValue, Parser, ValueEnum};
use parquet::{
arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter},
basic::Compression,
file::{
properties::{EnabledStatistics, WriterProperties, WriterVersion},
reader::FileReader,
serialized_reader::SerializedFileReader,
},
};
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
enum CompressionArgs {
/// No compression.
None,
/// Snappy
Snappy,
/// GZip
Gzip,
/// LZO
Lzo,
/// Brotli
Brotli,
/// LZ4
Lz4,
/// Zstd
Zstd,
/// LZ4 Raw
Lz4Raw,
}
impl From<CompressionArgs> for Compression {
fn from(value: CompressionArgs) -> Self {
match value {
CompressionArgs::None => Self::UNCOMPRESSED,
CompressionArgs::Snappy => Self::SNAPPY,
CompressionArgs::Gzip => Self::GZIP(Default::default()),
CompressionArgs::Lzo => Self::LZO,
CompressionArgs::Brotli => Self::BROTLI(Default::default()),
CompressionArgs::Lz4 => Self::LZ4,
CompressionArgs::Zstd => Self::ZSTD(Default::default()),
CompressionArgs::Lz4Raw => Self::LZ4_RAW,
}
}
}
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
enum EnabledStatisticsArgs {
/// Compute no statistics
None,
/// Compute chunk-level statistics but not page-level
Chunk,
/// Compute page-level and chunk-level statistics
Page,
}
impl From<EnabledStatisticsArgs> for EnabledStatistics {
fn from(value: EnabledStatisticsArgs) -> Self {
match value {
EnabledStatisticsArgs::None => Self::None,
EnabledStatisticsArgs::Chunk => Self::Chunk,
EnabledStatisticsArgs::Page => Self::Page,
}
}
}
#[derive(Clone, Copy, Debug)]
enum WriterVersionArgs {
Parquet1_0,
Parquet2_0,
}
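// Implemented by hand so the CLI accepts the human-friendly values "1.0" and "2.0".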
impl ValueEnum for WriterVersionArgs {
fn value_variants<'a>() -> &'a [Self] {
&[Self::Parquet1_0, Self::Parquet2_0]
}
fn to_possible_value(&self) -> Option<PossibleValue> {
match self {
WriterVersionArgs::Parquet1_0 => Some(PossibleValue::new("1.0")),
WriterVersionArgs::Parquet2_0 => Some(PossibleValue::new("2.0")),
}
}
}
impl From<WriterVersionArgs> for WriterVersion {
fn from(value: WriterVersionArgs) -> Self {
match value {
WriterVersionArgs::Parquet1_0 => Self::PARQUET_1_0,
WriterVersionArgs::Parquet2_0 => Self::PARQUET_2_0,
}
}
}
#[derive(Debug, Parser)]
#[clap(author, version, about("Read and write a Parquet file with potentially different settings"), long_about = None)]
struct Args {
/// Path to input parquet file.
#[clap(short, long)]
input: String,
/// Path to output parquet file.
#[clap(short, long)]
output: String,
/// Compression used.
#[clap(long, value_enum)]
compression: Option<CompressionArgs>,
/// Sets maximum number of rows in a row group.
#[clap(long)]
max_row_group_size: Option<usize>,
/// Sets best effort maximum number of rows in a data page.
#[clap(long)]
data_page_row_count_limit: Option<usize>,
/// Sets best effort maximum size of a data page in bytes.
#[clap(long)]
data_page_size_limit: Option<usize>,
/// Sets max statistics size for any column.
///
/// Applicable only if statistics are enabled.
#[clap(long)]
max_statistics_size: Option<usize>,
/// Sets best effort maximum dictionary page size, in bytes.
#[clap(long)]
dictionary_page_size_limit: Option<usize>,
/// Sets whether bloom filter is enabled for any column.
#[clap(long)]
bloom_filter_enabled: Option<bool>,
/// Sets bloom filter false positive probability (fpp) for any column.
#[clap(long)]
bloom_filter_fpp: Option<f64>,
/// Sets number of distinct values (ndv) for bloom filter for any column.
#[clap(long)]
bloom_filter_ndv: Option<u64>,
/// Sets flag to enable/disable dictionary encoding for any column.
#[clap(long)]
dictionary_enabled: Option<bool>,
/// Sets flag to enable/disable statistics for any column.
#[clap(long)]
statistics_enabled: Option<EnabledStatisticsArgs>,
/// Sets writer version.
#[clap(long)]
writer_version: Option<WriterVersionArgs>,
}
fn main() {
let args = Args::parse();
// read key-value metadata
let parquet_reader =
SerializedFileReader::new(File::open(&args.input).expect("Unable to open input file"))
.expect("Failed to create reader");
let kv_md = parquet_reader
.metadata()
.file_metadata()
.key_value_metadata()
.cloned();
// create actual parquet reader
let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(
File::open(args.input).expect("Unable to open input file"),
)
.expect("parquet open")
.build()
.expect("parquet open");
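    // Apply only the writer settings given on the command line; anything left
    // unset keeps the parquet crate's defaults.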
let mut writer_properties_builder = WriterProperties::builder().set_key_value_metadata(kv_md);
if let Some(value) = args.compression {
writer_properties_builder = writer_properties_builder.set_compression(value.into());
}
if let Some(value) = args.max_row_group_size {
writer_properties_builder = writer_properties_builder.set_max_row_group_size(value);
}
if let Some(value) = args.data_page_row_count_limit {
writer_properties_builder = writer_properties_builder.set_data_page_row_count_limit(value);
}
if let Some(value) = args.data_page_size_limit {
writer_properties_builder = writer_properties_builder.set_data_page_size_limit(value);
}
if let Some(value) = args.dictionary_page_size_limit {
writer_properties_builder = writer_properties_builder.set_dictionary_page_size_limit(value);
}
if let Some(value) = args.max_statistics_size {
writer_properties_builder = writer_properties_builder.set_max_statistics_size(value);
}
if let Some(value) = args.bloom_filter_enabled {
writer_properties_builder = writer_properties_builder.set_bloom_filter_enabled(value);
if value {
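            // fpp and ndv only take effect when the bloom filter is enabled.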
if let Some(value) = args.bloom_filter_fpp {
writer_properties_builder = writer_properties_builder.set_bloom_filter_fpp(value);
}
if let Some(value) = args.bloom_filter_ndv {
writer_properties_builder = writer_properties_builder.set_bloom_filter_ndv(value);
}
}
}
if let Some(value) = args.dictionary_enabled {
writer_properties_builder = writer_properties_builder.set_dictionary_enabled(value);
}
if let Some(value) = args.statistics_enabled {
writer_properties_builder = writer_properties_builder.set_statistics_enabled(value.into());
}
if let Some(value) = args.writer_version {
writer_properties_builder = writer_properties_builder.set_writer_version(value.into());
}
let writer_properties = writer_properties_builder.build();
let mut parquet_writer = ArrowWriter::try_new(
File::create(&args.output).expect("Unable to open output file"),
parquet_reader.schema(),
Some(writer_properties),
)
.expect("create arrow writer");
for maybe_batch in parquet_reader {
let batch = maybe_batch.expect("reading batch");
parquet_writer.write(&batch).expect("writing data");
}
parquet_writer.close().expect("finalizing file");
}

View file

@ -0,0 +1,69 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Binary file to return the total number of rows in Parquet file(s).
//!
//! # Install
//!
//! `parquet-rowcount` can be installed using `cargo`:
//! ```
//! cargo install parquet --features=cli
//! ```
//! After this `parquet-rowcount` should be available:
//! ```
//! parquet-rowcount XYZ.parquet
//! ```
//!
//! The binary can also be built from the source code and run as follows:
//! ```
//! cargo run --features=cli --bin parquet-rowcount XYZ.parquet ABC.parquet ZXC.parquet
//! ```
//!
//! Note that `parquet-rowcount` reads the full file schema; no projection or filtering is
//! applied.
use clap::Parser;
use parquet::file::reader::{FileReader, SerializedFileReader};
use std::{fs::File, path::Path};
#[derive(Debug, Parser)]
#[clap(author, version, about("Binary file to return the total number of rows in Parquet file(s)"), long_about = None)]
struct Args {
#[clap(
number_of_values(1),
        help("List of Parquet files to read, separated by spaces")
)]
file_paths: Vec<String>,
}
fn main() {
let args = Args::parse();
for filename in args.file_paths {
let path = Path::new(&filename);
let file = File::open(path).expect("Unable to open file");
let parquet_reader = SerializedFileReader::new(file).expect("Unable to read file");
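        // Row counts come from the row group metadata; no row data is decoded.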
let row_group_metadata = parquet_reader.metadata().row_groups();
let mut total_num_rows = 0;
for group_metadata in row_group_metadata {
total_num_rows += group_metadata.num_rows();
}
eprintln!("File {filename}: rowcount={total_num_rows}");
}
}

View file

@ -0,0 +1,75 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Binary file to print the schema and metadata of a Parquet file.
//!
//! # Install
//!
//! `parquet-schema` can be installed using `cargo`:
//! ```
//! cargo install parquet --features=cli
//! ```
//! After this `parquet-schema` should be available:
//! ```
//! parquet-schema XYZ.parquet
//! ```
//!
//! The binary can also be built from the source code and run as follows:
//! ```
//! cargo run --features=cli --bin parquet-schema XYZ.parquet
//! ```
//!
//! Note that `verbose` is an optional boolean flag: when it is not provided only the
//! schema is printed, and when it is provided the full file metadata is printed.
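//!
//! For example:
//! ```
//! parquet-schema XYZ.parquet            # print the schema only
//! parquet-schema --verbose XYZ.parquet  # print full file metadata
//! ```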
use clap::Parser;
use parquet::{
file::reader::{FileReader, SerializedFileReader},
schema::printer::{print_file_metadata, print_parquet_metadata},
};
use std::{fs::File, path::Path};
#[derive(Debug, Parser)]
#[clap(author, version, about("Binary file to print the schema and metadata of a Parquet file"), long_about = None)]
struct Args {
#[clap(help("Path to the parquet file"))]
file_path: String,
#[clap(short, long, help("Enable printing full file metadata"))]
verbose: bool,
}
fn main() {
let args = Args::parse();
let filename = args.file_path;
let path = Path::new(&filename);
let file = File::open(path).expect("Unable to open file");
let verbose = args.verbose;
match SerializedFileReader::new(file) {
Err(e) => panic!("Error when parsing Parquet file: {e}"),
Ok(parquet_reader) => {
let metadata = parquet_reader.metadata();
println!("Metadata for file: {}", &filename);
println!();
if verbose {
print_parquet_metadata(&mut std::io::stdout(), metadata);
} else {
print_file_metadata(&mut std::io::stdout(), metadata.file_metadata());
}
}
}
}

View file

@ -0,0 +1,118 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Binary file to read bloom filter data from a Parquet file.
//!
//! # Install
//!
//! `parquet-show-bloom-filter` can be installed using `cargo`:
//! ```
//! cargo install parquet --features=cli
//! ```
//! After this `parquet-show-bloom-filter` should be available:
//! ```
//! parquet-show-bloom-filter XYZ.parquet id a
//! ```
//!
//! The binary can also be built from the source code and run as follows:
//! ```
//! cargo run --features=cli --bin parquet-show-bloom-filter -- XYZ.parquet id a
//! ```
use clap::Parser;
use parquet::file::{
properties::ReaderProperties,
reader::{FileReader, SerializedFileReader},
serialized_reader::ReadOptionsBuilder,
};
use std::{fs::File, path::Path};
#[derive(Debug, Parser)]
#[clap(author, version, about("Binary file to read bloom filter data from a Parquet file"), long_about = None)]
struct Args {
#[clap(help("Path to the parquet file"))]
file_name: String,
#[clap(help("Check the bloom filter indexes for the given column"))]
column: String,
#[clap(
        help(
            "Check if the given values match the bloom filter; the values will be evaluated as strings"
        ),
required = true
)]
values: Vec<String>,
}
fn main() {
let args = Args::parse();
let file_name = args.file_name;
let path = Path::new(&file_name);
let file = File::open(path).expect("Unable to open file");
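    // Bloom filter reading is opt-in, so enable it explicitly via the reader
    // properties; otherwise `get_column_bloom_filter` would return `None`.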
let file_reader = SerializedFileReader::new_with_options(
file,
ReadOptionsBuilder::new()
.with_reader_properties(
ReaderProperties::builder()
.set_read_bloom_filter(true)
.build(),
)
.build(),
)
.expect("Unable to open file as Parquet");
let metadata = file_reader.metadata();
for (ri, row_group) in metadata.row_groups().iter().enumerate() {
println!("Row group #{ri}");
println!("{}", "=".repeat(80));
if let Some((column_index, _)) = row_group
.columns()
.iter()
.enumerate()
.find(|(_, column)| column.column_path().string() == args.column)
{
let row_group_reader = file_reader
.get_row_group(ri)
.expect("Unable to read row group");
if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
args.values.iter().for_each(|value| {
println!(
"Value {} is {} in bloom filter",
value,
if sbbf.check(&value.as_str()) {
"present"
} else {
"absent"
}
)
});
} else {
println!("No bloom filter found for column {}", args.column);
}
} else {
println!(
"No column named {} found, candidate columns are: {}",
args.column,
row_group
.columns()
.iter()
.map(|c| c.column_path().string())
.collect::<Vec<_>>()
.join(", ")
);
}
}
}