Allow selecting specific (top-level) columns from Parquet file

Michael Spector 2019-11-18 12:15:15 +02:00
Parent 8102e87867
Commit 576beb446f
2 changed files with 37 additions and 4 deletions
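The new flag takes a comma-separated list of top-level column names and stores it as an Option<Vec<String>> on the tool's settings. A minimal sketch of that parsing, using a hypothetical parse_columns helper that is not part of the commit:

// Hypothetical helper, mirroring the commit's parsing of the --columns value.
// An empty string means "no projection": all columns are kept.
fn parse_columns(raw: &str) -> Option<Vec<String>> {
    if raw.is_empty() {
        None
    } else {
        Some(raw.split(',').map(|s| s.to_string()).collect())
    }
}

fn main() {
    assert_eq!(parse_columns(""), None);
    assert_eq!(
        parse_columns("id,name"),
        Some(vec!["id".to_string(), "name".to_string()])
    );
}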

Cargo.lock (generated)

@@ -37,7 +37,7 @@ dependencies = [
 [[package]]
 name = "arrow"
 version = "0.14.0-SNAPSHOT"
-source = "git+https://github.com/rzheka/arrow.git?branch=dev#81a421d2dbd4909eb941289ce7efa7b6ac95a29d"
+source = "git+https://github.com/rzheka/arrow.git?branch=dev#6b48f2284be5ff2f8b8541e40d21947f089ce0c9"
 dependencies = [
  "chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
  "csv 1.0.7 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -530,7 +530,7 @@ dependencies = [
 [[package]]
 name = "parquet"
 version = "0.14.0-SNAPSHOT"
-source = "git+https://github.com/rzheka/arrow.git?branch=dev#81a421d2dbd4909eb941289ce7efa7b6ac95a29d"
+source = "git+https://github.com/rzheka/arrow.git?branch=dev#6b48f2284be5ff2f8b8541e40d21947f089ce0c9"
 dependencies = [
  "arrow 0.14.0-SNAPSHOT (git+https://github.com/rzheka/arrow.git?branch=dev)",
  "brotli 2.5.1 (registry+https://github.com/rust-lang/crates.io-index)",

@@ -3,8 +3,10 @@ use num_bigint::{BigInt, Sign};
 use parquet::data_type::Decimal;
 use parquet::file::reader::{FileReader, SerializedFileReader};
 use parquet::record::{FieldType, List, ListAccessor, Map, MapAccessor, Row, RowAccessor};
+use parquet::schema::types::{Type as SchemaType};
 use parquet::schema::printer::{print_file_metadata, print_parquet_metadata};
 use serde_json::{Number, Value};
+use std::collections::HashMap;
 use std::error::Error;
 use std::fs::File;
 use std::io::{self, BufWriter, Write};
@@ -60,6 +62,14 @@ fn main() {
                 .takes_value(true)
                 .required(false),
         )
+        .arg(
+            Arg::with_name("columns")
+                .short("c")
+                .long("columns")
+                .help("Comma separated top-level columns to select")
+                .takes_value(true)
+                .required(false),
+        )
         .arg(
             Arg::with_name("output")
                 .short("o")
@@ -100,11 +110,14 @@ fn main() {
         _ => TimestampRendering::IsoStr,
     };
 
+    let columns: &str = matches.value_of("columns").unwrap_or("");
+
     let settings = Settings {
         omit_nulls: matches.is_present("omit-nulls") || matches.is_present("prune"),
         omit_empty_bags: matches.is_present("omit-empty-bags") || matches.is_present("prune"),
         omit_empty_lists: matches.is_present("omit-empty-lists") || matches.is_present("prune"),
-        timestamp_rendering,
+        timestamp_rendering: timestamp_rendering,
+        columns: if columns.is_empty() { None } else { Some(columns.split(",").map(|s| s.to_string()).collect()) }
     };
 
     let res = if matches.is_present("schema") {
@@ -128,6 +141,7 @@ struct Settings {
     omit_empty_bags: bool,
     omit_empty_lists: bool,
     timestamp_rendering: TimestampRendering,
+    columns: Option<Vec<String>>
 }
 
 #[derive(Debug, Copy, Clone, Eq, PartialEq)]
@@ -152,7 +166,10 @@ fn convert(settings: &Settings, input: &str, output: &str) -> Result<(), Box<dyn Error>> {
         )) as Box<dyn Write>
     };
 
-    let mut iter = reader.get_row_iter(None)?;
+    let schema = settings.columns.as_ref()
+        .map(|c| get_projected_schema(&reader, &c).unwrap());
+
+    let mut iter = reader.get_row_iter(schema)?;
     while let Some(record) = iter.next() {
         let value = top_level_row_to_value(settings, &record)?;
         writeln!(writer, "{}", serde_json::to_string(&value)?)?;
@@ -179,6 +196,22 @@ fn print_schema(input: &str) -> Result<(), Box<dyn Error>> {
     Ok(())
 }
 
+fn get_projected_schema(reader: &SerializedFileReader<File>, columns: &Vec<String>) -> Result<SchemaType, Box<dyn Error>> {
+    let file_meta = reader.metadata().file_metadata();
+    let mut schema_fields = HashMap::new();
+    for field in file_meta.schema().get_fields().iter() {
+        schema_fields.insert(field.name().to_owned(), field);
+    }
+    // TODO: return error if non-existent column specified
+    let mut projected_fields = columns.iter()
+        .map(|c| schema_fields[c].clone())
+        .collect();
+    Ok(SchemaType::group_type_builder("schema")
+        .with_fields(&mut projected_fields)
+        .build()
+        .unwrap())
+}
+
 macro_rules! element_to_value {
     ($ft:expr, $obj:ident, $i:ident, $settings:ident) => {
         match $ft {
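The TODO in get_projected_schema notes that a non-existent column currently panics on the HashMap index. A hedged sketch of one way to validate the requested names first; get_projected_schema_checked is a hypothetical name and this variant is not part of the commit:

fn get_projected_schema_checked(
    reader: &SerializedFileReader<File>,
    columns: &[String],
) -> Result<SchemaType, Box<dyn Error>> {
    let file_meta = reader.metadata().file_metadata();
    let mut schema_fields = HashMap::new();
    for field in file_meta.schema().get_fields().iter() {
        schema_fields.insert(field.name().to_owned(), field);
    }
    let mut projected_fields = Vec::new();
    for column in columns {
        // Reject unknown names with a readable error instead of panicking.
        if !schema_fields.contains_key(column) {
            return Err(format!("column '{}' not found in file schema", column).into());
        }
        // Clone the field's type pointer into the projected schema.
        projected_fields.push(schema_fields[column].clone());
    }
    Ok(SchemaType::group_type_builder("schema")
        .with_fields(&mut projected_fields)
        .build()?)
}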