From 45e6d85e5ee84a2520eaa4cc80597a1379d178c9 Mon Sep 17 00:00:00 2001 From: Michael Spector Date: Wed, 11 Nov 2020 16:15:14 +0200 Subject: [PATCH] Add support for CSV output format. When `--csv` argument is used, the utility produces CSV format. Nested top level elements (arrays, objects) are formatted as JSON. Using the option alone saves some time already: ``` PS C:\Users\mispecto\Projects\azure-kusto-parquet-conv> Measure-Command { .\target\debug\pq2json.exe ..\..\Downloads\20200809201109710_53ea103e_e9fabfe6_001.parquet > 1 } Days : 0 Hours : 0 Minutes : 0 Seconds : 11 Milliseconds : 361 Ticks : 113610039 TotalDays : 0.000131493100694444 TotalHours : 0.00315583441666667 TotalMinutes : 0.189350065 TotalSeconds : 11.3610039 TotalMilliseconds : 11361.0039 PS C:\Users\mispecto\Projects\azure-kusto-parquet-conv> Measure-Command { .\target\debug\pq2json.exe --csv ..\..\Downloads\20200809201109710_53ea103e_e9fabfe6_001.parquet > 1 } Days : 0 Hours : 0 Minutes : 0 Seconds : 5 Milliseconds : 624 Ticks : 56247514 TotalDays : 6.51012893518518E-05 TotalHours : 0.00156243094444444 TotalMinutes : 0.0937458566666667 TotalSeconds : 5.6247514 TotalMilliseconds : 5624.7514 ``` --- Cargo.lock | 9 +- Package.nuspec | 9 +- pq2json/Cargo.toml | 1 + pq2json/src/converter.rs | 300 +++++++++++++++++++++++ pq2json/src/main.rs | 518 +++++++++++---------------------------- pq2json/src/schema.rs | 31 +++ pq2json/src/settings.rs | 16 ++ 7 files changed, 495 insertions(+), 389 deletions(-) create mode 100644 pq2json/src/converter.rs create mode 100644 pq2json/src/schema.rs create mode 100644 pq2json/src/settings.rs diff --git a/Cargo.lock b/Cargo.lock index 490d6a3..47771ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -56,7 +56,7 @@ source = "git+https://github.com/rzheka/arrow.git?branch=dev#31f4f2342be32281716 dependencies = [ "arrow-flight 0.18.0-SNAPSHOT (git+https://github.com/rzheka/arrow.git?branch=dev)", "chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", - "csv 1.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "csv 1.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "flatbuffers 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", "hex 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "indexmap 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -315,7 +315,7 @@ dependencies = [ [[package]] name = "csv" -version = "1.1.3" +version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "bstr 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", @@ -915,6 +915,7 @@ version = "0.1.0" dependencies = [ "chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", + "csv 1.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "itertools 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "num-bigint 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "parquet 0.18.0-SNAPSHOT (git+https://github.com/rzheka/arrow.git?branch=dev)", @@ -928,7 +929,7 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "atty 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", - "csv 1.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "csv 1.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "encode_unicode 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "term 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1871,7 +1872,7 @@ dependencies = [ "checksum crc 1.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d663548de7f5cca343f1e0a48d14dcfb0e9eb4e079ec58883b7251539fa10aeb" "checksum crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1" "checksum crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" -"checksum csv 1.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "00affe7f6ab566df61b4be3ce8cf16bc2576bca0963ceb0955e45d514bf9a279" +"checksum csv 1.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "fc4666154fd004af3fd6f1da2e81a96fd5a81927fe8ddb6ecc79e2aa6e138b54" "checksum csv-core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "2b2466559f260f48ad25fe6317b3c8dac77b5bdb5763ac7d9d6103530663bc90" "checksum dirs 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "3fd78930633bd1c6e35c4b42b1df7b0cbc6bc191146e512bb3bedf243fcc3901" "checksum either 1.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "5527cfe0d098f36e3f8839852688e63c8fff1c90b2b405aef730615f9a7bcf7b" diff --git a/Package.nuspec b/Package.nuspec index 555862f..c5db13f 100644 --- a/Package.nuspec +++ b/Package.nuspec @@ -2,18 +2,17 @@ pq2json - 0.1.4 + 0.1.5 Evgeney Ryzhyk Evgeney Ryzhyk MIT https://github.com/Azure/azure-kusto-parquet-conv false Parquet to JSON (line delimited) converter tool. - Fixed crash when reading null byte array field. - Copyright 2019 + Support output to pseudo-CSV format. + Copyright 2020 - - + diff --git a/pq2json/Cargo.toml b/pq2json/Cargo.toml index b3642f9..15f354d 100644 --- a/pq2json/Cargo.toml +++ b/pq2json/Cargo.toml @@ -12,3 +12,4 @@ serde = "1" serde_json = "1" num-bigint = "0.2" chrono = "0.4" +csv = "1.1.4" \ No newline at end of file diff --git a/pq2json/src/converter.rs b/pq2json/src/converter.rs new file mode 100644 index 0000000..cfc5b24 --- /dev/null +++ b/pq2json/src/converter.rs @@ -0,0 +1,300 @@ +use std::collections::HashMap; +use std::error::Error; +use std::fs::File; +use std::io::{self, BufWriter, Write}; +use std::path::Path; + +use num_bigint::{BigInt, Sign}; +use parquet::data_type::Decimal; +use parquet::file::reader::{FileReader, SerializedFileReader}; +use parquet::record::{FieldType, List, ListAccessor, Map, MapAccessor, Row, RowAccessor}; +use parquet::schema::types::Type as SchemaType; +use serde_json::{Number, Value}; + +use crate::settings::{Settings, TimestampRendering}; +use parquet::record::reader::RowIter; + +const WRITER_BUF_CAP: usize = 256 * 1024; + +/// Writes Parquet file as text, either as JSONL (every line contains single JSON record) +/// or as CSV (where nested structures are formatted as JSON strings). +/// +/// Arguments: +/// +/// * `settings` - Converter settings +/// * `input_file` - Parquet file path +/// * `output_file` - Optional output file path (if not provided - output is written to STDOUT). +/// +pub fn convert( + settings: &Settings, + input_file: &str, + output_file: Option<&str>, +) -> Result<(), Box> { + let file = File::open(&Path::new(input_file))?; + let reader = SerializedFileReader::new(file)?; + + let writer = match output_file { + Some(output_file) => Box::new(BufWriter::with_capacity( + WRITER_BUF_CAP, + File::create(&Path::new(output_file))?, + )) as Box, + None => Box::new(BufWriter::with_capacity(WRITER_BUF_CAP, io::stdout())) as Box, + }; + + let schema = settings + .columns + .as_ref() + .map(|c| projected_schema(&reader, &c).unwrap()); + + let rows = reader.get_row_iter(schema)?; + + if settings.csv { + top_level_rows_to_csv(&settings, rows, writer) + } else { + top_level_rows_to_json(&settings, rows, writer) + } +} + +fn projected_schema( + reader: &SerializedFileReader, + columns: &Vec, +) -> Result> { + let file_meta = reader.metadata().file_metadata(); + let mut schema_fields = HashMap::new(); + for field in file_meta.schema().get_fields().iter() { + schema_fields.insert(field.name().to_owned(), field); + } + + let mut projected_fields = columns + .iter() + .map(|c| { + schema_fields + .get_mut(c) + .expect(format!("column '{}' doesn't exist", c).as_str()) + .clone() + }) + .collect(); + + Ok( + SchemaType::group_type_builder(&file_meta.schema().get_basic_info().name()) + .with_fields(&mut projected_fields) + .build() + .unwrap(), + ) +} + +macro_rules! element_to_value { + ($ft:expr, $obj:ident, $i:ident, $settings:ident) => { + match $ft { + FieldType::Null => Value::Null, + FieldType::Bool => Value::Bool($obj.get_bool($i)?), + FieldType::Byte => Value::Number($obj.get_byte($i)?.into()), + FieldType::Short => Value::Number($obj.get_short($i)?.into()), + FieldType::Int => Value::Number($obj.get_int($i)?.into()), + FieldType::Long => Value::Number($obj.get_long($i)?.into()), + FieldType::UByte => Value::Number($obj.get_ubyte($i)?.into()), + FieldType::UShort => Value::Number($obj.get_ushort($i)?.into()), + FieldType::UInt => Value::Number($obj.get_uint($i)?.into()), + FieldType::ULong => Value::Number($obj.get_ulong($i)?.into()), + FieldType::Float => float_to_value($obj.get_float($i)? as f64), + FieldType::Double => float_to_value($obj.get_double($i)?), + FieldType::Decimal => Value::String(decimal_to_string($obj.get_decimal($i)?)), + FieldType::Str => Value::String($obj.get_string($i)?.to_string()), + FieldType::Bytes => bytes_to_value($obj.get_bytes($i)?.data()), + FieldType::Date => Value::Number($obj.get_date($i)?.into()), + FieldType::TimestampMillis => { + timestamp_to_value($settings, $obj.get_timestamp_millis($i)?)? + } + FieldType::TimestampMicros => timestamp_to_value( + $settings, + $obj.get_timestamp_micros($i).map(|ts| ts / 1000)?, + )?, + FieldType::Group => row_to_value($settings, $obj.get_group($i)?)?, + FieldType::List => list_to_value($settings, $obj.get_list($i)?)?, + FieldType::Map => map_to_value($settings, $obj.get_map($i)?)?, + } + }; +} + +fn top_level_rows_to_json( + settings: &Settings, + mut rows: RowIter, + mut writer: Box, +) -> Result<(), Box> { + while let Some(row) = rows.next() { + let value = row_to_value(settings, &row)?; + let value = if value.is_null() { + Value::Object(serde_json::Map::default()) + } else { + value + }; + writeln!(writer, "{}", serde_json::to_string(&value)?)?; + } + Ok(()) +} + +fn top_level_rows_to_csv( + settings: &Settings, + mut rows: RowIter, + mut writer: Box, +) -> Result<(), Box> { + while let Some(row) = rows.next() { + let mut csv_writer = csv::Writer::from_writer(vec![]); + for i in 0..row.len() { + let field_type = row.get_field_type(i); + let value = element_to_value!(field_type, row, i, settings); + csv_writer.write_field(value_to_csv(&value))?; + } + writeln!(writer, "{}", String::from_utf8(csv_writer.into_inner()?)?)?; + } + Ok(()) +} + +fn value_to_csv(value: &Value) -> String { + match value { + Value::Null => String::new(), + Value::Bool(v) => v.to_string(), + Value::Number(ref v) => { + if v.is_f64() { + format!("{:e}", v.as_f64().unwrap()) + } else if v.is_u64() { + format!("{}", v.as_u64().unwrap()) + } else { + format!("{}", v.as_i64().unwrap()) + } + } + Value::String(ref v) => v.to_owned(), + Value::Array(ref v) => serde_json::to_string(&v).unwrap(), + Value::Object(ref v) => serde_json::to_string(&v).unwrap(), + } +} + +fn row_to_value(settings: &Settings, row: &Row) -> Result> { + let mut map = serde_json::Map::with_capacity(row.len()); + for i in 0..row.len() { + let name = row.get_field_name(i); + let field_type = row.get_field_type(i); + let value = element_to_value!(field_type, row, i, settings); + if !(settings.omit_nulls && value.is_null()) { + map.insert(name.to_string(), value); + } + } + + if settings.omit_empty_bags && map.is_empty() { + Ok(Value::Null) + } else { + Ok(Value::Object(map)) + } +} + +fn list_to_value(settings: &Settings, list: &List) -> Result> { + let mut arr = Vec::::with_capacity(list.len()); + for i in 0..list.len() { + let elt_ty = list.get_element_type(i); + let value = element_to_value!(elt_ty, list, i, settings); + arr.push(value); + } + + if settings.omit_empty_lists && arr.is_empty() { + Ok(Value::Null) + } else { + Ok(Value::Array(arr)) + } +} + +fn map_to_value(settings: &Settings, map: &Map) -> Result> { + let mut jsmap = serde_json::Map::with_capacity(map.len()); + let keys = map.get_keys(); + let values = map.get_values(); + for i in 0..map.len() { + let key_ty = keys.get_element_type(i); + let key = match key_ty { + FieldType::Str => keys.get_string(i)?.to_string(), + // TODO: return error here + _ => panic!("Non-string key"), + }; + + let val_ty = values.get_element_type(i); + let value = element_to_value!(val_ty, values, i, settings); + if !(settings.omit_nulls && value.is_null()) { + jsmap.insert(key, value); + } + } + + if settings.omit_empty_bags && jsmap.is_empty() { + Ok(Value::Null) + } else { + Ok(Value::Object(jsmap)) + } +} + +fn bytes_to_value(bytes: &[u8]) -> Value { + let nums = bytes + .iter() + .map(|&b| Value::Number(b.into())) + .collect::>(); + Value::Array(nums) +} + +fn float_to_value(f: f64) -> Value { + Number::from_f64(f) + .map(|n| Value::Number(n)) + .unwrap_or_else(|| Value::Null) +} + +const TICKS_TILL_UNIX_TIME: u64 = 621355968000000000u64; + +fn timestamp_to_value(settings: &Settings, ts: u64) -> Result> { + match settings.timestamp_rendering { + TimestampRendering::Ticks => { + let ticks = ts + .checked_mul(10000) + .and_then(|t| t.checked_add(TICKS_TILL_UNIX_TIME)); + let v = ticks + .map(|t| Value::Number(t.into())) + .unwrap_or(Value::Null); + Ok(v) + } + TimestampRendering::IsoStr => { + let seconds = (ts / 1000) as i64; + let nanos = ((ts % 1000) * 1000000) as u32; + let datetime = + if let Some(dt) = chrono::NaiveDateTime::from_timestamp_opt(seconds, nanos) { + dt + } else { + return Ok(Value::Null); + }; + let iso_str = datetime.format("%Y-%m-%dT%H:%M:%S.%6fZ").to_string(); + Ok(Value::String(iso_str)) + } + TimestampRendering::UnixMs => Ok(Value::Number(ts.into())), + } +} + +fn decimal_to_string(decimal: &Decimal) -> String { + assert!(decimal.scale() >= 0 && decimal.precision() > decimal.scale()); + + // Specify as signed bytes to resolve sign as part of conversion. + let num = BigInt::from_signed_bytes_be(decimal.data()); + + // Offset of the first digit in a string. + let negative = if num.sign() == Sign::Minus { 1 } else { 0 }; + let mut num_str = num.to_string(); + let mut point = num_str.len() as i32 - decimal.scale() - negative; + + // Convert to string form without scientific notation. + if point <= 0 { + // Zeros need to be prepended to the unscaled value. + while point < 0 { + num_str.insert(negative as usize, '0'); + point += 1; + } + num_str.insert_str(negative as usize, "0."); + } else { + // No zeroes need to be prepended to the unscaled value, simply insert decimal + // point. + num_str.insert((point + negative) as usize, '.'); + } + + num_str +} diff --git a/pq2json/src/main.rs b/pq2json/src/main.rs index 5ac97e5..568217a 100644 --- a/pq2json/src/main.rs +++ b/pq2json/src/main.rs @@ -1,380 +1,138 @@ -use clap::{App, Arg}; -use num_bigint::{BigInt, Sign}; -use parquet::data_type::Decimal; -use parquet::file::reader::{FileReader, SerializedFileReader}; -use parquet::record::{FieldType, List, ListAccessor, Map, MapAccessor, Row, RowAccessor}; -use parquet::schema::types::{Type as SchemaType}; -use parquet::schema::printer::{print_file_metadata, print_parquet_metadata}; -use serde_json::{Number, Value}; -use std::collections::HashMap; -use std::error::Error; -use std::fs::File; -use std::io::{self, BufWriter, Write}; -use std::path::Path; - -fn main() { - let matches = App::new("pq2json") - .version("0.1") - .arg( - Arg::with_name("omit-nulls") - .long("omit-nulls") - .help("Omit bag entries with null values") - .takes_value(false) - .required(false), - ) - .arg( - Arg::with_name("omit-empty-bags") - .long("omit-empty-bags") - .help("Omit empty property bags") - .takes_value(false) - .required(false), - ) - .arg( - Arg::with_name("omit-empty-lists") - .long("omit-empty-lists") - .help("Omit empty lists") - .takes_value(false) - .required(false), - ) - .arg( - Arg::with_name("prune") - .short("p") - .long("prune") - .help( - "Omit nulls, empty bags and empty lists \ - (equivalent to a combination of the three --omit-... options)", - ) - .takes_value(false) - .required(false), - ) - .arg( - Arg::with_name("timestamp") - .short("t") - .long("timestamp") - .possible_values(&["isostr", "ticks", "unixms"]) - .default_value("isostr") - .help( - "Timestamp rendering option. Either \ - ticks (100ns ticks since 01-01-01), \ - isostr (ISO8601 string) \ - or unixms (milliseconds since Unix epoch)", - ) - .takes_value(true) - .required(false), - ) - .arg( - Arg::with_name("columns") - .short("c") - .long("columns") - .help("Comma separated top-level columns to select") - .takes_value(true) - .required(false), - ) - .arg( - Arg::with_name("output") - .short("o") - .long("output") - .value_name("OUT_FILE") - .help("Output file") - .takes_value(true) - .required(false), - ) - .arg( - Arg::with_name("schema") - .long("schema") - .help("Print schema") - .takes_value(false) - .required(false), - ) - .arg( - Arg::with_name("INPUT") - .help("Input file to use") - .required(true) - .index(1), - ) - .arg( - Arg::with_name("v") - .short("v") - .multiple(true) - .help("Sets the level of verbosity"), - ) - .get_matches(); - - let input = matches.value_of("INPUT").expect("INPUT must be provided"); - let output = matches.value_of("OUT_FILE").unwrap_or(""); - - let timestamp_rendering = match matches.value_of("timestamp").unwrap_or("ticks") { - "ticks" => TimestampRendering::Ticks, - "isostr" => TimestampRendering::IsoStr, - "unixms" => TimestampRendering::UnixMs, - _ => TimestampRendering::IsoStr, - }; - - let columns: &str = matches.value_of("columns").unwrap_or(""); - - let settings = Settings { - omit_nulls: matches.is_present("omit-nulls") || matches.is_present("prune"), - omit_empty_bags: matches.is_present("omit-empty-bags") || matches.is_present("prune"), - omit_empty_lists: matches.is_present("omit-empty-lists") || matches.is_present("prune"), - timestamp_rendering: timestamp_rendering, - columns: if columns.is_empty() { None } else { Some(columns.split(",").map(|s| s.to_string()).collect()) } - }; - - let res = if matches.is_present("schema") { - print_schema(input) - } else { - convert(&settings, input, output) - }; - - match res { - Ok(()) => (), - Err(e) => { - eprintln!("ERROR: {:?}", e); - std::process::exit(-1); - } - } -} - -#[derive(Debug, Clone)] -struct Settings { - omit_nulls: bool, - omit_empty_bags: bool, - omit_empty_lists: bool, - timestamp_rendering: TimestampRendering, - columns: Option> -} - -#[derive(Debug, Copy, Clone, Eq, PartialEq)] -enum TimestampRendering { - Ticks, - IsoStr, - UnixMs, -} - -const WRITER_BUF_CAP: usize = 256 * 1024; - -fn convert(settings: &Settings, input: &str, output: &str) -> Result<(), Box> { - let file = File::open(&Path::new(input))?; - let reader = SerializedFileReader::new(file)?; - - let mut writer = if output.is_empty() { - Box::new(BufWriter::with_capacity(WRITER_BUF_CAP, io::stdout())) as Box - } else { - Box::new(BufWriter::with_capacity( - WRITER_BUF_CAP, - File::create(&Path::new(output))?, - )) as Box - }; - - let schema = settings.columns.as_ref() - .map(|c| get_projected_schema(&reader, &c).unwrap()); - - let mut iter = reader.get_row_iter(schema)?; - while let Some(record) = iter.next() { - let value = top_level_row_to_value(settings, &record)?; - writeln!(writer, "{}", serde_json::to_string(&value)?)?; - } - Ok(()) -} - -fn print_schema(input: &str) -> Result<(), Box> { - let file = File::open(&Path::new(input))?; - let reader = SerializedFileReader::new(file)?; - let meta = reader.metadata(); - let mut output = Vec::new(); - print_parquet_metadata(&mut output, &meta); - println!("\n\nParquet metadata"); - println!("================================================="); - println!("{}", String::from_utf8(output)?); - - let mut output = Vec::new(); - let file_meta = reader.metadata().file_metadata(); - print_file_metadata(&mut output, &file_meta); - println!("\n\nFile metadata"); - println!("================================================="); - println!("{}", String::from_utf8(output)?); - Ok(()) -} - -fn get_projected_schema(reader: &SerializedFileReader, columns: &Vec) -> Result> { - let file_meta = reader.metadata().file_metadata(); - let mut schema_fields = HashMap::new(); - for field in file_meta.schema().get_fields().iter() { - schema_fields.insert(field.name().to_owned(), field); - } - // TODO: return error if non-existent column specified - let mut projected_fields = columns.iter() - .map(|c| schema_fields[c].clone()) - .collect(); - Ok(SchemaType::group_type_builder(&file_meta.schema().get_basic_info().name()) - .with_fields(&mut projected_fields) - .build() - .unwrap()) -} - -macro_rules! element_to_value { - ($ft:expr, $obj:ident, $i:ident, $settings:ident) => { - match $ft { - FieldType::Null => Value::Null, - FieldType::Bool => Value::Bool($obj.get_bool($i)?), - FieldType::Byte => Value::Number($obj.get_byte($i)?.into()), - FieldType::Short => Value::Number($obj.get_short($i)?.into()), - FieldType::Int => Value::Number($obj.get_int($i)?.into()), - FieldType::Long => Value::Number($obj.get_long($i)?.into()), - FieldType::UByte => Value::Number($obj.get_ubyte($i)?.into()), - FieldType::UShort => Value::Number($obj.get_ushort($i)?.into()), - FieldType::UInt => Value::Number($obj.get_uint($i)?.into()), - FieldType::ULong => Value::Number($obj.get_ulong($i)?.into()), - FieldType::Float => float_to_value($obj.get_float($i)? as f64), - FieldType::Double => float_to_value($obj.get_double($i)?), - FieldType::Decimal => Value::String(decimal_to_string($obj.get_decimal($i)?)), - FieldType::Str => Value::String($obj.get_string($i)?.to_string()), - FieldType::Bytes => bytes_to_value($obj.get_bytes($i)?.data()), - FieldType::Date => Value::Number($obj.get_date($i)?.into()), - FieldType::TimestampMillis => timestamp_to_value($settings, $obj.get_timestamp_millis($i)?)?, - FieldType::TimestampMicros => timestamp_to_value($settings, $obj.get_timestamp_micros($i).map(|ts| ts / 1000)?)?, - FieldType::Group => row_to_value($settings, $obj.get_group($i)?)?, - FieldType::List => list_to_value($settings, $obj.get_list($i)?)?, - FieldType::Map => map_to_value($settings, $obj.get_map($i)?)?, - } - }; -} - -fn top_level_row_to_value(settings: &Settings, row: &Row) -> Result> { - let value = row_to_value(settings, row)?; - if value.is_null() { - Ok(Value::Object(serde_json::Map::default())) - } else { - Ok(value) - } -} - -fn row_to_value(settings: &Settings, row: &Row) -> Result> { - let mut map = serde_json::Map::with_capacity(row.len()); - for i in 0..row.len() { - let name = row.get_field_name(i); - let field_type = row.get_field_type(i); - let value = element_to_value!(field_type, row, i, settings); - if !(settings.omit_nulls && value.is_null()) { - map.insert(name.to_string(), value); - } - } - - if settings.omit_empty_bags && map.is_empty() { - Ok(Value::Null) - } else { - Ok(Value::Object(map)) - } -} - -fn list_to_value(settings: &Settings, list: &List) -> Result> { - let mut arr = Vec::::with_capacity(list.len()); - for i in 0..list.len() { - let elt_ty = list.get_element_type(i); - let value = element_to_value!(elt_ty, list, i, settings); - arr.push(value); - } - - if settings.omit_empty_lists && arr.is_empty() { - Ok(Value::Null) - } else { - Ok(Value::Array(arr)) - } -} - -fn map_to_value(settings: &Settings, map: &Map) -> Result> { - let mut jsmap = serde_json::Map::with_capacity(map.len()); - let keys = map.get_keys(); - let values = map.get_values(); - for i in 0..map.len() { - let key_ty = keys.get_element_type(i); - let key = match key_ty { - FieldType::Str => keys.get_string(i)?.to_string(), - // TODO: return error here - _ => panic!("Non-string key"), - }; - - let val_ty = values.get_element_type(i); - let value = element_to_value!(val_ty, values, i, settings); - if !(settings.omit_nulls && value.is_null()) { - jsmap.insert(key, value); - } - } - - if settings.omit_empty_bags && jsmap.is_empty() { - Ok(Value::Null) - } else { - Ok(Value::Object(jsmap)) - } -} - -fn bytes_to_value(bytes: &[u8]) -> Value { - let nums = bytes - .iter() - .map(|&b| Value::Number(b.into())) - .collect::>(); - Value::Array(nums) -} - -fn float_to_value(f: f64) -> Value { - Number::from_f64(f) - .map(|n| Value::Number(n)) - .unwrap_or_else(|| Value::Null) -} - -const TICKS_TILL_UNIX_TIME: u64 = 621355968000000000u64; - -fn timestamp_to_value(settings: &Settings, ts: u64) -> Result> { - match settings.timestamp_rendering { - TimestampRendering::Ticks => { - let ticks = ts - .checked_mul(10000) - .and_then(|t| t.checked_add(TICKS_TILL_UNIX_TIME)); - let v = ticks - .map(|t| Value::Number(t.into())) - .unwrap_or(Value::Null); - Ok(v) - } - TimestampRendering::IsoStr => { - let seconds = (ts / 1000) as i64; - let nanos = ((ts % 1000) * 1000000) as u32; - let datetime = - if let Some(dt) = chrono::NaiveDateTime::from_timestamp_opt(seconds, nanos) { - dt - } else { - return Ok(Value::Null); - }; - let iso_str = datetime.format("%Y-%m-%dT%H:%M:%S.%6fZ").to_string(); - Ok(Value::String(iso_str)) - } - TimestampRendering::UnixMs => Ok(Value::Number(ts.into())), - } -} - -fn decimal_to_string(decimal: &Decimal) -> String { - assert!(decimal.scale() >= 0 && decimal.precision() > decimal.scale()); - - // Specify as signed bytes to resolve sign as part of conversion. - let num = BigInt::from_signed_bytes_be(decimal.data()); - - // Offset of the first digit in a string. - let negative = if num.sign() == Sign::Minus { 1 } else { 0 }; - let mut num_str = num.to_string(); - let mut point = num_str.len() as i32 - decimal.scale() - negative; - - // Convert to string form without scientific notation. - if point <= 0 { - // Zeros need to be prepended to the unscaled value. - while point < 0 { - num_str.insert(negative as usize, '0'); - point += 1; - } - num_str.insert_str(negative as usize, "0."); - } else { - // No zeroes need to be prepended to the unscaled value, simply insert decimal - // point. - num_str.insert((point + negative) as usize, '.'); - } - - num_str -} +use clap::{App, Arg}; + +use crate::settings::{Settings, TimestampRendering}; + +mod converter; +mod schema; +mod settings; + +fn main() { + let matches = App::new("pq2json") + .version("0.1") + .arg( + Arg::with_name("omit-nulls") + .long("omit-nulls") + .help("Omit bag entries with null values") + .takes_value(false) + .required(false), + ) + .arg( + Arg::with_name("omit-empty-bags") + .long("omit-empty-bags") + .help("Omit empty property bags") + .takes_value(false) + .required(false), + ) + .arg( + Arg::with_name("omit-empty-lists") + .long("omit-empty-lists") + .help("Omit empty lists") + .takes_value(false) + .required(false), + ) + .arg( + Arg::with_name("prune") + .short("p") + .long("prune") + .help( + "Omit nulls, empty bags and empty lists \ + (equivalent to a combination of the three --omit-... options)", + ) + .takes_value(false) + .required(false), + ) + .arg( + Arg::with_name("csv") + .long("csv") + .help("Output root level fields in CSV format") + .takes_value(false) + .required(false), + ) + .arg( + Arg::with_name("timestamp") + .short("t") + .long("timestamp") + .possible_values(&["isostr", "ticks", "unixms"]) + .default_value("isostr") + .help( + "Timestamp rendering option. Either \ + ticks (100ns ticks since 01-01-01), \ + isostr (ISO8601 string) \ + or unixms (milliseconds since Unix epoch)", + ) + .takes_value(true) + .required(false), + ) + .arg( + Arg::with_name("columns") + .short("c") + .long("columns") + .help("Comma separated top-level columns to select") + .takes_value(true) + .required(false), + ) + .arg( + Arg::with_name("output") + .short("o") + .long("output") + .value_name("OUT_FILE") + .help("Output file") + .takes_value(true) + .required(false), + ) + .arg( + Arg::with_name("schema") + .long("schema") + .help("Print schema") + .takes_value(false) + .required(false), + ) + .arg( + Arg::with_name("INPUT") + .help("Input file to use") + .required(true) + .index(1), + ) + .arg( + Arg::with_name("v") + .short("v") + .multiple(true) + .help("Sets the level of verbosity"), + ) + .get_matches(); + + let input = matches.value_of("INPUT").expect("INPUT must be provided"); + let output = matches.value_of("OUT_FILE"); + + let timestamp_rendering = match matches.value_of("timestamp").unwrap_or("ticks") { + "ticks" => TimestampRendering::Ticks, + "isostr" => TimestampRendering::IsoStr, + "unixms" => TimestampRendering::UnixMs, + _ => TimestampRendering::IsoStr, + }; + + let settings = Settings { + omit_nulls: matches.is_present("omit-nulls") || matches.is_present("prune"), + omit_empty_bags: matches.is_present("omit-empty-bags") || matches.is_present("prune"), + timestamp_rendering, + omit_empty_lists: matches.is_present("omit-empty-lists") || matches.is_present("prune"), + columns: matches + .value_of("columns") + .map(|columns| columns.split(",").map(|s| s.to_string()).collect()), + csv: matches.is_present("csv"), + }; + + let res = if matches.is_present("schema") { + schema::print_schema(input) + } else { + converter::convert(&settings, input, output) + }; + + match res { + Ok(()) => (), + Err(e) => { + eprintln!("ERROR: {:?}", e); + std::process::exit(-1); + } + } +} diff --git a/pq2json/src/schema.rs b/pq2json/src/schema.rs new file mode 100644 index 0000000..3a8e4ee --- /dev/null +++ b/pq2json/src/schema.rs @@ -0,0 +1,31 @@ +use std::error::Error; +use std::fs::File; +use std::path::Path; + +use parquet::file::reader::{FileReader, SerializedFileReader}; +use parquet::schema::printer::{print_file_metadata, print_parquet_metadata}; + +/// Prints Parquet file schema information +/// +/// Arguments: +/// +/// * `input_file` - Parquet file path +/// +pub fn print_schema(input_file: &str) -> Result<(), Box> { + let file = File::open(&Path::new(input_file))?; + let reader = SerializedFileReader::new(file)?; + let meta = reader.metadata(); + let mut output = Vec::new(); + print_parquet_metadata(&mut output, &meta); + println!("\n\nParquet metadata"); + println!("================================================="); + println!("{}", String::from_utf8(output)?); + + let mut output = Vec::new(); + let file_meta = reader.metadata().file_metadata(); + print_file_metadata(&mut output, &file_meta); + println!("\n\nFile metadata"); + println!("================================================="); + println!("{}", String::from_utf8(output)?); + Ok(()) +} diff --git a/pq2json/src/settings.rs b/pq2json/src/settings.rs new file mode 100644 index 0000000..2968882 --- /dev/null +++ b/pq2json/src/settings.rs @@ -0,0 +1,16 @@ +#[derive(Debug, Clone)] +pub struct Settings { + pub omit_nulls: bool, + pub omit_empty_bags: bool, + pub omit_empty_lists: bool, + pub timestamp_rendering: TimestampRendering, + pub columns: Option>, + pub csv: bool, +} + +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub enum TimestampRendering { + Ticks, + IsoStr, + UnixMs, +}