Merge pull request #8 from Azure/csv

Support CSV output format
This commit is contained in:
Michael Spector 2020-11-15 07:07:15 +00:00 коммит произвёл GitHub
Родитель d6d3fb9fc7 64b1046380
Коммит 38159e93ba
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
7 изменённых файлов: 514 добавлений и 389 удалений

10
Cargo.lock сгенерированный
Просмотреть файл

@ -56,7 +56,7 @@ source = "git+https://github.com/rzheka/arrow.git?branch=dev#31f4f2342be32281716
dependencies = [
"arrow-flight 0.18.0-SNAPSHOT (git+https://github.com/rzheka/arrow.git?branch=dev)",
"chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"csv 1.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"csv 1.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"flatbuffers 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
"hex 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"indexmap 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
@ -315,7 +315,7 @@ dependencies = [
[[package]]
name = "csv"
version = "1.1.3"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bstr 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)",
@ -915,9 +915,11 @@ version = "0.1.0"
dependencies = [
"chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)",
"csv 1.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"itertools 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"num-bigint 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"parquet 0.18.0-SNAPSHOT (git+https://github.com/rzheka/arrow.git?branch=dev)",
"ryu 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.93 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -928,7 +930,7 @@ version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"atty 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
"csv 1.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"csv 1.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"encode_unicode 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"term 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1871,7 +1873,7 @@ dependencies = [
"checksum crc 1.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d663548de7f5cca343f1e0a48d14dcfb0e9eb4e079ec58883b7251539fa10aeb"
"checksum crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1"
"checksum crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8"
"checksum csv 1.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "00affe7f6ab566df61b4be3ce8cf16bc2576bca0963ceb0955e45d514bf9a279"
"checksum csv 1.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "fc4666154fd004af3fd6f1da2e81a96fd5a81927fe8ddb6ecc79e2aa6e138b54"
"checksum csv-core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "2b2466559f260f48ad25fe6317b3c8dac77b5bdb5763ac7d9d6103530663bc90"
"checksum dirs 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "3fd78930633bd1c6e35c4b42b1df7b0cbc6bc191146e512bb3bedf243fcc3901"
"checksum either 1.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "5527cfe0d098f36e3f8839852688e63c8fff1c90b2b405aef730615f9a7bcf7b"

Просмотреть файл

@ -2,18 +2,17 @@
<package >
<metadata>
<id>pq2json</id>
<version>0.1.4</version>
<version>0.1.5</version>
<authors>Evgeney Ryzhyk</authors>
<owners>Evgeney Ryzhyk</owners>
<license type="expression">MIT</license>
<projectUrl>https://github.com/Azure/azure-kusto-parquet-conv</projectUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>Parquet to JSON (line delimited) converter tool.</description>
<releaseNotes>Fixed crash when reading null byte array field.</releaseNotes>
<copyright>Copyright 2019</copyright>
<releaseNotes>Support output to pseudo-CSV format.</releaseNotes>
<copyright>Copyright 2020</copyright>
<tags></tags>
<dependencies>
</dependencies>
<dependencies></dependencies>
</metadata>
<files>
<file src="target\release\pq2json.exe" target="tools\x64" />

Просмотреть файл

@ -12,3 +12,5 @@ serde = "1"
serde_json = "1"
num-bigint = "0.2"
chrono = "0.4"
csv = "1.1.4"
ryu = "1.0"

317
pq2json/src/converter.rs Normal file
Просмотреть файл

@ -0,0 +1,317 @@
use std::collections::HashMap;
use std::error::Error;
use std::fs::File;
use std::io::{self, BufWriter, Write};
use std::path::Path;
use num_bigint::{BigInt, Sign};
use parquet::data_type::Decimal;
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::record::{FieldType, List, ListAccessor, Map, MapAccessor, Row, RowAccessor};
use parquet::schema::types::Type as SchemaType;
use serde_json::{Number, Value};
use crate::settings::{Settings, TimestampRendering};
use chrono::Duration;
use csv::Terminator;
use parquet::record::reader::RowIter;
const WRITER_BUF_CAP: usize = 256 * 1024;
/// Writes Parquet file as text, either as JSONL (every line contains single JSON record)
/// or as CSV (where nested structures are formatted as JSON strings).
///
/// Arguments:
///
/// * `settings` - Converter settings
/// * `input_file` - Parquet file path
/// * `output_file` - Optional output file path (if not provided - output is written to STDOUT).
///
pub fn convert(
    settings: &Settings,
    input_file: &str,
    output_file: Option<&str>,
) -> Result<(), Box<dyn Error>> {
    let file = File::open(&Path::new(input_file))?;
    let reader = SerializedFileReader::new(file)?;
    // Buffered sink: the named output file when given, otherwise STDOUT.
    let writer = match output_file {
        Some(output_file) => Box::new(BufWriter::with_capacity(
            WRITER_BUF_CAP,
            File::create(&Path::new(output_file))?,
        )) as Box<dyn Write>,
        None => Box::new(BufWriter::with_capacity(WRITER_BUF_CAP, io::stdout())) as Box<dyn Write>,
    };
    // Optional top-level column projection. NOTE(review): projected_schema's
    // result is unwrap()ed here, so a bad column list panics instead of
    // propagating as Err — confirm this is the intended CLI behavior.
    let schema = settings
        .columns
        .as_ref()
        .map(|c| projected_schema(&reader, &c).unwrap());
    let rows = reader.get_row_iter(schema)?;
    if settings.csv {
        top_level_rows_to_csv(&settings, rows, writer)
    } else {
        top_level_rows_to_json(&settings, rows, writer)
    }
}
/// Builds a projected Parquet group schema containing only the requested
/// top-level columns, preserving the original schema's group name.
///
/// # Panics
///
/// Panics if a requested column name does not exist in the file schema.
fn projected_schema(
    reader: &SerializedFileReader<File>,
    columns: &Vec<String>,
) -> Result<SchemaType, Box<dyn Error>> {
    let file_meta = reader.metadata().file_metadata();
    // Index the file's top-level fields by name for O(1) lookup per column.
    let mut schema_fields = HashMap::new();
    for field in file_meta.schema().get_fields().iter() {
        schema_fields.insert(field.name().to_owned(), field);
    }
    let mut projected_fields = columns
        .iter()
        .map(|c| {
            schema_fields
                .get_mut(c)
                // unwrap_or_else avoids building the panic message on the
                // happy path (clippy::expect_fun_call); .expect(format!(..))
                // allocated the string on every call.
                .unwrap_or_else(|| panic!("column '{}' doesn't exist", c))
                .clone()
        })
        .collect();
    Ok(
        SchemaType::group_type_builder(&file_meta.schema().get_basic_info().name())
            .with_fields(&mut projected_fields)
            .build()
            .unwrap(),
    )
}
/// Converts element `$i` of accessor `$obj` (of field type `$ft`) into a
/// `serde_json::Value`, using `$settings` for timestamp/pruning options.
///
/// This is a macro rather than a function because the same dispatch is reused
/// for `Row`, `List` and map-value accessors, which expose identically named
/// getter methods without sharing a trait.
macro_rules! element_to_value {
    ($ft:expr, $obj:ident, $i:ident, $settings:ident) => {
        match $ft {
            FieldType::Null => Value::Null,
            FieldType::Bool => Value::Bool($obj.get_bool($i)?),
            FieldType::Byte => Value::Number($obj.get_byte($i)?.into()),
            FieldType::Short => Value::Number($obj.get_short($i)?.into()),
            FieldType::Int => Value::Number($obj.get_int($i)?.into()),
            FieldType::Long => Value::Number($obj.get_long($i)?.into()),
            FieldType::UByte => Value::Number($obj.get_ubyte($i)?.into()),
            FieldType::UShort => Value::Number($obj.get_ushort($i)?.into()),
            FieldType::UInt => Value::Number($obj.get_uint($i)?.into()),
            FieldType::ULong => Value::Number($obj.get_ulong($i)?.into()),
            FieldType::Float => float_to_value($obj.get_float($i)? as f64),
            FieldType::Double => float_to_value($obj.get_double($i)?),
            FieldType::Decimal => Value::String(decimal_to_string($obj.get_decimal($i)?)),
            FieldType::Str => Value::String($obj.get_string($i)?.to_string()),
            FieldType::Bytes => bytes_to_value($obj.get_bytes($i)?.data()),
            FieldType::Date => date_to_value($obj.get_date($i)?)?,
            FieldType::TimestampMillis => {
                timestamp_to_value($settings, $obj.get_timestamp_millis($i)?)?
            }
            // Microsecond timestamps are truncated to millisecond precision
            // before rendering.
            FieldType::TimestampMicros => timestamp_to_value(
                $settings,
                $obj.get_timestamp_micros($i).map(|ts| ts / 1000)?,
            )?,
            FieldType::Group => row_to_value($settings, $obj.get_group($i)?)?,
            FieldType::List => list_to_value($settings, $obj.get_list($i)?)?,
            FieldType::Map => map_to_value($settings, $obj.get_map($i)?)?,
        }
    };
}
/// Writes every row as one JSON object per output line (JSONL).
///
/// A row that collapses to `Null` (e.g. fully pruned by the omit-* settings)
/// is emitted as an empty JSON object rather than the literal `null`.
fn top_level_rows_to_json(
    settings: &Settings,
    rows: RowIter,
    mut writer: Box<dyn Write>,
) -> Result<(), Box<dyn Error>> {
    for row in rows {
        let mut value = row_to_value(settings, &row)?;
        if value.is_null() {
            value = Value::Object(serde_json::Map::default());
        }
        writeln!(writer, "{}", serde_json::to_string(&value)?)?;
    }
    Ok(())
}
/// Writes every row as one CSV record ("pseudo-CSV": nested structures are
/// embedded as JSON strings, see `value_to_csv`).
///
/// Each record is first rendered into an in-memory buffer with `\r` as the
/// csv terminator; `writeln!` then appends `\n`, so the record separator on
/// the wire is CRLF.
fn top_level_rows_to_csv(
    settings: &Settings,
    mut rows: RowIter,
    mut writer: Box<dyn Write>,
) -> Result<(), Box<dyn Error>> {
    while let Some(row) = rows.next() {
        // A fresh in-memory csv writer per row; into_inner() below consumes it.
        let mut csv_writer = csv::WriterBuilder::new()
            .terminator(Terminator::Any(b'\r'))
            .from_writer(vec![]);
        for i in 0..row.len() {
            let field_type = row.get_field_type(i);
            let value = element_to_value!(field_type, row, i, settings);
            csv_writer.write_field(value_to_csv(&value))?;
        }
        // Writing an empty record terminates the current record (flushes the
        // pending fields and emits the `\r` terminator).
        csv_writer.write_record(None::<&[u8]>)?;
        writeln!(writer, "{}", String::from_utf8(csv_writer.into_inner()?)?)?;
    }
    Ok(())
}
/// Renders one JSON value as a single CSV field.
///
/// Scalars are rendered bare (null becomes the empty string); arrays and
/// objects are embedded as their compact JSON string representation.
fn value_to_csv(value: &Value) -> String {
    match value {
        Value::Null => String::new(),
        Value::Bool(v) => v.to_string(),
        Value::Number(ref v) => {
            if v.is_f64() {
                // ryu is considerably faster than the Display path for floats.
                ryu::Buffer::new().format(v.as_f64().unwrap()).to_owned()
            } else if let Some(u) = v.as_u64() {
                u.to_string()
            } else {
                v.as_i64().unwrap().to_string()
            }
        }
        Value::String(ref v) => v.to_owned(),
        Value::Array(ref v) => serde_json::to_string(&v).unwrap(),
        Value::Object(ref v) => serde_json::to_string(&v).unwrap(),
    }
}
/// Converts a Parquet row (group) into a JSON object, applying the omit-*
/// pruning settings.
///
/// Returns `Value::Null` when the resulting bag is empty and
/// `omit_empty_bags` is set, so parent containers can prune it in turn.
fn row_to_value(settings: &Settings, row: &Row) -> Result<Value, Box<dyn Error>> {
    let mut map = serde_json::Map::with_capacity(row.len());
    for i in 0..row.len() {
        let name = row.get_field_name(i);
        let field_type = row.get_field_type(i);
        let value = element_to_value!(field_type, row, i, settings);
        // Drop null entries entirely when omit_nulls is requested.
        if !(settings.omit_nulls && value.is_null()) {
            map.insert(name.to_string(), value);
        }
    }
    if settings.omit_empty_bags && map.is_empty() {
        Ok(Value::Null)
    } else {
        Ok(Value::Object(map))
    }
}
/// Converts a Parquet list into a JSON array.
///
/// Returns `Value::Null` when the list is empty and `omit_empty_lists` is
/// set, so parent containers can prune it in turn.
fn list_to_value(settings: &Settings, list: &List) -> Result<Value, Box<dyn Error>> {
    let mut arr = Vec::<Value>::with_capacity(list.len());
    for i in 0..list.len() {
        let elt_ty = list.get_element_type(i);
        let value = element_to_value!(elt_ty, list, i, settings);
        arr.push(value);
    }
    if settings.omit_empty_lists && arr.is_empty() {
        Ok(Value::Null)
    } else {
        Ok(Value::Array(arr))
    }
}
/// Converts a Parquet map into a JSON object, applying the omit-* pruning
/// settings. Only string keys are supported.
///
/// # Panics
///
/// Panics on a non-string map key (see TODO below).
fn map_to_value(settings: &Settings, map: &Map) -> Result<Value, Box<dyn Error>> {
    let mut jsmap = serde_json::Map::with_capacity(map.len());
    let keys = map.get_keys();
    let values = map.get_values();
    for i in 0..map.len() {
        let key_ty = keys.get_element_type(i);
        let key = match key_ty {
            FieldType::Str => keys.get_string(i)?.to_string(),
            // TODO: return error here
            _ => panic!("Non-string key"),
        };
        let val_ty = values.get_element_type(i);
        let value = element_to_value!(val_ty, values, i, settings);
        // Drop null entries entirely when omit_nulls is requested.
        if !(settings.omit_nulls && value.is_null()) {
            jsmap.insert(key, value);
        }
    }
    if settings.omit_empty_bags && jsmap.is_empty() {
        Ok(Value::Null)
    } else {
        Ok(Value::Object(jsmap))
    }
}
/// Converts a raw byte slice into a JSON array with one number per byte.
fn bytes_to_value(bytes: &[u8]) -> Value {
    Value::Array(
        bytes
            .iter()
            .map(|&b| Value::Number(b.into()))
            .collect(),
    )
}
/// Wraps a finite f64 as a JSON number; NaN and infinities become JSON null,
/// since JSON has no representation for them.
fn float_to_value(f: f64) -> Value {
    match Number::from_f64(f) {
        Some(n) => Value::Number(n),
        None => Value::Null,
    }
}
const TICKS_TILL_UNIX_TIME: u64 = 621355968000000000u64;
/// Renders a timestamp (`ts` in milliseconds since the Unix epoch) according
/// to the configured `timestamp_rendering` mode.
fn timestamp_to_value(settings: &Settings, ts: u64) -> Result<Value, Box<dyn Error>> {
    match settings.timestamp_rendering {
        TimestampRendering::Ticks => {
            // .NET-style ticks: 100ns units since 0001-01-01. Checked math so
            // an overflowing timestamp renders as null instead of wrapping.
            let ticks = ts
                .checked_mul(10000)
                .and_then(|t| t.checked_add(TICKS_TILL_UNIX_TIME));
            let v = ticks
                .map(|t| Value::Number(t.into()))
                .unwrap_or(Value::Null);
            Ok(v)
        }
        TimestampRendering::IsoStr => {
            let seconds = (ts / 1000) as i64;
            let nanos = ((ts % 1000) * 1000000) as u32;
            // Out-of-range timestamps render as null rather than failing.
            let datetime =
                if let Some(dt) = chrono::NaiveDateTime::from_timestamp_opt(seconds, nanos) {
                    dt
                } else {
                    return Ok(Value::Null);
                };
            let iso_str = datetime.format("%Y-%m-%dT%H:%M:%S.%6fZ").to_string();
            Ok(Value::String(iso_str))
        }
        // Pass the raw millisecond count through unchanged.
        TimestampRendering::UnixMs => Ok(Value::Number(ts.into())),
    }
}
/// Converts a Parquet DATE (days since the Unix epoch, 1970-01-01) into an
/// ISO `%Y-%m-%d` string; dates beyond chrono's representable range render
/// as null.
fn date_to_value(days_from_epoch: u32) -> Result<Value, Box<dyn Error>> {
    let date = match chrono::NaiveDate::from_ymd(1970, 1, 1)
        .checked_add_signed(Duration::days(days_from_epoch as i64))
    {
        Some(date) => date,
        None => return Ok(Value::Null),
    };
    let iso_str = date.format("%Y-%m-%d").to_string();
    Ok(Value::String(iso_str))
}
/// Formats a Parquet decimal as a plain (non-scientific) decimal string,
/// e.g. unscaled 12345 with scale 2 -> "123.45".
fn decimal_to_string(decimal: &Decimal) -> String {
    assert!(decimal.scale() >= 0 && decimal.precision() > decimal.scale());
    // Specify as signed bytes to resolve sign as part of conversion.
    let num = BigInt::from_signed_bytes_be(decimal.data());
    // Offset of the first digit in a string (1 when a leading '-' is present).
    let negative = if num.sign() == Sign::Minus { 1 } else { 0 };
    let mut num_str = num.to_string();
    // Position of the decimal point relative to the first digit; <= 0 means
    // the value is purely fractional and needs "0." plus zero-padding.
    let mut point = num_str.len() as i32 - decimal.scale() - negative;
    // Convert to string form without scientific notation.
    if point <= 0 {
        // Zeros need to be prepended to the unscaled value.
        while point < 0 {
            num_str.insert(negative as usize, '0');
            point += 1;
        }
        num_str.insert_str(negative as usize, "0.");
    } else {
        // No zeroes need to be prepended to the unscaled value, simply insert decimal
        // point.
        num_str.insert((point + negative) as usize, '.');
    }
    num_str
}

Просмотреть файл

@ -1,380 +1,138 @@
use clap::{App, Arg};
use num_bigint::{BigInt, Sign};
use parquet::data_type::Decimal;
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::record::{FieldType, List, ListAccessor, Map, MapAccessor, Row, RowAccessor};
use parquet::schema::types::{Type as SchemaType};
use parquet::schema::printer::{print_file_metadata, print_parquet_metadata};
use serde_json::{Number, Value};
use std::collections::HashMap;
use std::error::Error;
use std::fs::File;
use std::io::{self, BufWriter, Write};
use std::path::Path;
fn main() {
let matches = App::new("pq2json")
.version("0.1")
.arg(
Arg::with_name("omit-nulls")
.long("omit-nulls")
.help("Omit bag entries with null values")
.takes_value(false)
.required(false),
)
.arg(
Arg::with_name("omit-empty-bags")
.long("omit-empty-bags")
.help("Omit empty property bags")
.takes_value(false)
.required(false),
)
.arg(
Arg::with_name("omit-empty-lists")
.long("omit-empty-lists")
.help("Omit empty lists")
.takes_value(false)
.required(false),
)
.arg(
Arg::with_name("prune")
.short("p")
.long("prune")
.help(
"Omit nulls, empty bags and empty lists \
(equivalent to a combination of the three --omit-... options)",
)
.takes_value(false)
.required(false),
)
.arg(
Arg::with_name("timestamp")
.short("t")
.long("timestamp")
.possible_values(&["isostr", "ticks", "unixms"])
.default_value("isostr")
.help(
"Timestamp rendering option. Either \
ticks (100ns ticks since 01-01-01), \
isostr (ISO8601 string) \
or unixms (milliseconds since Unix epoch)",
)
.takes_value(true)
.required(false),
)
.arg(
Arg::with_name("columns")
.short("c")
.long("columns")
.help("Comma separated top-level columns to select")
.takes_value(true)
.required(false),
)
.arg(
Arg::with_name("output")
.short("o")
.long("output")
.value_name("OUT_FILE")
.help("Output file")
.takes_value(true)
.required(false),
)
.arg(
Arg::with_name("schema")
.long("schema")
.help("Print schema")
.takes_value(false)
.required(false),
)
.arg(
Arg::with_name("INPUT")
.help("Input file to use")
.required(true)
.index(1),
)
.arg(
Arg::with_name("v")
.short("v")
.multiple(true)
.help("Sets the level of verbosity"),
)
.get_matches();
let input = matches.value_of("INPUT").expect("INPUT must be provided");
let output = matches.value_of("OUT_FILE").unwrap_or("");
let timestamp_rendering = match matches.value_of("timestamp").unwrap_or("ticks") {
"ticks" => TimestampRendering::Ticks,
"isostr" => TimestampRendering::IsoStr,
"unixms" => TimestampRendering::UnixMs,
_ => TimestampRendering::IsoStr,
};
let columns: &str = matches.value_of("columns").unwrap_or("");
let settings = Settings {
omit_nulls: matches.is_present("omit-nulls") || matches.is_present("prune"),
omit_empty_bags: matches.is_present("omit-empty-bags") || matches.is_present("prune"),
omit_empty_lists: matches.is_present("omit-empty-lists") || matches.is_present("prune"),
timestamp_rendering: timestamp_rendering,
columns: if columns.is_empty() { None } else { Some(columns.split(",").map(|s| s.to_string()).collect()) }
};
let res = if matches.is_present("schema") {
print_schema(input)
} else {
convert(&settings, input, output)
};
match res {
Ok(()) => (),
Err(e) => {
eprintln!("ERROR: {:?}", e);
std::process::exit(-1);
}
}
}
#[derive(Debug, Clone)]
struct Settings {
omit_nulls: bool,
omit_empty_bags: bool,
omit_empty_lists: bool,
timestamp_rendering: TimestampRendering,
columns: Option<Vec<String>>
}
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum TimestampRendering {
Ticks,
IsoStr,
UnixMs,
}
const WRITER_BUF_CAP: usize = 256 * 1024;
fn convert(settings: &Settings, input: &str, output: &str) -> Result<(), Box<dyn Error>> {
let file = File::open(&Path::new(input))?;
let reader = SerializedFileReader::new(file)?;
let mut writer = if output.is_empty() {
Box::new(BufWriter::with_capacity(WRITER_BUF_CAP, io::stdout())) as Box<dyn Write>
} else {
Box::new(BufWriter::with_capacity(
WRITER_BUF_CAP,
File::create(&Path::new(output))?,
)) as Box<dyn Write>
};
let schema = settings.columns.as_ref()
.map(|c| get_projected_schema(&reader, &c).unwrap());
let mut iter = reader.get_row_iter(schema)?;
while let Some(record) = iter.next() {
let value = top_level_row_to_value(settings, &record)?;
writeln!(writer, "{}", serde_json::to_string(&value)?)?;
}
Ok(())
}
fn print_schema(input: &str) -> Result<(), Box<dyn Error>> {
let file = File::open(&Path::new(input))?;
let reader = SerializedFileReader::new(file)?;
let meta = reader.metadata();
let mut output = Vec::new();
print_parquet_metadata(&mut output, &meta);
println!("\n\nParquet metadata");
println!("=================================================");
println!("{}", String::from_utf8(output)?);
let mut output = Vec::new();
let file_meta = reader.metadata().file_metadata();
print_file_metadata(&mut output, &file_meta);
println!("\n\nFile metadata");
println!("=================================================");
println!("{}", String::from_utf8(output)?);
Ok(())
}
fn get_projected_schema(reader: &SerializedFileReader<File>, columns: &Vec<String>) -> Result<SchemaType, Box<dyn Error>> {
let file_meta = reader.metadata().file_metadata();
let mut schema_fields = HashMap::new();
for field in file_meta.schema().get_fields().iter() {
schema_fields.insert(field.name().to_owned(), field);
}
// TODO: return error if non-existent column specified
let mut projected_fields = columns.iter()
.map(|c| schema_fields[c].clone())
.collect();
Ok(SchemaType::group_type_builder(&file_meta.schema().get_basic_info().name())
.with_fields(&mut projected_fields)
.build()
.unwrap())
}
macro_rules! element_to_value {
($ft:expr, $obj:ident, $i:ident, $settings:ident) => {
match $ft {
FieldType::Null => Value::Null,
FieldType::Bool => Value::Bool($obj.get_bool($i)?),
FieldType::Byte => Value::Number($obj.get_byte($i)?.into()),
FieldType::Short => Value::Number($obj.get_short($i)?.into()),
FieldType::Int => Value::Number($obj.get_int($i)?.into()),
FieldType::Long => Value::Number($obj.get_long($i)?.into()),
FieldType::UByte => Value::Number($obj.get_ubyte($i)?.into()),
FieldType::UShort => Value::Number($obj.get_ushort($i)?.into()),
FieldType::UInt => Value::Number($obj.get_uint($i)?.into()),
FieldType::ULong => Value::Number($obj.get_ulong($i)?.into()),
FieldType::Float => float_to_value($obj.get_float($i)? as f64),
FieldType::Double => float_to_value($obj.get_double($i)?),
FieldType::Decimal => Value::String(decimal_to_string($obj.get_decimal($i)?)),
FieldType::Str => Value::String($obj.get_string($i)?.to_string()),
FieldType::Bytes => bytes_to_value($obj.get_bytes($i)?.data()),
FieldType::Date => Value::Number($obj.get_date($i)?.into()),
FieldType::TimestampMillis => timestamp_to_value($settings, $obj.get_timestamp_millis($i)?)?,
FieldType::TimestampMicros => timestamp_to_value($settings, $obj.get_timestamp_micros($i).map(|ts| ts / 1000)?)?,
FieldType::Group => row_to_value($settings, $obj.get_group($i)?)?,
FieldType::List => list_to_value($settings, $obj.get_list($i)?)?,
FieldType::Map => map_to_value($settings, $obj.get_map($i)?)?,
}
};
}
fn top_level_row_to_value(settings: &Settings, row: &Row) -> Result<Value, Box<dyn Error>> {
let value = row_to_value(settings, row)?;
if value.is_null() {
Ok(Value::Object(serde_json::Map::default()))
} else {
Ok(value)
}
}
fn row_to_value(settings: &Settings, row: &Row) -> Result<Value, Box<dyn Error>> {
let mut map = serde_json::Map::with_capacity(row.len());
for i in 0..row.len() {
let name = row.get_field_name(i);
let field_type = row.get_field_type(i);
let value = element_to_value!(field_type, row, i, settings);
if !(settings.omit_nulls && value.is_null()) {
map.insert(name.to_string(), value);
}
}
if settings.omit_empty_bags && map.is_empty() {
Ok(Value::Null)
} else {
Ok(Value::Object(map))
}
}
fn list_to_value(settings: &Settings, list: &List) -> Result<Value, Box<dyn Error>> {
let mut arr = Vec::<Value>::with_capacity(list.len());
for i in 0..list.len() {
let elt_ty = list.get_element_type(i);
let value = element_to_value!(elt_ty, list, i, settings);
arr.push(value);
}
if settings.omit_empty_lists && arr.is_empty() {
Ok(Value::Null)
} else {
Ok(Value::Array(arr))
}
}
fn map_to_value(settings: &Settings, map: &Map) -> Result<Value, Box<dyn Error>> {
let mut jsmap = serde_json::Map::with_capacity(map.len());
let keys = map.get_keys();
let values = map.get_values();
for i in 0..map.len() {
let key_ty = keys.get_element_type(i);
let key = match key_ty {
FieldType::Str => keys.get_string(i)?.to_string(),
// TODO: return error here
_ => panic!("Non-string key"),
};
let val_ty = values.get_element_type(i);
let value = element_to_value!(val_ty, values, i, settings);
if !(settings.omit_nulls && value.is_null()) {
jsmap.insert(key, value);
}
}
if settings.omit_empty_bags && jsmap.is_empty() {
Ok(Value::Null)
} else {
Ok(Value::Object(jsmap))
}
}
fn bytes_to_value(bytes: &[u8]) -> Value {
let nums = bytes
.iter()
.map(|&b| Value::Number(b.into()))
.collect::<Vec<_>>();
Value::Array(nums)
}
fn float_to_value(f: f64) -> Value {
Number::from_f64(f)
.map(|n| Value::Number(n))
.unwrap_or_else(|| Value::Null)
}
const TICKS_TILL_UNIX_TIME: u64 = 621355968000000000u64;
fn timestamp_to_value(settings: &Settings, ts: u64) -> Result<Value, Box<dyn Error>> {
match settings.timestamp_rendering {
TimestampRendering::Ticks => {
let ticks = ts
.checked_mul(10000)
.and_then(|t| t.checked_add(TICKS_TILL_UNIX_TIME));
let v = ticks
.map(|t| Value::Number(t.into()))
.unwrap_or(Value::Null);
Ok(v)
}
TimestampRendering::IsoStr => {
let seconds = (ts / 1000) as i64;
let nanos = ((ts % 1000) * 1000000) as u32;
let datetime =
if let Some(dt) = chrono::NaiveDateTime::from_timestamp_opt(seconds, nanos) {
dt
} else {
return Ok(Value::Null);
};
let iso_str = datetime.format("%Y-%m-%dT%H:%M:%S.%6fZ").to_string();
Ok(Value::String(iso_str))
}
TimestampRendering::UnixMs => Ok(Value::Number(ts.into())),
}
}
fn decimal_to_string(decimal: &Decimal) -> String {
assert!(decimal.scale() >= 0 && decimal.precision() > decimal.scale());
// Specify as signed bytes to resolve sign as part of conversion.
let num = BigInt::from_signed_bytes_be(decimal.data());
// Offset of the first digit in a string.
let negative = if num.sign() == Sign::Minus { 1 } else { 0 };
let mut num_str = num.to_string();
let mut point = num_str.len() as i32 - decimal.scale() - negative;
// Convert to string form without scientific notation.
if point <= 0 {
// Zeros need to be prepended to the unscaled value.
while point < 0 {
num_str.insert(negative as usize, '0');
point += 1;
}
num_str.insert_str(negative as usize, "0.");
} else {
// No zeroes need to be prepended to the unscaled value, simply insert decimal
// point.
num_str.insert((point + negative) as usize, '.');
}
num_str
}
use clap::{App, Arg};
use crate::settings::{Settings, TimestampRendering};
mod converter;
mod schema;
mod settings;
/// CLI entry point: parses arguments, builds converter `Settings`, then either
/// prints the Parquet schema (`--schema`) or converts the input file to
/// JSONL/CSV.
fn main() {
    let matches = App::new("pq2json")
        .version("0.1")
        .arg(
            Arg::with_name("omit-nulls")
                .long("omit-nulls")
                .help("Omit bag entries with null values")
                .takes_value(false)
                .required(false),
        )
        .arg(
            Arg::with_name("omit-empty-bags")
                .long("omit-empty-bags")
                .help("Omit empty property bags")
                .takes_value(false)
                .required(false),
        )
        .arg(
            Arg::with_name("omit-empty-lists")
                .long("omit-empty-lists")
                .help("Omit empty lists")
                .takes_value(false)
                .required(false),
        )
        .arg(
            Arg::with_name("prune")
                .short("p")
                .long("prune")
                .help(
                    "Omit nulls, empty bags and empty lists \
                     (equivalent to a combination of the three --omit-... options)",
                )
                .takes_value(false)
                .required(false),
        )
        .arg(
            Arg::with_name("csv")
                .long("csv")
                .help("Output root level fields in CSV format")
                .takes_value(false)
                .required(false),
        )
        .arg(
            Arg::with_name("timestamp")
                .short("t")
                .long("timestamp")
                .possible_values(&["isostr", "ticks", "unixms"])
                .default_value("isostr")
                .help(
                    "Timestamp rendering option. Either \
                     ticks (100ns ticks since 01-01-01), \
                     isostr (ISO8601 string) \
                     or unixms (milliseconds since Unix epoch)",
                )
                .takes_value(true)
                .required(false),
        )
        .arg(
            Arg::with_name("columns")
                .short("c")
                .long("columns")
                .help("Comma separated top-level columns to select")
                .takes_value(true)
                .required(false),
        )
        .arg(
            Arg::with_name("output")
                .short("o")
                .long("output")
                .value_name("OUT_FILE")
                .help("Output file")
                .takes_value(true)
                .required(false),
        )
        .arg(
            Arg::with_name("schema")
                .long("schema")
                .help("Print schema")
                .takes_value(false)
                .required(false),
        )
        .arg(
            Arg::with_name("INPUT")
                .help("Input file to use")
                .required(true)
                .index(1),
        )
        .arg(
            Arg::with_name("v")
                .short("v")
                .multiple(true)
                .help("Sets the level of verbosity"),
        )
        .get_matches();
    let input = matches.value_of("INPUT").expect("INPUT must be provided");
    // BUG FIX: clap's value_of() keys by the Arg::with_name() identifier
    // ("output"), not by value_name ("OUT_FILE"). The old lookup always
    // returned None, silently ignoring -o/--output and writing to stdout.
    let output = matches.value_of("output");
    // default_value("isostr") guarantees Some here; the unwrap_or fallback is
    // unreachable and kept only for safety.
    let timestamp_rendering = match matches.value_of("timestamp").unwrap_or("ticks") {
        "ticks" => TimestampRendering::Ticks,
        "isostr" => TimestampRendering::IsoStr,
        "unixms" => TimestampRendering::UnixMs,
        _ => TimestampRendering::IsoStr,
    };
    let settings = Settings {
        // --prune implies all three omit-* options.
        omit_nulls: matches.is_present("omit-nulls") || matches.is_present("prune"),
        omit_empty_bags: matches.is_present("omit-empty-bags") || matches.is_present("prune"),
        timestamp_rendering,
        omit_empty_lists: matches.is_present("omit-empty-lists") || matches.is_present("prune"),
        columns: matches
            .value_of("columns")
            .map(|columns| columns.split(",").map(|s| s.to_string()).collect()),
        csv: matches.is_present("csv"),
    };
    let res = if matches.is_present("schema") {
        schema::print_schema(input)
    } else {
        converter::convert(&settings, input, output)
    };
    match res {
        Ok(()) => (),
        Err(e) => {
            eprintln!("ERROR: {:?}", e);
            std::process::exit(-1);
        }
    }
}

31
pq2json/src/schema.rs Normal file
Просмотреть файл

@ -0,0 +1,31 @@
use std::error::Error;
use std::fs::File;
use std::path::Path;
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::schema::printer::{print_file_metadata, print_parquet_metadata};
/// Prints Parquet file schema information
///
/// Arguments:
///
/// * `input_file` - Parquet file path
///
pub fn print_schema(input_file: &str) -> Result<(), Box<dyn Error>> {
    let file = File::open(&Path::new(input_file))?;
    let reader = SerializedFileReader::new(file)?;
    let meta = reader.metadata();
    // Render Parquet metadata into an in-memory buffer, then print it under
    // a section header.
    let mut output = Vec::new();
    print_parquet_metadata(&mut output, &meta);
    println!("\n\nParquet metadata");
    println!("=================================================");
    println!("{}", String::from_utf8(output)?);
    // Same for the file-level metadata section (fresh buffer).
    let mut output = Vec::new();
    let file_meta = reader.metadata().file_metadata();
    print_file_metadata(&mut output, &file_meta);
    println!("\n\nFile metadata");
    println!("=================================================");
    println!("{}", String::from_utf8(output)?);
    Ok(())
}

16
pq2json/src/settings.rs Normal file
Просмотреть файл

@ -0,0 +1,16 @@
/// Converter configuration assembled from the command-line flags in `main`.
#[derive(Debug, Clone)]
pub struct Settings {
    /// Omit bag entries whose value is null (--omit-nulls / --prune).
    pub omit_nulls: bool,
    /// Render empty property bags as null (--omit-empty-bags / --prune).
    pub omit_empty_bags: bool,
    /// Render empty lists as null (--omit-empty-lists / --prune).
    pub omit_empty_lists: bool,
    /// How timestamp columns are rendered (-t/--timestamp).
    pub timestamp_rendering: TimestampRendering,
    /// Optional projection: top-level column names to keep (None = all).
    pub columns: Option<Vec<String>>,
    /// Emit CSV instead of JSONL (--csv).
    pub csv: bool,
}
/// Timestamp output format selected by the -t/--timestamp flag.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum TimestampRendering {
    /// 100ns ticks since 0001-01-01 (.NET-style ticks).
    Ticks,
    /// ISO8601 string.
    IsoStr,
    /// Milliseconds since the Unix epoch.
    UnixMs,
}