From 576beb446f6f083413e54246ec1f6a9402782d7e Mon Sep 17 00:00:00 2001 From: Michael Spector Date: Mon, 18 Nov 2019 12:15:15 +0200 Subject: [PATCH] Allow selecting specific (top-level) columns from Parquet file --- Cargo.lock | 4 ++-- pq2json/src/main.rs | 37 +++++++++++++++++++++++++++++++++++-- 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 385c4a2..1d0ca2f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -37,7 +37,7 @@ dependencies = [ [[package]] name = "arrow" version = "0.14.0-SNAPSHOT" -source = "git+https://github.com/rzheka/arrow.git?branch=dev#81a421d2dbd4909eb941289ce7efa7b6ac95a29d" +source = "git+https://github.com/rzheka/arrow.git?branch=dev#6b48f2284be5ff2f8b8541e40d21947f089ce0c9" dependencies = [ "chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "csv 1.0.7 (registry+https://github.com/rust-lang/crates.io-index)", @@ -530,7 +530,7 @@ dependencies = [ [[package]] name = "parquet" version = "0.14.0-SNAPSHOT" -source = "git+https://github.com/rzheka/arrow.git?branch=dev#81a421d2dbd4909eb941289ce7efa7b6ac95a29d" +source = "git+https://github.com/rzheka/arrow.git?branch=dev#6b48f2284be5ff2f8b8541e40d21947f089ce0c9" dependencies = [ "arrow 0.14.0-SNAPSHOT (git+https://github.com/rzheka/arrow.git?branch=dev)", "brotli 2.5.1 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/pq2json/src/main.rs b/pq2json/src/main.rs index 4a5ca5d..fc8828a 100644 --- a/pq2json/src/main.rs +++ b/pq2json/src/main.rs @@ -3,8 +3,10 @@ use num_bigint::{BigInt, Sign}; use parquet::data_type::Decimal; use parquet::file::reader::{FileReader, SerializedFileReader}; use parquet::record::{FieldType, List, ListAccessor, Map, MapAccessor, Row, RowAccessor}; +use parquet::schema::types::{Type as SchemaType}; use parquet::schema::printer::{print_file_metadata, print_parquet_metadata}; use serde_json::{Number, Value}; +use std::collections::HashMap; use std::error::Error; use std::fs::File; use std::io::{self, BufWriter, Write}; @@ -60,6 +62,14 @@ fn main() { .takes_value(true) .required(false), ) + .arg( + Arg::with_name("columns") + .short("c") + .long("columns") + .help("Comma separated top-level columns to select") + .takes_value(true) + .required(false), + ) .arg( Arg::with_name("output") .short("o") @@ -100,11 +110,14 @@ fn main() { _ => TimestampRendering::IsoStr, }; + let columns: &str = matches.value_of("columns").unwrap_or(""); + let settings = Settings { omit_nulls: matches.is_present("omit-nulls") || matches.is_present("prune"), omit_empty_bags: matches.is_present("omit-empty-bags") || matches.is_present("prune"), omit_empty_lists: matches.is_present("omit-empty-lists") || matches.is_present("prune"), - timestamp_rendering, + timestamp_rendering: timestamp_rendering, + columns: if columns.is_empty() { None } else { Some(columns.split(",").map(|s| s.to_string()).collect()) } }; let res = if matches.is_present("schema") { @@ -128,6 +141,7 @@ struct Settings { omit_empty_bags: bool, omit_empty_lists: bool, timestamp_rendering: TimestampRendering, + columns: Option> } #[derive(Debug, Copy, Clone, Eq, PartialEq)] @@ -152,7 +166,10 @@ fn convert(settings: &Settings, input: &str, output: &str) -> Result<(), Box }; - let mut iter = reader.get_row_iter(None)?; + let schema = settings.columns.as_ref() + .map(|c| get_projected_schema(&reader, &c).unwrap()); + + let mut iter = reader.get_row_iter(schema)?; while let Some(record) = iter.next() { let value = top_level_row_to_value(settings, &record)?; writeln!(writer, "{}", serde_json::to_string(&value)?)?; @@ -179,6 +196,22 @@ fn print_schema(input: &str) -> Result<(), Box> { Ok(()) } +fn get_projected_schema(reader: &SerializedFileReader, columns: &Vec) -> Result> { + let file_meta = reader.metadata().file_metadata(); + let mut schema_fields = HashMap::new(); + for field in file_meta.schema().get_fields().iter() { + schema_fields.insert(field.name().to_owned(), field); + } + // TODO: return error if non-existent column specified + let mut projected_fields = columns.iter() + .map(|c| schema_fields[c].clone()) + .collect(); + Ok(SchemaType::group_type_builder("schema") + .with_fields(&mut projected_fields) + .build() + .unwrap()) +} + macro_rules! element_to_value { ($ft:expr, $obj:ident, $i:ident, $settings:ident) => { match $ft {