From ca4eba989fe1f5b9b4613421ff8771e9df9321c5 Mon Sep 17 00:00:00 2001 From: Michael Spector Date: Tue, 25 Jun 2024 17:22:44 +0300 Subject: [PATCH] Added missing files --- .../arrow-flight/src/bin/flight_sql_client.rs | 351 ++++++++ .../src/bin/arrow-file-to-stream.rs | 47 ++ .../src/bin/arrow-json-integration-test.rs | 183 +++++ .../src/bin/arrow-stream-to-file.rs | 34 + .../src/bin/flight-test-integration-client.rs | 70 ++ .../src/bin/flight-test-integration-server.rs | 60 ++ arrow-rs/parquet/src/bin/parquet-concat.rs | 118 +++ .../parquet/src/bin/parquet-fromcsv-help.txt | 75 ++ arrow-rs/parquet/src/bin/parquet-fromcsv.rs | 748 ++++++++++++++++++ arrow-rs/parquet/src/bin/parquet-index.rs | 172 ++++ arrow-rs/parquet/src/bin/parquet-layout.rs | 236 ++++++ arrow-rs/parquet/src/bin/parquet-read.rs | 106 +++ arrow-rs/parquet/src/bin/parquet-rewrite.rs | 279 +++++++ arrow-rs/parquet/src/bin/parquet-rowcount.rs | 69 ++ arrow-rs/parquet/src/bin/parquet-schema.rs | 75 ++ .../src/bin/parquet-show-bloom-filter.rs | 118 +++ 16 files changed, 2741 insertions(+) create mode 100644 arrow-rs/arrow-flight/src/bin/flight_sql_client.rs create mode 100644 arrow-rs/arrow-integration-testing/src/bin/arrow-file-to-stream.rs create mode 100644 arrow-rs/arrow-integration-testing/src/bin/arrow-json-integration-test.rs create mode 100644 arrow-rs/arrow-integration-testing/src/bin/arrow-stream-to-file.rs create mode 100644 arrow-rs/arrow-integration-testing/src/bin/flight-test-integration-client.rs create mode 100644 arrow-rs/arrow-integration-testing/src/bin/flight-test-integration-server.rs create mode 100644 arrow-rs/parquet/src/bin/parquet-concat.rs create mode 100644 arrow-rs/parquet/src/bin/parquet-fromcsv-help.txt create mode 100644 arrow-rs/parquet/src/bin/parquet-fromcsv.rs create mode 100644 arrow-rs/parquet/src/bin/parquet-index.rs create mode 100644 arrow-rs/parquet/src/bin/parquet-layout.rs create mode 100644 arrow-rs/parquet/src/bin/parquet-read.rs create mode 100644 arrow-rs/parquet/src/bin/parquet-rewrite.rs create mode 100644 arrow-rs/parquet/src/bin/parquet-rowcount.rs create mode 100644 arrow-rs/parquet/src/bin/parquet-schema.rs create mode 100644 arrow-rs/parquet/src/bin/parquet-show-bloom-filter.rs diff --git a/arrow-rs/arrow-flight/src/bin/flight_sql_client.rs b/arrow-rs/arrow-flight/src/bin/flight_sql_client.rs new file mode 100644 index 0000000..296efc1 --- /dev/null +++ b/arrow-rs/arrow-flight/src/bin/flight_sql_client.rs @@ -0,0 +1,351 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::{sync::Arc, time::Duration}; + +use anyhow::{bail, Context, Result}; +use arrow_array::{ArrayRef, Datum, RecordBatch, StringArray}; +use arrow_cast::{cast_with_options, pretty::pretty_format_batches, CastOptions}; +use arrow_flight::{sql::client::FlightSqlServiceClient, FlightInfo}; +use arrow_schema::Schema; +use clap::{Parser, Subcommand}; +use futures::TryStreamExt; +use tonic::{ + metadata::MetadataMap, + transport::{Channel, ClientTlsConfig, Endpoint}, +}; +use tracing_log::log::info; + +/// Logging CLI config. +#[derive(Debug, Parser)] +pub struct LoggingArgs { + /// Log verbosity. + /// + /// Defaults to "warn". + /// + /// Use `-v` for "info", `-vv` for "debug", `-vvv` for "trace". + /// + /// Note you can also set logging level using `RUST_LOG` environment variable: + /// `RUST_LOG=debug`. + #[clap( + short = 'v', + long = "verbose", + action = clap::ArgAction::Count, + )] + log_verbose_count: u8, +} + +#[derive(Debug, Parser)] +struct ClientArgs { + /// Additional headers. + /// + /// Can be given multiple times. Headers and values are separated by '='. + /// + /// Example: `-H foo=bar -H baz=42` + #[clap(long = "header", short = 'H', value_parser = parse_key_val)] + headers: Vec<(String, String)>, + + /// Username. + /// + /// Optional. If given, `password` must also be set. + #[clap(long, requires = "password")] + username: Option, + + /// Password. + /// + /// Optional. If given, `username` must also be set. + #[clap(long, requires = "username")] + password: Option, + + /// Auth token. + #[clap(long)] + token: Option, + + /// Use TLS. + /// + /// If not provided, use cleartext connection. + #[clap(long)] + tls: bool, + + /// Server host. + /// + /// Required. + #[clap(long)] + host: String, + + /// Server port. + /// + /// Defaults to `443` if `tls` is set, otherwise defaults to `80`. + #[clap(long)] + port: Option, +} + +#[derive(Debug, Parser)] +struct Args { + /// Logging args. + #[clap(flatten)] + logging_args: LoggingArgs, + + /// Client args. + #[clap(flatten)] + client_args: ClientArgs, + + #[clap(subcommand)] + cmd: Command, +} + +/// Different available commands. +#[derive(Debug, Subcommand)] +enum Command { + /// Execute given statement. + StatementQuery { + /// SQL query. + /// + /// Required. + query: String, + }, + + /// Prepare given statement and then execute it. + PreparedStatementQuery { + /// SQL query. + /// + /// Required. + /// + /// Can contains placeholders like `$1`. + /// + /// Example: `SELECT * FROM t WHERE x = $1` + query: String, + + /// Additional parameters. + /// + /// Can be given multiple times. Names and values are separated by '='. Values will be + /// converted to the type that the server reported for the prepared statement. 
+ /// + /// Example: `-p $1=42` + #[clap(short, value_parser = parse_key_val)] + params: Vec<(String, String)>, + }, +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::parse(); + setup_logging(args.logging_args)?; + let mut client = setup_client(args.client_args) + .await + .context("setup client")?; + + let flight_info = match args.cmd { + Command::StatementQuery { query } => client + .execute(query, None) + .await + .context("execute statement")?, + Command::PreparedStatementQuery { query, params } => { + let mut prepared_stmt = client + .prepare(query, None) + .await + .context("prepare statement")?; + + if !params.is_empty() { + prepared_stmt + .set_parameters( + construct_record_batch_from_params( + ¶ms, + prepared_stmt + .parameter_schema() + .context("get parameter schema")?, + ) + .context("construct parameters")?, + ) + .context("bind parameters")?; + } + + prepared_stmt + .execute() + .await + .context("execute prepared statement")? + } + }; + + let batches = execute_flight(&mut client, flight_info) + .await + .context("read flight data")?; + + let res = pretty_format_batches(batches.as_slice()).context("format results")?; + println!("{res}"); + + Ok(()) +} + +async fn execute_flight( + client: &mut FlightSqlServiceClient, + info: FlightInfo, +) -> Result> { + let schema = Arc::new(Schema::try_from(info.clone()).context("valid schema")?); + let mut batches = Vec::with_capacity(info.endpoint.len() + 1); + batches.push(RecordBatch::new_empty(schema)); + info!("decoded schema"); + + for endpoint in info.endpoint { + let Some(ticket) = &endpoint.ticket else { + bail!("did not get ticket"); + }; + + let mut flight_data = client.do_get(ticket.clone()).await.context("do get")?; + log_metadata(flight_data.headers(), "header"); + + let mut endpoint_batches: Vec<_> = (&mut flight_data) + .try_collect() + .await + .context("collect data stream")?; + batches.append(&mut endpoint_batches); + + if let Some(trailers) = flight_data.trailers() { + log_metadata(&trailers, "trailer"); + } + } + info!("received data"); + + Ok(batches) +} + +fn construct_record_batch_from_params( + params: &[(String, String)], + parameter_schema: &Schema, +) -> Result { + let mut items = Vec::<(&String, ArrayRef)>::new(); + + for (name, value) in params { + let field = parameter_schema.field_with_name(name)?; + let value_as_array = StringArray::new_scalar(value); + let casted = cast_with_options( + value_as_array.get().0, + field.data_type(), + &CastOptions::default(), + )?; + items.push((name, casted)) + } + + Ok(RecordBatch::try_from_iter(items)?) +} + +fn setup_logging(args: LoggingArgs) -> Result<()> { + use tracing_subscriber::{util::SubscriberInitExt, EnvFilter, FmtSubscriber}; + + tracing_log::LogTracer::init().context("tracing log init")?; + + let filter = match args.log_verbose_count { + 0 => "warn", + 1 => "info", + 2 => "debug", + _ => "trace", + }; + let filter = EnvFilter::try_new(filter).context("set up log env filter")?; + + let subscriber = FmtSubscriber::builder().with_env_filter(filter).finish(); + subscriber.try_init().context("init logging subscriber")?; + + Ok(()) +} + +async fn setup_client(args: ClientArgs) -> Result> { + let port = args.port.unwrap_or(if args.tls { 443 } else { 80 }); + + let protocol = if args.tls { "https" } else { "http" }; + + let mut endpoint = Endpoint::new(format!("{}://{}:{}", protocol, args.host, port)) + .context("create endpoint")? 
+ .connect_timeout(Duration::from_secs(20)) + .timeout(Duration::from_secs(20)) + .tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait + .tcp_keepalive(Option::Some(Duration::from_secs(3600))) + .http2_keep_alive_interval(Duration::from_secs(300)) + .keep_alive_timeout(Duration::from_secs(20)) + .keep_alive_while_idle(true); + + if args.tls { + let tls_config = ClientTlsConfig::new(); + endpoint = endpoint + .tls_config(tls_config) + .context("create TLS endpoint")?; + } + + let channel = endpoint.connect().await.context("connect to endpoint")?; + + let mut client = FlightSqlServiceClient::new(channel); + info!("connected"); + + for (k, v) in args.headers { + client.set_header(k, v); + } + + if let Some(token) = args.token { + client.set_token(token); + info!("token set"); + } + + match (args.username, args.password) { + (None, None) => {} + (Some(username), Some(password)) => { + client + .handshake(&username, &password) + .await + .context("handshake")?; + info!("performed handshake"); + } + (Some(_), None) => { + bail!("when username is set, you also need to set a password") + } + (None, Some(_)) => { + bail!("when password is set, you also need to set a username") + } + } + + Ok(client) +} + +/// Parse a single key-value pair +fn parse_key_val(s: &str) -> Result<(String, String), String> { + let pos = s + .find('=') + .ok_or_else(|| format!("invalid KEY=value: no `=` found in `{s}`"))?; + Ok((s[..pos].to_owned(), s[pos + 1..].to_owned())) +} + +/// Log headers/trailers. +fn log_metadata(map: &MetadataMap, what: &'static str) { + for k_v in map.iter() { + match k_v { + tonic::metadata::KeyAndValueRef::Ascii(k, v) => { + info!( + "{}: {}={}", + what, + k.as_str(), + v.to_str().unwrap_or(""), + ); + } + tonic::metadata::KeyAndValueRef::Binary(k, v) => { + info!( + "{}: {}={}", + what, + k.as_str(), + String::from_utf8_lossy(v.as_ref()), + ); + } + } + } +} diff --git a/arrow-rs/arrow-integration-testing/src/bin/arrow-file-to-stream.rs b/arrow-rs/arrow-integration-testing/src/bin/arrow-file-to-stream.rs new file mode 100644 index 0000000..3e027fa --- /dev/null +++ b/arrow-rs/arrow-integration-testing/src/bin/arrow-file-to-stream.rs @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use arrow::error::Result; +use arrow::ipc::reader::FileReader; +use arrow::ipc::writer::StreamWriter; +use clap::Parser; +use std::fs::File; +use std::io::{self, BufReader}; + +#[derive(Debug, Parser)] +#[clap(author, version, about("Read an arrow file and stream to stdout"), long_about = None)] +struct Args { + file_name: String, +} + +fn main() -> Result<()> { + let args = Args::parse(); + let f = File::open(args.file_name)?; + let reader = BufReader::new(f); + let mut reader = FileReader::try_new(reader, None)?; + let schema = reader.schema(); + + let mut writer = StreamWriter::try_new(io::stdout(), &schema)?; + + reader.try_for_each(|batch| { + let batch = batch?; + writer.write(&batch) + })?; + writer.finish()?; + + Ok(()) +} diff --git a/arrow-rs/arrow-integration-testing/src/bin/arrow-json-integration-test.rs b/arrow-rs/arrow-integration-testing/src/bin/arrow-json-integration-test.rs new file mode 100644 index 0000000..cc3dd21 --- /dev/null +++ b/arrow-rs/arrow-integration-testing/src/bin/arrow-json-integration-test.rs @@ -0,0 +1,183 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::error::{ArrowError, Result}; +use arrow::ipc::reader::FileReader; +use arrow::ipc::writer::FileWriter; +use arrow_integration_test::*; +use arrow_integration_testing::{canonicalize_schema, open_json_file}; +use clap::Parser; +use std::fs::File; + +#[derive(clap::ValueEnum, Debug, Clone)] +#[clap(rename_all = "SCREAMING_SNAKE_CASE")] +enum Mode { + ArrowToJson, + JsonToArrow, + Validate, +} + +#[derive(Debug, Parser)] +#[clap(author, version, about("rust arrow-json-integration-test"), long_about = None)] +struct Args { + #[clap(short, long)] + integration: bool, + #[clap(short, long, help("Path to ARROW file"))] + arrow: String, + #[clap(short, long, help("Path to JSON file"))] + json: String, + #[clap( + value_enum, + short, + long, + default_value = "VALIDATE", + help = "Mode of integration testing tool" + )] + mode: Mode, + #[clap(short, long)] + verbose: bool, +} + +fn main() -> Result<()> { + let args = Args::parse(); + let arrow_file = args.arrow; + let json_file = args.json; + let verbose = args.verbose; + match args.mode { + Mode::JsonToArrow => json_to_arrow(&json_file, &arrow_file, verbose), + Mode::ArrowToJson => arrow_to_json(&arrow_file, &json_file, verbose), + Mode::Validate => validate(&arrow_file, &json_file, verbose), + } +} + +fn json_to_arrow(json_name: &str, arrow_name: &str, verbose: bool) -> Result<()> { + if verbose { + eprintln!("Converting {json_name} to {arrow_name}"); + } + + let json_file = open_json_file(json_name)?; + + let arrow_file = File::create(arrow_name)?; + let mut writer = FileWriter::try_new(arrow_file, &json_file.schema)?; + + for b in json_file.read_batches()? 
{ + writer.write(&b)?; + } + + writer.finish()?; + + Ok(()) +} + +fn arrow_to_json(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> { + if verbose { + eprintln!("Converting {arrow_name} to {json_name}"); + } + + let arrow_file = File::open(arrow_name)?; + let reader = FileReader::try_new(arrow_file, None)?; + + let mut fields: Vec = vec![]; + for f in reader.schema().fields() { + fields.push(ArrowJsonField::from(f)); + } + let schema = ArrowJsonSchema { + fields, + metadata: None, + }; + + let batches = reader + .map(|batch| Ok(ArrowJsonBatch::from_batch(&batch?))) + .collect::>>()?; + + let arrow_json = ArrowJson { + schema, + batches, + dictionaries: None, + }; + + let json_file = File::create(json_name)?; + serde_json::to_writer(&json_file, &arrow_json).unwrap(); + + Ok(()) +} + +fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> { + if verbose { + eprintln!("Validating {arrow_name} and {json_name}"); + } + + // open JSON file + let json_file = open_json_file(json_name)?; + + // open Arrow file + let arrow_file = File::open(arrow_name)?; + let mut arrow_reader = FileReader::try_new(arrow_file, None)?; + let arrow_schema = arrow_reader.schema().as_ref().to_owned(); + + // compare schemas + if canonicalize_schema(&json_file.schema) != canonicalize_schema(&arrow_schema) { + return Err(ArrowError::ComputeError(format!( + "Schemas do not match. JSON: {:?}. Arrow: {:?}", + json_file.schema, arrow_schema + ))); + } + + let json_batches = json_file.read_batches()?; + + // compare number of batches + assert!( + json_batches.len() == arrow_reader.num_batches(), + "JSON batches and Arrow batches are unequal" + ); + + if verbose { + eprintln!( + "Schemas match. JSON file has {} batches.", + json_batches.len() + ); + } + + for json_batch in json_batches { + if let Some(Ok(arrow_batch)) = arrow_reader.next() { + // compare batches + let num_columns = arrow_batch.num_columns(); + assert!(num_columns == json_batch.num_columns()); + assert!(arrow_batch.num_rows() == json_batch.num_rows()); + + for i in 0..num_columns { + assert_eq!( + arrow_batch.column(i).as_ref(), + json_batch.column(i).as_ref(), + "Arrow and JSON batch columns not the same" + ); + } + } else { + return Err(ArrowError::ComputeError( + "no more arrow batches left".to_owned(), + )); + } + } + + if arrow_reader.next().is_some() { + return Err(ArrowError::ComputeError( + "no more json batches left".to_owned(), + )); + } + + Ok(()) +} diff --git a/arrow-rs/arrow-integration-testing/src/bin/arrow-stream-to-file.rs b/arrow-rs/arrow-integration-testing/src/bin/arrow-stream-to-file.rs new file mode 100644 index 0000000..07ac5c7 --- /dev/null +++ b/arrow-rs/arrow-integration-testing/src/bin/arrow-stream-to-file.rs @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::io;
+
+use arrow::error::Result;
+use arrow::ipc::reader::StreamReader;
+use arrow::ipc::writer::FileWriter;
+
+fn main() -> Result<()> {
+    let mut arrow_stream_reader = StreamReader::try_new(io::stdin(), None)?;
+    let schema = arrow_stream_reader.schema();
+
+    let mut writer = FileWriter::try_new(io::stdout(), &schema)?;
+
+    arrow_stream_reader.try_for_each(|batch| writer.write(&batch?))?;
+    writer.finish()?;
+
+    Ok(())
+}
diff --git a/arrow-rs/arrow-integration-testing/src/bin/flight-test-integration-client.rs b/arrow-rs/arrow-integration-testing/src/bin/flight-test-integration-client.rs
new file mode 100644
index 0000000..b8bbb95
--- /dev/null
+++ b/arrow-rs/arrow-integration-testing/src/bin/flight-test-integration-client.rs
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_integration_testing::flight_client_scenarios;
+use clap::Parser;
+type Error = Box<dyn std::error::Error>;
+type Result<T = (), E = Error> = std::result::Result<T, E>;
+
+#[derive(clap::ValueEnum, Debug, Clone)]
+enum Scenario {
+    Middleware,
+    #[clap(name = "auth:basic_proto")]
+    AuthBasicProto,
+}
+
+#[derive(Debug, Parser)]
+#[clap(author, version, about("rust flight-test-integration-client"), long_about = None)]
+struct Args {
+    #[clap(long, help = "host of flight server")]
+    host: String,
+    #[clap(long, help = "port of flight server")]
+    port: u16,
+    #[clap(
+        short,
+        long,
+        help = "path to the descriptor file, only used when scenario is not provided. See https://arrow.apache.org/docs/format/Integration.html#json-test-data-format"
+    )]
+    path: Option<String>,
+    #[clap(long, value_enum)]
+    scenario: Option<Scenario>,
+}
+
+#[tokio::main]
+async fn main() -> Result {
+    #[cfg(feature = "logging")]
+    tracing_subscriber::fmt::init();
+
+    let args = Args::parse();
+    let host = args.host;
+    let port = args.port;
+
+    match args.scenario {
+        Some(Scenario::Middleware) => {
+            flight_client_scenarios::middleware::run_scenario(&host, port).await?
+        }
+        Some(Scenario::AuthBasicProto) => {
+            flight_client_scenarios::auth_basic_proto::run_scenario(&host, port).await?
+        }
+        None => {
+            let path = args.path.expect("No path is given");
+            flight_client_scenarios::integration_test::run_scenario(&host, port, &path).await?;
+        }
+    }
+
+    Ok(())
+}
diff --git a/arrow-rs/arrow-integration-testing/src/bin/flight-test-integration-server.rs b/arrow-rs/arrow-integration-testing/src/bin/flight-test-integration-server.rs
new file mode 100644
index 0000000..5310d07
--- /dev/null
+++ b/arrow-rs/arrow-integration-testing/src/bin/flight-test-integration-server.rs
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_integration_testing::flight_server_scenarios;
+use clap::Parser;
+
+type Error = Box<dyn std::error::Error>;
+type Result<T = (), E = Error> = std::result::Result<T, E>;
+
+#[derive(clap::ValueEnum, Debug, Clone)]
+enum Scenario {
+    Middleware,
+    #[clap(name = "auth:basic_proto")]
+    AuthBasicProto,
+}
+
+#[derive(Debug, Parser)]
+#[clap(author, version, about("rust flight-test-integration-server"), long_about = None)]
+struct Args {
+    #[clap(long)]
+    port: u16,
+    #[clap(long, value_enum)]
+    scenario: Option<Scenario>,
+}
+
+#[tokio::main]
+async fn main() -> Result {
+    #[cfg(feature = "logging")]
+    tracing_subscriber::fmt::init();
+
+    let args = Args::parse();
+    let port = args.port;
+
+    match args.scenario {
+        Some(Scenario::Middleware) => {
+            flight_server_scenarios::middleware::scenario_setup(port).await?
+        }
+        Some(Scenario::AuthBasicProto) => {
+            flight_server_scenarios::auth_basic_proto::scenario_setup(port).await?
+        }
+        None => {
+            flight_server_scenarios::integration_test::scenario_setup(port).await?;
+        }
+    }
+    Ok(())
+}
diff --git a/arrow-rs/parquet/src/bin/parquet-concat.rs b/arrow-rs/parquet/src/bin/parquet-concat.rs
new file mode 100644
index 0000000..9cbdf8e
--- /dev/null
+++ b/arrow-rs/parquet/src/bin/parquet-concat.rs
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Binary that concatenates the column data of one or more parquet files
+//!
+//! # Install
+//!
+//! `parquet-concat` can be installed using `cargo`:
+//! ```
+//! cargo install parquet --features=cli
+//! ```
+//! After this `parquet-concat` should be available:
+//! ```
+//! parquet-concat out.parquet a.parquet b.parquet
+//! ```
+//!
+//! The binary can also be built from the source code and run as follows:
+//! ```
+//! cargo run --features=cli --bin parquet-concat out.parquet a.parquet b.parquet
+//! ```
+//!
+//! Note: this does not currently support preserving the page index or bloom filters
+//!
+ +use clap::Parser; +use parquet::column::writer::ColumnCloseResult; +use parquet::errors::{ParquetError, Result}; +use parquet::file::properties::WriterProperties; +use parquet::file::writer::SerializedFileWriter; +use std::fs::File; +use std::sync::Arc; + +#[derive(Debug, Parser)] +#[clap(author, version)] +/// Concatenates one or more parquet files +struct Args { + /// Path to output + output: String, + + /// Path to input files + input: Vec, +} + +impl Args { + fn run(&self) -> Result<()> { + if self.input.is_empty() { + return Err(ParquetError::General( + "Must provide at least one input file".into(), + )); + } + + let output = File::create(&self.output)?; + + let inputs = self + .input + .iter() + .map(|x| { + let reader = File::open(x)?; + let metadata = parquet::file::footer::parse_metadata(&reader)?; + Ok((reader, metadata)) + }) + .collect::>>()?; + + let expected = inputs[0].1.file_metadata().schema(); + for (_, metadata) in inputs.iter().skip(1) { + let actual = metadata.file_metadata().schema(); + if expected != actual { + return Err(ParquetError::General(format!( + "inputs must have the same schema, {expected:#?} vs {actual:#?}" + ))); + } + } + + let props = Arc::new(WriterProperties::builder().build()); + let schema = inputs[0].1.file_metadata().schema_descr().root_schema_ptr(); + let mut writer = SerializedFileWriter::new(output, schema, props)?; + + for (input, metadata) in inputs { + for rg in metadata.row_groups() { + let mut rg_out = writer.next_row_group()?; + for column in rg.columns() { + let result = ColumnCloseResult { + bytes_written: column.compressed_size() as _, + rows_written: rg.num_rows() as _, + metadata: column.clone(), + bloom_filter: None, + column_index: None, + offset_index: None, + }; + rg_out.append_column(&input, result)?; + } + rg_out.close()?; + } + } + + writer.close()?; + + Ok(()) + } +} + +fn main() -> Result<()> { + Args::parse().run() +} diff --git a/arrow-rs/parquet/src/bin/parquet-fromcsv-help.txt b/arrow-rs/parquet/src/bin/parquet-fromcsv-help.txt new file mode 100644 index 0000000..ac38c56 --- /dev/null +++ b/arrow-rs/parquet/src/bin/parquet-fromcsv-help.txt @@ -0,0 +1,75 @@ + +Usage: parquet [OPTIONS] --schema --input-file --output-file + +Options: + -s, --schema + message schema for output Parquet + + -i, --input-file + input CSV file + + -o, --output-file + output Parquet file + + -f, --input-format + input file format + + [default: csv] + [possible values: csv, tsv] + + -b, --batch-size + batch size + + [env: PARQUET_FROM_CSV_BATCHSIZE=] + [default: 1000] + + -h, --has-header + has header + + -d, --delimiter + field delimiter + + default value: when input_format==CSV: ',' when input_format==TSV: 'TAB' + + -r, --record-terminator + record terminator + + [possible values: lf, crlf, cr] + + -e, --escape-char + escape character + + -q, --quote-char + quote character + + -D, --double-quote + double quote + + [possible values: true, false] + + -C, --csv-compression + compression mode of csv + + [default: UNCOMPRESSED] + + -c, --parquet-compression + compression mode of parquet + + [default: SNAPPY] + + -w, --writer-version + writer version + + -m, --max-row-group-size + max row group size + + --enable-bloom-filter + whether to enable bloom filter writing + + [possible values: true, false] + + --help + display usage help + + -V, --version + Print version diff --git a/arrow-rs/parquet/src/bin/parquet-fromcsv.rs b/arrow-rs/parquet/src/bin/parquet-fromcsv.rs new file mode 100644 index 0000000..140a367 --- /dev/null +++ 
b/arrow-rs/parquet/src/bin/parquet-fromcsv.rs
@@ -0,0 +1,748 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Binary that converts a CSV file to a Parquet file
+//!
+//! # Install
+//!
+//! `parquet-fromcsv` can be installed using `cargo`:
+//!
+//! ```text
+//! cargo install parquet --features=cli
+//! ```
+//!
+//! After this `parquet-fromcsv` should be available:
+//!
+//! ```text
+//! parquet-fromcsv --schema message_schema_for_parquet.txt input.csv output.parquet
+//! ```
+//!
+//! The binary can also be built from the source code and run as follows:
+//!
+//! ```text
+//! cargo run --features=cli --bin parquet-fromcsv --schema message_schema_for_parquet.txt \
+//!     input.csv output.parquet
+//! ```
+//!
+//! # Options
+//!
+//! ```text
+#![doc = include_str!("./parquet-fromcsv-help.txt")] // To update this file, run the test test_command_help
+//! ```
+//!
+//! ## Parquet file options
+//!
+//! ```text
+//! - `-b`, `--batch-size` : Batch size for Parquet
+//! - `-c`, `--parquet-compression` : Compression option for Parquet, default is SNAPPY
+//! - `-s`, `--schema` : Path to message schema for generated Parquet file
+//! - `-o`, `--output-file` : Path to output Parquet file
+//! - `-w`, `--writer-version` : Writer version
+//! - `-m`, `--max-row-group-size` : Max row group size
+//! - `--enable-bloom-filter` : Enable bloom filter during writing
+//! ```
+//!
+//! ## Input file options
+//!
+//! ```text
+//! - `-i`, `--input-file` : Path to input CSV file
+//! - `-f`, `--input-format` : Dialect for input file, `csv` or `tsv`
+//! - `-C`, `--csv-compression` : Compression option for CSV, default is UNCOMPRESSED
+//! - `-d`, `--delimiter` : Field delimiter for CSV file, default depends on `--input-format`
+//! - `-e`, `--escape-char` : Escape character for input file
+//! - `-h`, `--has-header` : Input has header
+//! - `-r`, `--record-terminator` : Record terminator character for input, default is CRLF
+//! - `-q`, `--quote-char` : Input quoting character
+//! ```
+//!
+
+use std::{
+    fmt::Display,
+    fs::{read_to_string, File},
+    io::Read,
+    path::{Path, PathBuf},
+    sync::Arc,
+};
+
+use arrow_csv::ReaderBuilder;
+use arrow_schema::{ArrowError, Schema};
+use clap::{Parser, ValueEnum};
+use parquet::arrow::arrow_writer::ArrowWriterOptions;
+use parquet::{
+    arrow::{parquet_to_arrow_schema, ArrowWriter},
+    basic::Compression,
+    errors::ParquetError,
+    file::properties::{WriterProperties, WriterVersion},
+    schema::{parser::parse_message_type, types::SchemaDescriptor},
+};
+
+#[derive(Debug)]
+enum ParquetFromCsvError {
+    CommandLineParseError(clap::Error),
+    IoError(std::io::Error),
+    ArrowError(ArrowError),
+    ParquetError(ParquetError),
+    WithContext(String, Box<Self>),
+}
+
+impl From<std::io::Error> for ParquetFromCsvError {
+    fn from(e: std::io::Error) -> Self {
+        Self::IoError(e)
+    }
+}
+
+impl From<ArrowError> for ParquetFromCsvError {
+    fn from(e: ArrowError) -> Self {
+        Self::ArrowError(e)
+    }
+}
+
+impl From<ParquetError> for ParquetFromCsvError {
+    fn from(e: ParquetError) -> Self {
+        Self::ParquetError(e)
+    }
+}
+
+impl From<clap::Error> for ParquetFromCsvError {
+    fn from(e: clap::Error) -> Self {
+        Self::CommandLineParseError(e)
+    }
+}
+
+impl ParquetFromCsvError {
+    pub fn with_context<E: Into<ParquetFromCsvError>>(
+        inner_error: E,
+        context: &str,
+    ) -> ParquetFromCsvError {
+        let inner = inner_error.into();
+        ParquetFromCsvError::WithContext(context.to_string(), Box::new(inner))
+    }
+}
+
+impl Display for ParquetFromCsvError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            ParquetFromCsvError::CommandLineParseError(e) => write!(f, "{e}"),
+            ParquetFromCsvError::IoError(e) => write!(f, "{e}"),
+            ParquetFromCsvError::ArrowError(e) => write!(f, "{e}"),
+            ParquetFromCsvError::ParquetError(e) => write!(f, "{e}"),
+            ParquetFromCsvError::WithContext(c, e) => {
+                writeln!(f, "{e}")?;
+                write!(f, "context: {c}")
+            }
+        }
+    }
+}
+
+#[derive(Debug, Parser)]
+#[clap(author, version, disable_help_flag=true, about("Binary to convert csv to Parquet"), long_about=None)]
+struct Args {
+    /// Path to a text file containing a parquet schema definition
+    #[clap(short, long, help("message schema for output Parquet"))]
+    schema: PathBuf,
+    /// input CSV file path
+    #[clap(short, long, help("input CSV file"))]
+    input_file: PathBuf,
+    /// output Parquet file path
+    #[clap(short, long, help("output Parquet file"))]
+    output_file: PathBuf,
+    /// input file format
+    #[clap(
+        value_enum,
+        short('f'),
+        long,
+        help("input file format"),
+        default_value_t=CsvDialect::Csv
+    )]
+    input_format: CsvDialect,
+    /// batch size
+    #[clap(
+        short,
+        long,
+        help("batch size"),
+        default_value_t = 1000,
+        env = "PARQUET_FROM_CSV_BATCHSIZE"
+    )]
+    batch_size: usize,
+    /// has header line
+    #[clap(short, long, help("has header"))]
+    has_header: bool,
+    /// field delimiter
+    ///
+    /// default value:
+    /// when input_format==CSV: ','
+    /// when input_format==TSV: 'TAB'
+    #[clap(short, long, help("field delimiter"))]
+    delimiter: Option<char>,
+    #[clap(value_enum, short, long, help("record terminator"))]
+    record_terminator: Option<RecordTerminator>,
+    #[clap(short, long, help("escape character"))]
+    escape_char: Option<char>,
+    #[clap(short, long, help("quote character"))]
+    quote_char: Option<char>,
+    #[clap(short('D'), long, help("double quote"))]
+    double_quote: Option<bool>,
+    #[clap(short('C'), long, help("compression mode of csv"), default_value_t=Compression::UNCOMPRESSED)]
+    #[clap(value_parser=compression_from_str)]
+    csv_compression: Compression,
+    #[clap(short('c'), long, help("compression mode of parquet"), default_value_t=Compression::SNAPPY)]
+    #[clap(value_parser=compression_from_str)]
+    parquet_compression: Compression,
+
+    #[clap(short, long, help("writer version"))]
+    #[clap(value_parser=writer_version_from_str)]
+    writer_version: Option<WriterVersion>,
+    #[clap(short, long, help("max row group size"))]
+    max_row_group_size: Option<usize>,
+    #[clap(long, help("whether to enable bloom filter writing"))]
+    enable_bloom_filter: Option<bool>,
+
+    #[clap(long, action=clap::ArgAction::Help, help("display usage help"))]
+    help: Option<bool>,
+}
+
+fn compression_from_str(cmp: &str) -> Result<Compression, String> {
+    match cmp.to_uppercase().as_str() {
+        "UNCOMPRESSED" => Ok(Compression::UNCOMPRESSED),
+        "SNAPPY" => Ok(Compression::SNAPPY),
+        "GZIP" => Ok(Compression::GZIP(Default::default())),
+        "LZO" => Ok(Compression::LZO),
+        "BROTLI" => Ok(Compression::BROTLI(Default::default())),
+        "LZ4" => Ok(Compression::LZ4),
+        "ZSTD" => Ok(Compression::ZSTD(Default::default())),
+        v => Err(
+            format!("Unknown compression {v} : possible values UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD \n\nFor more information try --help")
+        )
+    }
+}
+
+fn writer_version_from_str(cmp: &str) -> Result<WriterVersion, String> {
+    match cmp.to_uppercase().as_str() {
+        "1" => Ok(WriterVersion::PARQUET_1_0),
+        "2" => Ok(WriterVersion::PARQUET_2_0),
+        v => Err(format!("Unknown writer version {v} : possible values 1, 2")),
+    }
+}
+
+impl Args {
+    fn schema_path(&self) -> &Path {
+        self.schema.as_path()
+    }
+    fn get_delimiter(&self) -> u8 {
+        match self.delimiter {
+            Some(ch) => ch as u8,
+            None => match self.input_format {
+                CsvDialect::Csv => b',',
+                CsvDialect::Tsv => b'\t',
+            },
+        }
+    }
+    fn get_terminator(&self) -> Option<u8> {
+        match self.record_terminator {
+            Some(RecordTerminator::LF) => Some(0x0a),
+            Some(RecordTerminator::CR) => Some(0x0d),
+            Some(RecordTerminator::Crlf) => None,
+            None => match self.input_format {
+                CsvDialect::Csv => None,
+                CsvDialect::Tsv => Some(0x0a),
+            },
+        }
+    }
+    fn get_escape(&self) -> Option<u8> {
+        self.escape_char.map(|ch| ch as u8)
+    }
+    fn get_quote(&self) -> Option<u8> {
+        if self.quote_char.is_none() {
+            match self.input_format {
+                CsvDialect::Csv => Some(b'\"'),
+                CsvDialect::Tsv => None,
+            }
+        } else {
+            self.quote_char.map(|c| c as u8)
+        }
+    }
+}
+
+#[derive(Debug, Clone, Copy, ValueEnum, PartialEq)]
+enum CsvDialect {
+    Csv,
+    Tsv,
+}
+
+#[derive(Debug, Clone, Copy, ValueEnum, PartialEq)]
+enum RecordTerminator {
+    LF,
+    Crlf,
+    CR,
+}
+
+fn configure_writer_properties(args: &Args) -> WriterProperties {
+    let mut properties_builder =
+        WriterProperties::builder().set_compression(args.parquet_compression);
+    if let Some(writer_version) = args.writer_version {
+        properties_builder = properties_builder.set_writer_version(writer_version);
+    }
+    if let Some(max_row_group_size) = args.max_row_group_size {
+        properties_builder = properties_builder.set_max_row_group_size(max_row_group_size);
+    }
+    if let Some(enable_bloom_filter) = args.enable_bloom_filter {
+        properties_builder = properties_builder.set_bloom_filter_enabled(enable_bloom_filter);
+    }
+    properties_builder.build()
+}
+
+fn configure_reader_builder(args: &Args, arrow_schema: Arc<Schema>) -> ReaderBuilder {
+    fn configure_reader<T, F: Fn(ReaderBuilder, T) -> ReaderBuilder>(
+        builder: ReaderBuilder,
+        value: Option<T>,
+        fun: F,
+    ) -> ReaderBuilder {
+        if let Some(val) = value {
+            fun(builder, val)
+        } else {
+            builder
+        }
+    }
+
+    let mut builder = ReaderBuilder::new(arrow_schema)
+        .with_batch_size(args.batch_size)
+        .with_header(args.has_header)
+        .with_delimiter(args.get_delimiter());
+
+    builder = configure_reader(
+        builder,
+        args.get_terminator(),
ReaderBuilder::with_terminator, + ); + builder = configure_reader(builder, args.get_escape(), ReaderBuilder::with_escape); + builder = configure_reader(builder, args.get_quote(), ReaderBuilder::with_quote); + + builder +} + +fn convert_csv_to_parquet(args: &Args) -> Result<(), ParquetFromCsvError> { + let schema = read_to_string(args.schema_path()).map_err(|e| { + ParquetFromCsvError::with_context( + e, + &format!("Failed to open schema file {:#?}", args.schema_path()), + ) + })?; + let parquet_schema = Arc::new(parse_message_type(&schema)?); + let desc = SchemaDescriptor::new(parquet_schema); + let arrow_schema = Arc::new(parquet_to_arrow_schema(&desc, None)?); + + // create output parquet writer + let parquet_file = File::create(&args.output_file).map_err(|e| { + ParquetFromCsvError::with_context( + e, + &format!("Failed to create output file {:#?}", &args.output_file), + ) + })?; + + let options = ArrowWriterOptions::new() + .with_properties(configure_writer_properties(args)) + .with_schema_root(desc.name().to_string()); + + let mut arrow_writer = + ArrowWriter::try_new_with_options(parquet_file, arrow_schema.clone(), options) + .map_err(|e| ParquetFromCsvError::with_context(e, "Failed to create ArrowWriter"))?; + + // open input file + let input_file = File::open(&args.input_file).map_err(|e| { + ParquetFromCsvError::with_context( + e, + &format!("Failed to open input file {:#?}", &args.input_file), + ) + })?; + + // open input file decoder + let input_file_decoder = match args.csv_compression { + Compression::UNCOMPRESSED => Box::new(input_file) as Box, + Compression::SNAPPY => Box::new(snap::read::FrameDecoder::new(input_file)) as Box, + Compression::GZIP(_) => { + Box::new(flate2::read::MultiGzDecoder::new(input_file)) as Box + } + Compression::BROTLI(_) => { + Box::new(brotli::Decompressor::new(input_file, 0)) as Box + } + Compression::LZ4 => { + Box::new(lz4_flex::frame::FrameDecoder::new(input_file)) as Box + } + Compression::ZSTD(_) => { + Box::new(zstd::Decoder::new(input_file).map_err(|e| { + ParquetFromCsvError::with_context(e, "Failed to create zstd::Decoder") + })?) 
as Box + } + d => unimplemented!("compression type {d}"), + }; + + // create input csv reader + let builder = configure_reader_builder(args, arrow_schema); + let reader = builder.build(input_file_decoder)?; + for batch_result in reader { + let batch = batch_result.map_err(|e| { + ParquetFromCsvError::with_context(e, "Failed to read RecordBatch from CSV") + })?; + arrow_writer.write(&batch).map_err(|e| { + ParquetFromCsvError::with_context(e, "Failed to write RecordBatch to parquet") + })?; + } + arrow_writer + .close() + .map_err(|e| ParquetFromCsvError::with_context(e, "Failed to close parquet"))?; + Ok(()) +} + +fn main() -> Result<(), ParquetFromCsvError> { + let args = Args::parse(); + convert_csv_to_parquet(&args) +} + +#[cfg(test)] +mod tests { + use std::{ + io::Write, + path::{Path, PathBuf}, + }; + + use super::*; + use arrow::datatypes::{DataType, Field}; + use brotli::CompressorWriter; + use clap::{CommandFactory, Parser}; + use flate2::write::GzEncoder; + use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel}; + use parquet::file::reader::{FileReader, SerializedFileReader}; + use snap::write::FrameEncoder; + use tempfile::NamedTempFile; + + #[test] + fn test_command_help() { + let mut cmd = Args::command(); + let dir = std::env::var("CARGO_MANIFEST_DIR").unwrap(); + let mut path_buf = PathBuf::from(dir); + path_buf.push("src"); + path_buf.push("bin"); + path_buf.push("parquet-fromcsv-help.txt"); + let expected = std::fs::read_to_string(path_buf).unwrap(); + let mut buffer_vec = Vec::new(); + let mut buffer = std::io::Cursor::new(&mut buffer_vec); + cmd.write_long_help(&mut buffer).unwrap(); + // Remove Parquet version string from the help text + let mut actual = String::from_utf8(buffer_vec).unwrap(); + let pos = actual.find('\n').unwrap() + 1; + actual = actual[pos..].to_string(); + assert_eq!( + expected, actual, + "help text not match. 
please update to \n---\n{actual}\n---\n" + ) + } + + fn parse_args(mut extra_args: Vec<&str>) -> Result { + let mut args = vec![ + "test", + "--schema", + "test.schema", + "--input-file", + "infile.csv", + "--output-file", + "out.parquet", + ]; + args.append(&mut extra_args); + let args = Args::try_parse_from(args.iter())?; + Ok(args) + } + + #[test] + fn test_parse_arg_minimum() -> Result<(), ParquetFromCsvError> { + let args = parse_args(vec![])?; + + assert_eq!(args.schema, PathBuf::from(Path::new("test.schema"))); + assert_eq!(args.input_file, PathBuf::from(Path::new("infile.csv"))); + assert_eq!(args.output_file, PathBuf::from(Path::new("out.parquet"))); + // test default values + assert_eq!(args.input_format, CsvDialect::Csv); + assert_eq!(args.batch_size, 1000); + assert!(!args.has_header); + assert_eq!(args.delimiter, None); + assert_eq!(args.get_delimiter(), b','); + assert_eq!(args.record_terminator, None); + assert_eq!(args.get_terminator(), None); // CRLF + assert_eq!(args.quote_char, None); + assert_eq!(args.get_quote(), Some(b'\"')); + assert_eq!(args.double_quote, None); + assert_eq!(args.parquet_compression, Compression::SNAPPY); + Ok(()) + } + + #[test] + fn test_parse_arg_format_variants() -> Result<(), ParquetFromCsvError> { + let args = parse_args(vec!["--input-format", "csv"])?; + assert_eq!(args.input_format, CsvDialect::Csv); + assert_eq!(args.get_delimiter(), b','); + assert_eq!(args.get_terminator(), None); // CRLF + assert_eq!(args.get_quote(), Some(b'\"')); + assert_eq!(args.get_escape(), None); + let args = parse_args(vec!["--input-format", "tsv"])?; + assert_eq!(args.input_format, CsvDialect::Tsv); + assert_eq!(args.get_delimiter(), b'\t'); + assert_eq!(args.get_terminator(), Some(b'\x0a')); // LF + assert_eq!(args.get_quote(), None); // quote none + assert_eq!(args.get_escape(), None); + + let args = parse_args(vec!["--input-format", "csv", "--escape-char", "\\"])?; + assert_eq!(args.input_format, CsvDialect::Csv); + assert_eq!(args.get_delimiter(), b','); + assert_eq!(args.get_terminator(), None); // CRLF + assert_eq!(args.get_quote(), Some(b'\"')); + assert_eq!(args.get_escape(), Some(b'\\')); + + let args = parse_args(vec!["--input-format", "tsv", "--delimiter", ":"])?; + assert_eq!(args.input_format, CsvDialect::Tsv); + assert_eq!(args.get_delimiter(), b':'); + assert_eq!(args.get_terminator(), Some(b'\x0a')); // LF + assert_eq!(args.get_quote(), None); // quote none + assert_eq!(args.get_escape(), None); + + Ok(()) + } + + #[test] + #[should_panic] + fn test_parse_arg_format_error() { + parse_args(vec!["--input-format", "excel"]).unwrap(); + } + + #[test] + fn test_parse_arg_compression_format() { + let args = parse_args(vec!["--parquet-compression", "uncompressed"]).unwrap(); + assert_eq!(args.parquet_compression, Compression::UNCOMPRESSED); + let args = parse_args(vec!["--parquet-compression", "snappy"]).unwrap(); + assert_eq!(args.parquet_compression, Compression::SNAPPY); + let args = parse_args(vec!["--parquet-compression", "gzip"]).unwrap(); + assert_eq!( + args.parquet_compression, + Compression::GZIP(Default::default()) + ); + let args = parse_args(vec!["--parquet-compression", "lzo"]).unwrap(); + assert_eq!(args.parquet_compression, Compression::LZO); + let args = parse_args(vec!["--parquet-compression", "lz4"]).unwrap(); + assert_eq!(args.parquet_compression, Compression::LZ4); + let args = parse_args(vec!["--parquet-compression", "brotli"]).unwrap(); + assert_eq!( + args.parquet_compression, + Compression::BROTLI(Default::default()) + ); + let 
args = parse_args(vec!["--parquet-compression", "zstd"]).unwrap(); + assert_eq!( + args.parquet_compression, + Compression::ZSTD(Default::default()) + ); + } + + #[test] + fn test_parse_arg_compression_format_fail() { + match parse_args(vec!["--parquet-compression", "zip"]) { + Ok(_) => panic!("unexpected success"), + Err(e) => assert_eq!( + format!("{e}"), + "error: invalid value 'zip' for '--parquet-compression ': Unknown compression ZIP : possible values UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD \n\nFor more information try --help\n"), + } + } + + fn assert_debug_text(debug_text: &str, name: &str, value: &str) { + let pattern = format!(" {name}: {value}"); + assert!( + debug_text.contains(&pattern), + "\"{debug_text}\" not contains \"{pattern}\"" + ) + } + + #[test] + fn test_configure_reader_builder() { + let args = Args { + schema: PathBuf::from(Path::new("schema.arvo")), + input_file: PathBuf::from(Path::new("test.csv")), + output_file: PathBuf::from(Path::new("out.parquet")), + batch_size: 1000, + input_format: CsvDialect::Csv, + has_header: false, + delimiter: None, + record_terminator: None, + escape_char: None, + quote_char: None, + double_quote: None, + csv_compression: Compression::UNCOMPRESSED, + parquet_compression: Compression::SNAPPY, + writer_version: None, + max_row_group_size: None, + enable_bloom_filter: None, + help: None, + }; + let arrow_schema = Arc::new(Schema::new(vec![ + Field::new("field1", DataType::Utf8, false), + Field::new("field2", DataType::Utf8, false), + Field::new("field3", DataType::Utf8, false), + Field::new("field4", DataType::Utf8, false), + Field::new("field5", DataType::Utf8, false), + ])); + + let reader_builder = configure_reader_builder(&args, arrow_schema); + let builder_debug = format!("{reader_builder:?}"); + assert_debug_text(&builder_debug, "header", "false"); + assert_debug_text(&builder_debug, "delimiter", "Some(44)"); + assert_debug_text(&builder_debug, "quote", "Some(34)"); + assert_debug_text(&builder_debug, "terminator", "None"); + assert_debug_text(&builder_debug, "batch_size", "1000"); + assert_debug_text(&builder_debug, "escape", "None"); + + let args = Args { + schema: PathBuf::from(Path::new("schema.arvo")), + input_file: PathBuf::from(Path::new("test.csv")), + output_file: PathBuf::from(Path::new("out.parquet")), + batch_size: 2000, + input_format: CsvDialect::Tsv, + has_header: true, + delimiter: None, + record_terminator: None, + escape_char: Some('\\'), + quote_char: None, + double_quote: None, + csv_compression: Compression::UNCOMPRESSED, + parquet_compression: Compression::SNAPPY, + writer_version: None, + max_row_group_size: None, + enable_bloom_filter: None, + help: None, + }; + let arrow_schema = Arc::new(Schema::new(vec![ + Field::new("field1", DataType::Utf8, false), + Field::new("field2", DataType::Utf8, false), + Field::new("field3", DataType::Utf8, false), + Field::new("field4", DataType::Utf8, false), + Field::new("field5", DataType::Utf8, false), + ])); + let reader_builder = configure_reader_builder(&args, arrow_schema); + let builder_debug = format!("{reader_builder:?}"); + assert_debug_text(&builder_debug, "header", "true"); + assert_debug_text(&builder_debug, "delimiter", "Some(9)"); + assert_debug_text(&builder_debug, "quote", "None"); + assert_debug_text(&builder_debug, "terminator", "Some(10)"); + assert_debug_text(&builder_debug, "batch_size", "2000"); + assert_debug_text(&builder_debug, "escape", "Some(92)"); + } + + fn test_convert_compressed_csv_to_parquet(csv_compression: Compression) { + let 
schema = NamedTempFile::new().unwrap(); + let schema_text = r"message my_amazing_schema { + optional int32 id; + optional binary name (STRING); + }"; + schema.as_file().write_all(schema_text.as_bytes()).unwrap(); + + let mut input_file = NamedTempFile::new().unwrap(); + + fn write_tmp_file(w: &mut T) { + for index in 1..2000 { + write!(w, "{index},\"name_{index}\"\r\n").unwrap(); + } + w.flush().unwrap(); + } + + // make sure the input_file's lifetime being long enough + input_file = match csv_compression { + Compression::UNCOMPRESSED => { + write_tmp_file(&mut input_file); + input_file + } + Compression::SNAPPY => { + let mut encoder = FrameEncoder::new(input_file); + write_tmp_file(&mut encoder); + encoder.into_inner().unwrap() + } + Compression::GZIP(level) => { + let mut encoder = GzEncoder::new( + input_file, + flate2::Compression::new(level.compression_level()), + ); + write_tmp_file(&mut encoder); + encoder.finish().unwrap() + } + Compression::BROTLI(level) => { + let mut encoder = + CompressorWriter::new(input_file, 0, level.compression_level(), 0); + write_tmp_file(&mut encoder); + encoder.into_inner() + } + Compression::LZ4 => { + let mut encoder = lz4_flex::frame::FrameEncoder::new(input_file); + write_tmp_file(&mut encoder); + encoder.finish().unwrap() + } + + Compression::ZSTD(level) => { + let mut encoder = zstd::Encoder::new(input_file, level.compression_level()) + .map_err(|e| { + ParquetFromCsvError::with_context(e, "Failed to create zstd::Encoder") + }) + .unwrap(); + write_tmp_file(&mut encoder); + encoder.finish().unwrap() + } + d => unimplemented!("compression type {d}"), + }; + + let output_parquet = NamedTempFile::new().unwrap(); + + let args = Args { + schema: PathBuf::from(schema.path()), + input_file: PathBuf::from(input_file.path()), + output_file: PathBuf::from(output_parquet.path()), + batch_size: 1000, + input_format: CsvDialect::Csv, + has_header: false, + delimiter: None, + record_terminator: None, + escape_char: None, + quote_char: None, + double_quote: None, + csv_compression, + parquet_compression: Compression::SNAPPY, + writer_version: None, + max_row_group_size: None, + // by default we shall test bloom filter writing + enable_bloom_filter: Some(true), + help: None, + }; + convert_csv_to_parquet(&args).unwrap(); + + let file = SerializedFileReader::new(output_parquet.into_file()).unwrap(); + let schema_name = file.metadata().file_metadata().schema().name(); + assert_eq!(schema_name, "my_amazing_schema"); + } + + #[test] + fn test_convert_csv_to_parquet() { + test_convert_compressed_csv_to_parquet(Compression::UNCOMPRESSED); + test_convert_compressed_csv_to_parquet(Compression::SNAPPY); + test_convert_compressed_csv_to_parquet(Compression::GZIP(GzipLevel::try_new(1).unwrap())); + test_convert_compressed_csv_to_parquet(Compression::BROTLI( + BrotliLevel::try_new(2).unwrap(), + )); + test_convert_compressed_csv_to_parquet(Compression::LZ4); + test_convert_compressed_csv_to_parquet(Compression::ZSTD(ZstdLevel::try_new(1).unwrap())); + } +} diff --git a/arrow-rs/parquet/src/bin/parquet-index.rs b/arrow-rs/parquet/src/bin/parquet-index.rs new file mode 100644 index 0000000..86e08b6 --- /dev/null +++ b/arrow-rs/parquet/src/bin/parquet-index.rs @@ -0,0 +1,172 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Binary that prints the [page index] of a parquet file +//! +//! # Install +//! +//! `parquet-layout` can be installed using `cargo`: +//! ``` +//! cargo install parquet --features=cli +//! ``` +//! After this `parquet-index` should be available: +//! ``` +//! parquet-index XYZ.parquet COLUMN_NAME +//! ``` +//! +//! The binary can also be built from the source code and run as follows: +//! ``` +//! cargo run --features=cli --bin parquet-index XYZ.parquet COLUMN_NAME +//! +//! [page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md + +use clap::Parser; +use parquet::errors::{ParquetError, Result}; +use parquet::file::page_index::index::{Index, PageIndex}; +use parquet::file::reader::{FileReader, SerializedFileReader}; +use parquet::file::serialized_reader::ReadOptionsBuilder; +use parquet::format::PageLocation; +use std::fs::File; + +#[derive(Debug, Parser)] +#[clap(author, version, about("Prints the page index of a parquet file"), long_about = None)] +struct Args { + #[clap(help("Path to a parquet file"))] + file: String, + + #[clap(help("Column name to print"))] + column: String, +} + +impl Args { + fn run(&self) -> Result<()> { + let file = File::open(&self.file)?; + let options = ReadOptionsBuilder::new().with_page_index().build(); + let reader = SerializedFileReader::new_with_options(file, options)?; + + let schema = reader.metadata().file_metadata().schema_descr(); + let column_idx = schema + .columns() + .iter() + .position(|x| x.name() == self.column.as_str()) + .ok_or_else(|| { + ParquetError::General(format!("Failed to find column {}", self.column)) + })?; + + // Column index data for all row groups and columns + let column_index = reader + .metadata() + .column_index() + .ok_or_else(|| ParquetError::General("Column index not found".to_string()))?; + + // Offset index data for all row groups and columns + let offset_index = reader + .metadata() + .offset_index() + .ok_or_else(|| ParquetError::General("Offset index not found".to_string()))?; + + // Iterate through each row group + for (row_group_idx, ((column_indices, offset_indices), row_group)) in column_index + .iter() + .zip(offset_index) + .zip(reader.metadata().row_groups()) + .enumerate() + { + println!("Row Group: {row_group_idx}"); + let offset_index = offset_indices.get(column_idx).ok_or_else(|| { + ParquetError::General(format!( + "No offset index for row group {row_group_idx} column chunk {column_idx}" + )) + })?; + + let row_counts = compute_row_counts(offset_index, row_group.num_rows()); + match &column_indices[column_idx] { + Index::NONE => println!("NO INDEX"), + Index::BOOLEAN(v) => print_index(&v.indexes, offset_index, &row_counts)?, + Index::INT32(v) => print_index(&v.indexes, offset_index, &row_counts)?, + Index::INT64(v) => print_index(&v.indexes, offset_index, &row_counts)?, + Index::INT96(v) => print_index(&v.indexes, offset_index, &row_counts)?, + Index::FLOAT(v) => print_index(&v.indexes, 
offset_index, &row_counts)?, + Index::DOUBLE(v) => print_index(&v.indexes, offset_index, &row_counts)?, + Index::BYTE_ARRAY(v) => print_index(&v.indexes, offset_index, &row_counts)?, + Index::FIXED_LEN_BYTE_ARRAY(v) => { + print_index(&v.indexes, offset_index, &row_counts)? + } + } + } + Ok(()) + } +} + +/// Computes the number of rows in each page within a column chunk +fn compute_row_counts(offset_index: &[PageLocation], rows: i64) -> Vec { + if offset_index.is_empty() { + return vec![]; + } + + let mut last = offset_index[0].first_row_index; + let mut out = Vec::with_capacity(offset_index.len()); + for o in offset_index.iter().skip(1) { + out.push(o.first_row_index - last); + last = o.first_row_index; + } + out.push(rows - last); + out +} + +/// Prints index information for a single column chunk +fn print_index( + column_index: &[PageIndex], + offset_index: &[PageLocation], + row_counts: &[i64], +) -> Result<()> { + if column_index.len() != offset_index.len() { + return Err(ParquetError::General(format!( + "Index length mismatch, got {} and {}", + column_index.len(), + offset_index.len() + ))); + } + + for (idx, ((c, o), row_count)) in column_index + .iter() + .zip(offset_index) + .zip(row_counts) + .enumerate() + { + print!( + "Page {:>5} at offset {:#010x} with length {:>10} and row count {:>10}", + idx, o.offset, o.compressed_page_size, row_count + ); + match &c.min { + Some(m) => print!(", min {m:>10}"), + None => print!(", min {:>10}", "NONE"), + } + + match &c.max { + Some(m) => print!(", max {m:>10}"), + None => print!(", max {:>10}", "NONE"), + } + println!() + } + + Ok(()) +} + +fn main() -> Result<()> { + Args::parse().run() +} diff --git a/arrow-rs/parquet/src/bin/parquet-layout.rs b/arrow-rs/parquet/src/bin/parquet-layout.rs new file mode 100644 index 0000000..79a0acb --- /dev/null +++ b/arrow-rs/parquet/src/bin/parquet-layout.rs @@ -0,0 +1,236 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Binary that prints the physical layout of a parquet file +//! +//! # Install +//! +//! `parquet-layout` can be installed using `cargo`: +//! ``` +//! cargo install parquet --features=cli +//! ``` +//! After this `parquet-layout` should be available: +//! ``` +//! parquet-layout XYZ.parquet +//! ``` +//! +//! The binary can also be built from the source code and run as follows: +//! ``` +//! cargo run --features=cli --bin parquet-layout XYZ.parquet +//! 
``` + +use std::fs::File; +use std::io::Read; + +use clap::Parser; +use serde::Serialize; +use thrift::protocol::TCompactInputProtocol; + +use parquet::basic::{Compression, Encoding}; +use parquet::errors::Result; +use parquet::file::reader::ChunkReader; +use parquet::format::PageHeader; +use parquet::thrift::TSerializable; + +#[derive(Serialize, Debug)] +struct ParquetFile { + row_groups: Vec, +} + +#[derive(Serialize, Debug)] +struct RowGroup { + columns: Vec, + row_count: i64, +} + +#[derive(Serialize, Debug)] +struct ColumnChunk { + path: String, + has_offset_index: bool, + has_column_index: bool, + has_bloom_filter: bool, + pages: Vec, +} + +#[derive(Serialize, Debug)] +struct Page { + compression: Option<&'static str>, + encoding: &'static str, + page_type: &'static str, + offset: u64, + compressed_bytes: i32, + uncompressed_bytes: i32, + header_bytes: i32, + num_values: i32, +} + +fn do_layout(reader: &C) -> Result { + let metadata = parquet::file::footer::parse_metadata(reader)?; + let schema = metadata.file_metadata().schema_descr(); + + let row_groups = (0..metadata.num_row_groups()) + .map(|row_group_idx| { + let row_group = metadata.row_group(row_group_idx); + let columns = row_group + .columns() + .iter() + .zip(schema.columns()) + .map(|(column, column_schema)| { + let compression = compression(column.compression()); + let mut pages = vec![]; + + let mut start = column + .dictionary_page_offset() + .unwrap_or_else(|| column.data_page_offset()) + as u64; + + let end = start + column.compressed_size() as u64; + while start != end { + let (header_len, header) = read_page_header(reader, start)?; + if let Some(dictionary) = header.dictionary_page_header { + pages.push(Page { + compression, + encoding: encoding(dictionary.encoding), + page_type: "dictionary", + offset: start, + compressed_bytes: header.compressed_page_size, + uncompressed_bytes: header.uncompressed_page_size, + header_bytes: header_len as _, + num_values: dictionary.num_values, + }) + } else if let Some(data_page) = header.data_page_header { + pages.push(Page { + compression, + encoding: encoding(data_page.encoding), + page_type: "data_page_v1", + offset: start, + compressed_bytes: header.compressed_page_size, + uncompressed_bytes: header.uncompressed_page_size, + header_bytes: header_len as _, + num_values: data_page.num_values, + }) + } else if let Some(data_page) = header.data_page_header_v2 { + let is_compressed = data_page.is_compressed.unwrap_or(true); + + pages.push(Page { + compression: compression.filter(|_| is_compressed), + encoding: encoding(data_page.encoding), + page_type: "data_page_v2", + offset: start, + compressed_bytes: header.compressed_page_size, + uncompressed_bytes: header.uncompressed_page_size, + header_bytes: header_len as _, + num_values: data_page.num_values, + }) + } + start += header.compressed_page_size as u64 + header_len as u64; + } + + Ok(ColumnChunk { + path: column_schema.path().parts().join("."), + has_offset_index: column.offset_index_offset().is_some(), + has_column_index: column.column_index_offset().is_some(), + has_bloom_filter: column.bloom_filter_offset().is_some(), + pages, + }) + }) + .collect::>>()?; + + Ok(RowGroup { + columns, + row_count: row_group.num_rows(), + }) + }) + .collect::>>()?; + + Ok(ParquetFile { row_groups }) +} + +/// Reads the page header at `offset` from `reader`, returning +/// both the `PageHeader` and its length in bytes +fn read_page_header(reader: &C, offset: u64) -> Result<(usize, PageHeader)> { + struct TrackedRead(R, usize); + + impl Read 
for TrackedRead { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + let v = self.0.read(buf)?; + self.1 += v; + Ok(v) + } + } + + let input = reader.get_read(offset)?; + let mut tracked = TrackedRead(input, 0); + let mut prot = TCompactInputProtocol::new(&mut tracked); + let header = PageHeader::read_from_in_protocol(&mut prot)?; + Ok((tracked.1, header)) +} + +/// Returns a string representation for a given compression +fn compression(compression: Compression) -> Option<&'static str> { + match compression { + Compression::UNCOMPRESSED => None, + Compression::SNAPPY => Some("snappy"), + Compression::GZIP(_) => Some("gzip"), + Compression::LZO => Some("lzo"), + Compression::BROTLI(_) => Some("brotli"), + Compression::LZ4 => Some("lz4"), + Compression::ZSTD(_) => Some("zstd"), + Compression::LZ4_RAW => Some("lz4_raw"), + } +} + +/// Returns a string representation for a given encoding +fn encoding(encoding: parquet::format::Encoding) -> &'static str { + match Encoding::try_from(encoding) { + Ok(Encoding::PLAIN) => "plain", + Ok(Encoding::PLAIN_DICTIONARY) => "plain_dictionary", + Ok(Encoding::RLE) => "rle", + #[allow(deprecated)] + Ok(Encoding::BIT_PACKED) => "bit_packed", + Ok(Encoding::DELTA_BINARY_PACKED) => "delta_binary_packed", + Ok(Encoding::DELTA_LENGTH_BYTE_ARRAY) => "delta_length_byte_array", + Ok(Encoding::DELTA_BYTE_ARRAY) => "delta_byte_array", + Ok(Encoding::RLE_DICTIONARY) => "rle_dictionary", + Ok(Encoding::BYTE_STREAM_SPLIT) => "byte_stream_split", + Err(_) => "unknown", + } +} + +#[derive(Debug, Parser)] +#[clap(author, version, about("Prints the physical layout of a parquet file"), long_about = None)] +struct Args { + #[clap(help("Path to a parquet file"))] + file: String, +} + +impl Args { + fn run(&self) -> Result<()> { + let file = File::open(&self.file)?; + let layout = do_layout(&file)?; + + let out = std::io::stdout(); + let writer = out.lock(); + + serde_json::to_writer_pretty(writer, &layout).unwrap(); + Ok(()) + } +} + +fn main() -> Result<()> { + Args::parse().run() +} diff --git a/arrow-rs/parquet/src/bin/parquet-read.rs b/arrow-rs/parquet/src/bin/parquet-read.rs new file mode 100644 index 0000000..fe486e6 --- /dev/null +++ b/arrow-rs/parquet/src/bin/parquet-read.rs @@ -0,0 +1,106 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Binary file to read data from a Parquet file. +//! +//! # Install +//! +//! `parquet-read` can be installed using `cargo`: +//! ``` +//! cargo install parquet --features=cli +//! ``` +//! After this `parquet-read` should be available: +//! ``` +//! parquet-read XYZ.parquet +//! ``` +//! +//! The binary can also be built from the source code and run as follows: +//! ``` +//! cargo run --features=cli --bin parquet-read XYZ.parquet +//! ``` +//! +//! 
Note that `parquet-read` reads full file schema, no projection or filtering is +//! applied. + +use clap::Parser; +use parquet::file::reader::{FileReader, SerializedFileReader}; +use parquet::record::Row; +use std::io::{self, Read}; +use std::{fs::File, path::Path}; + +#[derive(Debug, Parser)] +#[clap(author, version, about("Binary file to read data from a Parquet file"), long_about = None)] +struct Args { + #[clap(help("Path to a parquet file, or - for stdin"))] + file_name: String, + #[clap( + short, + long, + default_value_t = 0_usize, + help("Number of records to read. When not provided or 0, all records are read") + )] + num_records: usize, + #[clap(short, long, help("Print Parquet file in JSON lines format"))] + json: bool, +} + +fn main() { + let args = Args::parse(); + + let filename = args.file_name; + let num_records = args.num_records; + let json = args.json; + + let parquet_reader: Box = if filename == "-" { + let mut buf = Vec::new(); + io::stdin() + .read_to_end(&mut buf) + .expect("Failed to read stdin into a buffer"); + Box::new( + SerializedFileReader::new(bytes::Bytes::from(buf)).expect("Failed to create reader"), + ) + } else { + let path = Path::new(&filename); + let file = File::open(path).expect("Unable to open file"); + Box::new(SerializedFileReader::new(file).expect("Failed to create reader")) + }; + + // Use full schema as projected schema + let mut iter = parquet_reader + .get_row_iter(None) + .expect("Failed to create row iterator"); + + let mut start = 0; + let end = num_records; + let all_records = end == 0; + + while all_records || start < end { + match iter.next() { + Some(row) => print_row(&row.unwrap(), json), + None => break, + }; + start += 1; + } +} + +fn print_row(row: &Row, json: bool) { + if json { + println!("{}", row.to_json_value()) + } else { + println!("{row}"); + } +} diff --git a/arrow-rs/parquet/src/bin/parquet-rewrite.rs b/arrow-rs/parquet/src/bin/parquet-rewrite.rs new file mode 100644 index 0000000..ad0f7ae --- /dev/null +++ b/arrow-rs/parquet/src/bin/parquet-rewrite.rs @@ -0,0 +1,279 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Binary file to rewrite parquet files. +//! +//! # Install +//! +//! `parquet-rewrite` can be installed using `cargo`: +//! ``` +//! cargo install parquet --features=cli +//! ``` +//! After this `parquet-rewrite` should be available: +//! ``` +//! parquet-rewrite -i XYZ.parquet -o XYZ2.parquet +//! ``` +//! +//! The binary can also be built from the source code and run as follows: +//! ``` +//! cargo run --features=cli --bin parquet-rewrite -- -i XYZ.parquet -o XYZ2.parquet +//! 
``` + +use std::fs::File; + +use arrow_array::RecordBatchReader; +use clap::{builder::PossibleValue, Parser, ValueEnum}; +use parquet::{ + arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter}, + basic::Compression, + file::{ + properties::{EnabledStatistics, WriterProperties, WriterVersion}, + reader::FileReader, + serialized_reader::SerializedFileReader, + }, +}; + +#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)] +enum CompressionArgs { + /// No compression. + None, + + /// Snappy + Snappy, + + /// GZip + Gzip, + + /// LZO + Lzo, + + /// Brotli + Brotli, + + /// LZ4 + Lz4, + + /// Zstd + Zstd, + + /// LZ4 Raw + Lz4Raw, +} + +impl From for Compression { + fn from(value: CompressionArgs) -> Self { + match value { + CompressionArgs::None => Self::UNCOMPRESSED, + CompressionArgs::Snappy => Self::SNAPPY, + CompressionArgs::Gzip => Self::GZIP(Default::default()), + CompressionArgs::Lzo => Self::LZO, + CompressionArgs::Brotli => Self::BROTLI(Default::default()), + CompressionArgs::Lz4 => Self::LZ4, + CompressionArgs::Zstd => Self::ZSTD(Default::default()), + CompressionArgs::Lz4Raw => Self::LZ4_RAW, + } + } +} + +#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)] +enum EnabledStatisticsArgs { + /// Compute no statistics + None, + + /// Compute chunk-level statistics but not page-level + Chunk, + + /// Compute page-level and chunk-level statistics + Page, +} + +impl From for EnabledStatistics { + fn from(value: EnabledStatisticsArgs) -> Self { + match value { + EnabledStatisticsArgs::None => Self::None, + EnabledStatisticsArgs::Chunk => Self::Chunk, + EnabledStatisticsArgs::Page => Self::Page, + } + } +} + +#[derive(Clone, Copy, Debug)] +enum WriterVersionArgs { + Parquet1_0, + Parquet2_0, +} + +impl ValueEnum for WriterVersionArgs { + fn value_variants<'a>() -> &'a [Self] { + &[Self::Parquet1_0, Self::Parquet2_0] + } + + fn to_possible_value(&self) -> Option { + match self { + WriterVersionArgs::Parquet1_0 => Some(PossibleValue::new("1.0")), + WriterVersionArgs::Parquet2_0 => Some(PossibleValue::new("2.0")), + } + } +} + +impl From for WriterVersion { + fn from(value: WriterVersionArgs) -> Self { + match value { + WriterVersionArgs::Parquet1_0 => Self::PARQUET_1_0, + WriterVersionArgs::Parquet2_0 => Self::PARQUET_2_0, + } + } +} + +#[derive(Debug, Parser)] +#[clap(author, version, about("Read and write parquet file with potentially different settings"), long_about = None)] +struct Args { + /// Path to input parquet file. + #[clap(short, long)] + input: String, + + /// Path to output parquet file. + #[clap(short, long)] + output: String, + + /// Compression used. + #[clap(long, value_enum)] + compression: Option, + + /// Sets maximum number of rows in a row group. + #[clap(long)] + max_row_group_size: Option, + + /// Sets best effort maximum number of rows in a data page. + #[clap(long)] + data_page_row_count_limit: Option, + + /// Sets best effort maximum size of a data page in bytes. + #[clap(long)] + data_page_size_limit: Option, + + /// Sets max statistics size for any column. + /// + /// Applicable only if statistics are enabled. + #[clap(long)] + max_statistics_size: Option, + + /// Sets best effort maximum dictionary page size, in bytes. + #[clap(long)] + dictionary_page_size_limit: Option, + + /// Sets whether bloom filter is enabled for any column. + #[clap(long)] + bloom_filter_enabled: Option, + + /// Sets bloom filter false positive probability (fpp) for any column. 
+ #[clap(long)] + bloom_filter_fpp: Option, + + /// Sets number of distinct values (ndv) for bloom filter for any column. + #[clap(long)] + bloom_filter_ndv: Option, + + /// Sets flag to enable/disable dictionary encoding for any column. + #[clap(long)] + dictionary_enabled: Option, + + /// Sets flag to enable/disable statistics for any column. + #[clap(long)] + statistics_enabled: Option, + + /// Sets writer version. + #[clap(long)] + writer_version: Option, +} + +fn main() { + let args = Args::parse(); + + // read key-value metadata + let parquet_reader = + SerializedFileReader::new(File::open(&args.input).expect("Unable to open input file")) + .expect("Failed to create reader"); + let kv_md = parquet_reader + .metadata() + .file_metadata() + .key_value_metadata() + .cloned(); + + // create actual parquet reader + let parquet_reader = ParquetRecordBatchReaderBuilder::try_new( + File::open(args.input).expect("Unable to open input file"), + ) + .expect("parquet open") + .build() + .expect("parquet open"); + + let mut writer_properties_builder = WriterProperties::builder().set_key_value_metadata(kv_md); + if let Some(value) = args.compression { + writer_properties_builder = writer_properties_builder.set_compression(value.into()); + } + if let Some(value) = args.max_row_group_size { + writer_properties_builder = writer_properties_builder.set_max_row_group_size(value); + } + if let Some(value) = args.data_page_row_count_limit { + writer_properties_builder = writer_properties_builder.set_data_page_row_count_limit(value); + } + if let Some(value) = args.data_page_size_limit { + writer_properties_builder = writer_properties_builder.set_data_page_size_limit(value); + } + if let Some(value) = args.dictionary_page_size_limit { + writer_properties_builder = writer_properties_builder.set_dictionary_page_size_limit(value); + } + if let Some(value) = args.max_statistics_size { + writer_properties_builder = writer_properties_builder.set_max_statistics_size(value); + } + if let Some(value) = args.bloom_filter_enabled { + writer_properties_builder = writer_properties_builder.set_bloom_filter_enabled(value); + + if value { + if let Some(value) = args.bloom_filter_fpp { + writer_properties_builder = writer_properties_builder.set_bloom_filter_fpp(value); + } + if let Some(value) = args.bloom_filter_ndv { + writer_properties_builder = writer_properties_builder.set_bloom_filter_ndv(value); + } + } + } + if let Some(value) = args.dictionary_enabled { + writer_properties_builder = writer_properties_builder.set_dictionary_enabled(value); + } + if let Some(value) = args.statistics_enabled { + writer_properties_builder = writer_properties_builder.set_statistics_enabled(value.into()); + } + if let Some(value) = args.writer_version { + writer_properties_builder = writer_properties_builder.set_writer_version(value.into()); + } + let writer_properties = writer_properties_builder.build(); + let mut parquet_writer = ArrowWriter::try_new( + File::create(&args.output).expect("Unable to open output file"), + parquet_reader.schema(), + Some(writer_properties), + ) + .expect("create arrow writer"); + + for maybe_batch in parquet_reader { + let batch = maybe_batch.expect("reading batch"); + parquet_writer.write(&batch).expect("writing data"); + } + + parquet_writer.close().expect("finalizing file"); +} diff --git a/arrow-rs/parquet/src/bin/parquet-rowcount.rs b/arrow-rs/parquet/src/bin/parquet-rowcount.rs new file mode 100644 index 0000000..07e4bd1 --- /dev/null +++ b/arrow-rs/parquet/src/bin/parquet-rowcount.rs @@ -0,0 +1,69 
@@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Binary file to return the number of rows found from Parquet file(s). +//! +//! # Install +//! +//! `parquet-rowcount` can be installed using `cargo`: +//! ``` +//! cargo install parquet --features=cli +//! ``` +//! After this `parquet-rowcount` should be available: +//! ``` +//! parquet-rowcount XYZ.parquet +//! ``` +//! +//! The binary can also be built from the source code and run as follows: +//! ``` +//! cargo run --features=cli --bin parquet-rowcount XYZ.parquet ABC.parquet ZXC.parquet +//! ``` +//! +//! Note that `parquet-rowcount` reads full file schema, no projection or filtering is +//! applied. + +use clap::Parser; +use parquet::file::reader::{FileReader, SerializedFileReader}; +use std::{fs::File, path::Path}; + +#[derive(Debug, Parser)] +#[clap(author, version, about("Binary file to return the number of rows found from Parquet file(s)"), long_about = None)] +struct Args { + #[clap( + number_of_values(1), + help("List of Parquet files to read from separated by space") + )] + file_paths: Vec, +} + +fn main() { + let args = Args::parse(); + + for filename in args.file_paths { + let path = Path::new(&filename); + let file = File::open(path).expect("Unable to open file"); + let parquet_reader = SerializedFileReader::new(file).expect("Unable to read file"); + let row_group_metadata = parquet_reader.metadata().row_groups(); + let mut total_num_rows = 0; + + for group_metadata in row_group_metadata { + total_num_rows += group_metadata.num_rows(); + } + + eprintln!("File {filename}: rowcount={total_num_rows}"); + } +} diff --git a/arrow-rs/parquet/src/bin/parquet-schema.rs b/arrow-rs/parquet/src/bin/parquet-schema.rs new file mode 100644 index 0000000..bfcb77d --- /dev/null +++ b/arrow-rs/parquet/src/bin/parquet-schema.rs @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Binary file to print the schema and metadata of a Parquet file. +//! +//! # Install +//! +//! 
`parquet-schema` can be installed using `cargo`: +//! ``` +//! cargo install parquet --features=cli +//! ``` +//! After this `parquet-schema` should be available: +//! ``` +//! parquet-schema XYZ.parquet +//! ``` +//! +//! The binary can also be built from the source code and run as follows: +//! ``` +//! cargo run --features=cli --bin parquet-schema XYZ.parquet +//! ``` +//! +//! Note that `verbose` is an optional boolean flag that allows to print schema only, +//! when not provided or print full file metadata when provided. + +use clap::Parser; +use parquet::{ + file::reader::{FileReader, SerializedFileReader}, + schema::printer::{print_file_metadata, print_parquet_metadata}, +}; +use std::{fs::File, path::Path}; + +#[derive(Debug, Parser)] +#[clap(author, version, about("Binary file to print the schema and metadata of a Parquet file"), long_about = None)] +struct Args { + #[clap(help("Path to the parquet file"))] + file_path: String, + #[clap(short, long, help("Enable printing full file metadata"))] + verbose: bool, +} + +fn main() { + let args = Args::parse(); + let filename = args.file_path; + let path = Path::new(&filename); + let file = File::open(path).expect("Unable to open file"); + let verbose = args.verbose; + + match SerializedFileReader::new(file) { + Err(e) => panic!("Error when parsing Parquet file: {e}"), + Ok(parquet_reader) => { + let metadata = parquet_reader.metadata(); + println!("Metadata for file: {}", &filename); + println!(); + if verbose { + print_parquet_metadata(&mut std::io::stdout(), metadata); + } else { + print_file_metadata(&mut std::io::stdout(), metadata.file_metadata()); + } + } + } +} diff --git a/arrow-rs/parquet/src/bin/parquet-show-bloom-filter.rs b/arrow-rs/parquet/src/bin/parquet-show-bloom-filter.rs new file mode 100644 index 0000000..b1b3325 --- /dev/null +++ b/arrow-rs/parquet/src/bin/parquet-show-bloom-filter.rs @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Binary file to read bloom filter data from a Parquet file. +//! +//! # Install +//! +//! `parquet-show-bloom-filter` can be installed using `cargo`: +//! ``` +//! cargo install parquet --features=cli +//! ``` +//! After this `parquet-show-bloom-filter` should be available: +//! ``` +//! parquet-show-bloom-filter XYZ.parquet id a +//! ``` +//! +//! The binary can also be built from the source code and run as follows: +//! ``` +//! cargo run --features=cli --bin parquet-show-bloom-filter -- --file-name XYZ.parquet --column id --values a +//! 
``` + +use clap::Parser; +use parquet::file::{ + properties::ReaderProperties, + reader::{FileReader, SerializedFileReader}, + serialized_reader::ReadOptionsBuilder, +}; +use std::{fs::File, path::Path}; + +#[derive(Debug, Parser)] +#[clap(author, version, about("Binary file to read bloom filter data from a Parquet file"), long_about = None)] +struct Args { + #[clap(help("Path to the parquet file"))] + file_name: String, + #[clap(help("Check the bloom filter indexes for the given column"))] + column: String, + #[clap( + help( + "Check if the given values match bloom filter, the values will be evaluated as strings" + ), + required = true + )] + values: Vec, +} + +fn main() { + let args = Args::parse(); + let file_name = args.file_name; + let path = Path::new(&file_name); + let file = File::open(path).expect("Unable to open file"); + + let file_reader = SerializedFileReader::new_with_options( + file, + ReadOptionsBuilder::new() + .with_reader_properties( + ReaderProperties::builder() + .set_read_bloom_filter(true) + .build(), + ) + .build(), + ) + .expect("Unable to open file as Parquet"); + let metadata = file_reader.metadata(); + for (ri, row_group) in metadata.row_groups().iter().enumerate() { + println!("Row group #{ri}"); + println!("{}", "=".repeat(80)); + if let Some((column_index, _)) = row_group + .columns() + .iter() + .enumerate() + .find(|(_, column)| column.column_path().string() == args.column) + { + let row_group_reader = file_reader + .get_row_group(ri) + .expect("Unable to read row group"); + if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) { + args.values.iter().for_each(|value| { + println!( + "Value {} is {} in bloom filter", + value, + if sbbf.check(&value.as_str()) { + "present" + } else { + "absent" + } + ) + }); + } else { + println!("No bloom filter found for column {}", args.column); + } + } else { + println!( + "No column named {} found, candidate columns are: {}", + args.column, + row_group + .columns() + .iter() + .map(|c| c.column_path().string()) + .collect::>() + .join(", ") + ); + } + } +}
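
For reference, the same bloom filter probe that `parquet-show-bloom-filter` performs can be expressed programmatically with the APIs the tool uses (`ReadOptionsBuilder`, `ReaderProperties::set_read_bloom_filter`, `get_column_bloom_filter`, `Sbbf::check`). This is a minimal sketch under stated assumptions, not part of the tool itself: the function name `value_may_be_present` is illustrative, and only the first row group is probed.

```rust
use std::fs::File;

use parquet::errors::Result;
use parquet::file::{
    properties::ReaderProperties,
    reader::{FileReader, SerializedFileReader},
    serialized_reader::ReadOptionsBuilder,
};

/// Probes the bloom filter of the column at `column_idx` in the first row
/// group of `path`. Returns `false` when no bloom filter is present.
fn value_may_be_present(path: &str, column_idx: usize, value: &str) -> Result<bool> {
    // Bloom filter reading must be opted into, exactly as the tool above does.
    let options = ReadOptionsBuilder::new()
        .with_reader_properties(
            ReaderProperties::builder()
                .set_read_bloom_filter(true)
                .build(),
        )
        .build();
    let reader = SerializedFileReader::new_with_options(File::open(path)?, options)?;

    // Only the first row group is probed here, for brevity.
    let row_group = reader.get_row_group(0)?;
    Ok(match row_group.get_column_bloom_filter(column_idx) {
        Some(sbbf) => sbbf.check(&value),
        None => false,
    })
}
```

A `true` result only means the value may be present (bloom filters admit false positives); `false` means it is definitely absent from that row group's column chunk, which matches the "present"/"absent" wording printed by the tool.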