Add support for CSV output format.
When `--csv` argument is used, the utility produces CSV format. Nested top level elements (arrays, objects) are formatted as JSON. Using the option alone saves some time already: ``` PS C:\Users\mispecto\Projects\azure-kusto-parquet-conv> Measure-Command { .\target\debug\pq2json.exe ..\..\Downloads\20200809201109710_53ea103e_e9fabfe6_001.parquet > 1 } Days : 0 Hours : 0 Minutes : 0 Seconds : 11 Milliseconds : 361 Ticks : 113610039 TotalDays : 0.000131493100694444 TotalHours : 0.00315583441666667 TotalMinutes : 0.189350065 TotalSeconds : 11.3610039 TotalMilliseconds : 11361.0039 PS C:\Users\mispecto\Projects\azure-kusto-parquet-conv> Measure-Command { .\target\debug\pq2json.exe --csv ..\..\Downloads\20200809201109710_53ea103e_e9fabfe6_001.parquet > 1 } Days : 0 Hours : 0 Minutes : 0 Seconds : 5 Milliseconds : 624 Ticks : 56247514 TotalDays : 6.51012893518518E-05 TotalHours : 0.00156243094444444 TotalMinutes : 0.0937458566666667 TotalSeconds : 5.6247514 TotalMilliseconds : 5624.7514 ```
This commit is contained in:
Родитель
d6d3fb9fc7
Коммит
45e6d85e5e
|
@ -56,7 +56,7 @@ source = "git+https://github.com/rzheka/arrow.git?branch=dev#31f4f2342be32281716
|
|||
dependencies = [
|
||||
"arrow-flight 0.18.0-SNAPSHOT (git+https://github.com/rzheka/arrow.git?branch=dev)",
|
||||
"chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"csv 1.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"csv 1.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"flatbuffers 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"hex 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"indexmap 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
|
@ -315,7 +315,7 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "csv"
|
||||
version = "1.1.3"
|
||||
version = "1.1.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"bstr 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
|
@ -915,6 +915,7 @@ version = "0.1.0"
|
|||
dependencies = [
|
||||
"chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"csv 1.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"itertools 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"num-bigint 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"parquet 0.18.0-SNAPSHOT (git+https://github.com/rzheka/arrow.git?branch=dev)",
|
||||
|
@ -928,7 +929,7 @@ version = "0.8.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"atty 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"csv 1.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"csv 1.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"encode_unicode 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"term 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
|
@ -1871,7 +1872,7 @@ dependencies = [
|
|||
"checksum crc 1.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d663548de7f5cca343f1e0a48d14dcfb0e9eb4e079ec58883b7251539fa10aeb"
|
||||
"checksum crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1"
|
||||
"checksum crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8"
|
||||
"checksum csv 1.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "00affe7f6ab566df61b4be3ce8cf16bc2576bca0963ceb0955e45d514bf9a279"
|
||||
"checksum csv 1.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "fc4666154fd004af3fd6f1da2e81a96fd5a81927fe8ddb6ecc79e2aa6e138b54"
|
||||
"checksum csv-core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "2b2466559f260f48ad25fe6317b3c8dac77b5bdb5763ac7d9d6103530663bc90"
|
||||
"checksum dirs 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "3fd78930633bd1c6e35c4b42b1df7b0cbc6bc191146e512bb3bedf243fcc3901"
|
||||
"checksum either 1.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "5527cfe0d098f36e3f8839852688e63c8fff1c90b2b405aef730615f9a7bcf7b"
|
||||
|
|
|
@ -2,18 +2,17 @@
|
|||
<package >
|
||||
<metadata>
|
||||
<id>pq2json</id>
|
||||
<version>0.1.4</version>
|
||||
<version>0.1.5</version>
|
||||
<authors>Evgeney Ryzhyk</authors>
|
||||
<owners>Evgeney Ryzhyk</owners>
|
||||
<license type="expression">MIT</license>
|
||||
<projectUrl>https://github.com/Azure/azure-kusto-parquet-conv</projectUrl>
|
||||
<requireLicenseAcceptance>false</requireLicenseAcceptance>
|
||||
<description>Parquet to JSON (line delimited) converter tool.</description>
|
||||
<releaseNotes>Fixed crash when reading null byte array field.</releaseNotes>
|
||||
<copyright>Copyright 2019</copyright>
|
||||
<releaseNotes>Support output to pseudo-CSV format.</releaseNotes>
|
||||
<copyright>Copyright 2020</copyright>
|
||||
<tags></tags>
|
||||
<dependencies>
|
||||
</dependencies>
|
||||
<dependencies></dependencies>
|
||||
</metadata>
|
||||
<files>
|
||||
<file src="target\release\pq2json.exe" target="tools\x64" />
|
||||
|
|
|
@ -12,3 +12,4 @@ serde = "1"
|
|||
serde_json = "1"
|
||||
num-bigint = "0.2"
|
||||
chrono = "0.4"
|
||||
csv = "1.1.4"
|
|
@ -0,0 +1,300 @@
|
|||
use std::collections::HashMap;
|
||||
use std::error::Error;
|
||||
use std::fs::File;
|
||||
use std::io::{self, BufWriter, Write};
|
||||
use std::path::Path;
|
||||
|
||||
use num_bigint::{BigInt, Sign};
|
||||
use parquet::data_type::Decimal;
|
||||
use parquet::file::reader::{FileReader, SerializedFileReader};
|
||||
use parquet::record::{FieldType, List, ListAccessor, Map, MapAccessor, Row, RowAccessor};
|
||||
use parquet::schema::types::Type as SchemaType;
|
||||
use serde_json::{Number, Value};
|
||||
|
||||
use crate::settings::{Settings, TimestampRendering};
|
||||
use parquet::record::reader::RowIter;
|
||||
|
||||
const WRITER_BUF_CAP: usize = 256 * 1024;
|
||||
|
||||
/// Writes Parquet file as text, either as JSONL (every line contains single JSON record)
|
||||
/// or as CSV (where nested structures are formatted as JSON strings).
|
||||
///
|
||||
/// Arguments:
|
||||
///
|
||||
/// * `settings` - Converter settings
|
||||
/// * `input_file` - Parquet file path
|
||||
/// * `output_file` - Optional output file path (if not provided - output is written to STDOUT).
|
||||
///
|
||||
pub fn convert(
|
||||
settings: &Settings,
|
||||
input_file: &str,
|
||||
output_file: Option<&str>,
|
||||
) -> Result<(), Box<dyn Error>> {
|
||||
let file = File::open(&Path::new(input_file))?;
|
||||
let reader = SerializedFileReader::new(file)?;
|
||||
|
||||
let writer = match output_file {
|
||||
Some(output_file) => Box::new(BufWriter::with_capacity(
|
||||
WRITER_BUF_CAP,
|
||||
File::create(&Path::new(output_file))?,
|
||||
)) as Box<dyn Write>,
|
||||
None => Box::new(BufWriter::with_capacity(WRITER_BUF_CAP, io::stdout())) as Box<dyn Write>,
|
||||
};
|
||||
|
||||
let schema = settings
|
||||
.columns
|
||||
.as_ref()
|
||||
.map(|c| projected_schema(&reader, &c).unwrap());
|
||||
|
||||
let rows = reader.get_row_iter(schema)?;
|
||||
|
||||
if settings.csv {
|
||||
top_level_rows_to_csv(&settings, rows, writer)
|
||||
} else {
|
||||
top_level_rows_to_json(&settings, rows, writer)
|
||||
}
|
||||
}
|
||||
|
||||
fn projected_schema(
|
||||
reader: &SerializedFileReader<File>,
|
||||
columns: &Vec<String>,
|
||||
) -> Result<SchemaType, Box<dyn Error>> {
|
||||
let file_meta = reader.metadata().file_metadata();
|
||||
let mut schema_fields = HashMap::new();
|
||||
for field in file_meta.schema().get_fields().iter() {
|
||||
schema_fields.insert(field.name().to_owned(), field);
|
||||
}
|
||||
|
||||
let mut projected_fields = columns
|
||||
.iter()
|
||||
.map(|c| {
|
||||
schema_fields
|
||||
.get_mut(c)
|
||||
.expect(format!("column '{}' doesn't exist", c).as_str())
|
||||
.clone()
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(
|
||||
SchemaType::group_type_builder(&file_meta.schema().get_basic_info().name())
|
||||
.with_fields(&mut projected_fields)
|
||||
.build()
|
||||
.unwrap(),
|
||||
)
|
||||
}
|
||||
|
||||
macro_rules! element_to_value {
|
||||
($ft:expr, $obj:ident, $i:ident, $settings:ident) => {
|
||||
match $ft {
|
||||
FieldType::Null => Value::Null,
|
||||
FieldType::Bool => Value::Bool($obj.get_bool($i)?),
|
||||
FieldType::Byte => Value::Number($obj.get_byte($i)?.into()),
|
||||
FieldType::Short => Value::Number($obj.get_short($i)?.into()),
|
||||
FieldType::Int => Value::Number($obj.get_int($i)?.into()),
|
||||
FieldType::Long => Value::Number($obj.get_long($i)?.into()),
|
||||
FieldType::UByte => Value::Number($obj.get_ubyte($i)?.into()),
|
||||
FieldType::UShort => Value::Number($obj.get_ushort($i)?.into()),
|
||||
FieldType::UInt => Value::Number($obj.get_uint($i)?.into()),
|
||||
FieldType::ULong => Value::Number($obj.get_ulong($i)?.into()),
|
||||
FieldType::Float => float_to_value($obj.get_float($i)? as f64),
|
||||
FieldType::Double => float_to_value($obj.get_double($i)?),
|
||||
FieldType::Decimal => Value::String(decimal_to_string($obj.get_decimal($i)?)),
|
||||
FieldType::Str => Value::String($obj.get_string($i)?.to_string()),
|
||||
FieldType::Bytes => bytes_to_value($obj.get_bytes($i)?.data()),
|
||||
FieldType::Date => Value::Number($obj.get_date($i)?.into()),
|
||||
FieldType::TimestampMillis => {
|
||||
timestamp_to_value($settings, $obj.get_timestamp_millis($i)?)?
|
||||
}
|
||||
FieldType::TimestampMicros => timestamp_to_value(
|
||||
$settings,
|
||||
$obj.get_timestamp_micros($i).map(|ts| ts / 1000)?,
|
||||
)?,
|
||||
FieldType::Group => row_to_value($settings, $obj.get_group($i)?)?,
|
||||
FieldType::List => list_to_value($settings, $obj.get_list($i)?)?,
|
||||
FieldType::Map => map_to_value($settings, $obj.get_map($i)?)?,
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
fn top_level_rows_to_json(
|
||||
settings: &Settings,
|
||||
mut rows: RowIter,
|
||||
mut writer: Box<dyn Write>,
|
||||
) -> Result<(), Box<dyn Error>> {
|
||||
while let Some(row) = rows.next() {
|
||||
let value = row_to_value(settings, &row)?;
|
||||
let value = if value.is_null() {
|
||||
Value::Object(serde_json::Map::default())
|
||||
} else {
|
||||
value
|
||||
};
|
||||
writeln!(writer, "{}", serde_json::to_string(&value)?)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn top_level_rows_to_csv(
|
||||
settings: &Settings,
|
||||
mut rows: RowIter,
|
||||
mut writer: Box<dyn Write>,
|
||||
) -> Result<(), Box<dyn Error>> {
|
||||
while let Some(row) = rows.next() {
|
||||
let mut csv_writer = csv::Writer::from_writer(vec![]);
|
||||
for i in 0..row.len() {
|
||||
let field_type = row.get_field_type(i);
|
||||
let value = element_to_value!(field_type, row, i, settings);
|
||||
csv_writer.write_field(value_to_csv(&value))?;
|
||||
}
|
||||
writeln!(writer, "{}", String::from_utf8(csv_writer.into_inner()?)?)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn value_to_csv(value: &Value) -> String {
|
||||
match value {
|
||||
Value::Null => String::new(),
|
||||
Value::Bool(v) => v.to_string(),
|
||||
Value::Number(ref v) => {
|
||||
if v.is_f64() {
|
||||
format!("{:e}", v.as_f64().unwrap())
|
||||
} else if v.is_u64() {
|
||||
format!("{}", v.as_u64().unwrap())
|
||||
} else {
|
||||
format!("{}", v.as_i64().unwrap())
|
||||
}
|
||||
}
|
||||
Value::String(ref v) => v.to_owned(),
|
||||
Value::Array(ref v) => serde_json::to_string(&v).unwrap(),
|
||||
Value::Object(ref v) => serde_json::to_string(&v).unwrap(),
|
||||
}
|
||||
}
|
||||
|
||||
fn row_to_value(settings: &Settings, row: &Row) -> Result<Value, Box<dyn Error>> {
|
||||
let mut map = serde_json::Map::with_capacity(row.len());
|
||||
for i in 0..row.len() {
|
||||
let name = row.get_field_name(i);
|
||||
let field_type = row.get_field_type(i);
|
||||
let value = element_to_value!(field_type, row, i, settings);
|
||||
if !(settings.omit_nulls && value.is_null()) {
|
||||
map.insert(name.to_string(), value);
|
||||
}
|
||||
}
|
||||
|
||||
if settings.omit_empty_bags && map.is_empty() {
|
||||
Ok(Value::Null)
|
||||
} else {
|
||||
Ok(Value::Object(map))
|
||||
}
|
||||
}
|
||||
|
||||
fn list_to_value(settings: &Settings, list: &List) -> Result<Value, Box<dyn Error>> {
|
||||
let mut arr = Vec::<Value>::with_capacity(list.len());
|
||||
for i in 0..list.len() {
|
||||
let elt_ty = list.get_element_type(i);
|
||||
let value = element_to_value!(elt_ty, list, i, settings);
|
||||
arr.push(value);
|
||||
}
|
||||
|
||||
if settings.omit_empty_lists && arr.is_empty() {
|
||||
Ok(Value::Null)
|
||||
} else {
|
||||
Ok(Value::Array(arr))
|
||||
}
|
||||
}
|
||||
|
||||
fn map_to_value(settings: &Settings, map: &Map) -> Result<Value, Box<dyn Error>> {
|
||||
let mut jsmap = serde_json::Map::with_capacity(map.len());
|
||||
let keys = map.get_keys();
|
||||
let values = map.get_values();
|
||||
for i in 0..map.len() {
|
||||
let key_ty = keys.get_element_type(i);
|
||||
let key = match key_ty {
|
||||
FieldType::Str => keys.get_string(i)?.to_string(),
|
||||
// TODO: return error here
|
||||
_ => panic!("Non-string key"),
|
||||
};
|
||||
|
||||
let val_ty = values.get_element_type(i);
|
||||
let value = element_to_value!(val_ty, values, i, settings);
|
||||
if !(settings.omit_nulls && value.is_null()) {
|
||||
jsmap.insert(key, value);
|
||||
}
|
||||
}
|
||||
|
||||
if settings.omit_empty_bags && jsmap.is_empty() {
|
||||
Ok(Value::Null)
|
||||
} else {
|
||||
Ok(Value::Object(jsmap))
|
||||
}
|
||||
}
|
||||
|
||||
fn bytes_to_value(bytes: &[u8]) -> Value {
|
||||
let nums = bytes
|
||||
.iter()
|
||||
.map(|&b| Value::Number(b.into()))
|
||||
.collect::<Vec<_>>();
|
||||
Value::Array(nums)
|
||||
}
|
||||
|
||||
fn float_to_value(f: f64) -> Value {
|
||||
Number::from_f64(f)
|
||||
.map(|n| Value::Number(n))
|
||||
.unwrap_or_else(|| Value::Null)
|
||||
}
|
||||
|
||||
const TICKS_TILL_UNIX_TIME: u64 = 621355968000000000u64;
|
||||
|
||||
fn timestamp_to_value(settings: &Settings, ts: u64) -> Result<Value, Box<dyn Error>> {
|
||||
match settings.timestamp_rendering {
|
||||
TimestampRendering::Ticks => {
|
||||
let ticks = ts
|
||||
.checked_mul(10000)
|
||||
.and_then(|t| t.checked_add(TICKS_TILL_UNIX_TIME));
|
||||
let v = ticks
|
||||
.map(|t| Value::Number(t.into()))
|
||||
.unwrap_or(Value::Null);
|
||||
Ok(v)
|
||||
}
|
||||
TimestampRendering::IsoStr => {
|
||||
let seconds = (ts / 1000) as i64;
|
||||
let nanos = ((ts % 1000) * 1000000) as u32;
|
||||
let datetime =
|
||||
if let Some(dt) = chrono::NaiveDateTime::from_timestamp_opt(seconds, nanos) {
|
||||
dt
|
||||
} else {
|
||||
return Ok(Value::Null);
|
||||
};
|
||||
let iso_str = datetime.format("%Y-%m-%dT%H:%M:%S.%6fZ").to_string();
|
||||
Ok(Value::String(iso_str))
|
||||
}
|
||||
TimestampRendering::UnixMs => Ok(Value::Number(ts.into())),
|
||||
}
|
||||
}
|
||||
|
||||
fn decimal_to_string(decimal: &Decimal) -> String {
|
||||
assert!(decimal.scale() >= 0 && decimal.precision() > decimal.scale());
|
||||
|
||||
// Specify as signed bytes to resolve sign as part of conversion.
|
||||
let num = BigInt::from_signed_bytes_be(decimal.data());
|
||||
|
||||
// Offset of the first digit in a string.
|
||||
let negative = if num.sign() == Sign::Minus { 1 } else { 0 };
|
||||
let mut num_str = num.to_string();
|
||||
let mut point = num_str.len() as i32 - decimal.scale() - negative;
|
||||
|
||||
// Convert to string form without scientific notation.
|
||||
if point <= 0 {
|
||||
// Zeros need to be prepended to the unscaled value.
|
||||
while point < 0 {
|
||||
num_str.insert(negative as usize, '0');
|
||||
point += 1;
|
||||
}
|
||||
num_str.insert_str(negative as usize, "0.");
|
||||
} else {
|
||||
// No zeroes need to be prepended to the unscaled value, simply insert decimal
|
||||
// point.
|
||||
num_str.insert((point + negative) as usize, '.');
|
||||
}
|
||||
|
||||
num_str
|
||||
}
|
|
@ -1,380 +1,138 @@
|
|||
use clap::{App, Arg};
|
||||
use num_bigint::{BigInt, Sign};
|
||||
use parquet::data_type::Decimal;
|
||||
use parquet::file::reader::{FileReader, SerializedFileReader};
|
||||
use parquet::record::{FieldType, List, ListAccessor, Map, MapAccessor, Row, RowAccessor};
|
||||
use parquet::schema::types::{Type as SchemaType};
|
||||
use parquet::schema::printer::{print_file_metadata, print_parquet_metadata};
|
||||
use serde_json::{Number, Value};
|
||||
use std::collections::HashMap;
|
||||
use std::error::Error;
|
||||
use std::fs::File;
|
||||
use std::io::{self, BufWriter, Write};
|
||||
use std::path::Path;
|
||||
|
||||
fn main() {
|
||||
let matches = App::new("pq2json")
|
||||
.version("0.1")
|
||||
.arg(
|
||||
Arg::with_name("omit-nulls")
|
||||
.long("omit-nulls")
|
||||
.help("Omit bag entries with null values")
|
||||
.takes_value(false)
|
||||
.required(false),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("omit-empty-bags")
|
||||
.long("omit-empty-bags")
|
||||
.help("Omit empty property bags")
|
||||
.takes_value(false)
|
||||
.required(false),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("omit-empty-lists")
|
||||
.long("omit-empty-lists")
|
||||
.help("Omit empty lists")
|
||||
.takes_value(false)
|
||||
.required(false),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("prune")
|
||||
.short("p")
|
||||
.long("prune")
|
||||
.help(
|
||||
"Omit nulls, empty bags and empty lists \
|
||||
(equivalent to a combination of the three --omit-... options)",
|
||||
)
|
||||
.takes_value(false)
|
||||
.required(false),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("timestamp")
|
||||
.short("t")
|
||||
.long("timestamp")
|
||||
.possible_values(&["isostr", "ticks", "unixms"])
|
||||
.default_value("isostr")
|
||||
.help(
|
||||
"Timestamp rendering option. Either \
|
||||
ticks (100ns ticks since 01-01-01), \
|
||||
isostr (ISO8601 string) \
|
||||
or unixms (milliseconds since Unix epoch)",
|
||||
)
|
||||
.takes_value(true)
|
||||
.required(false),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("columns")
|
||||
.short("c")
|
||||
.long("columns")
|
||||
.help("Comma separated top-level columns to select")
|
||||
.takes_value(true)
|
||||
.required(false),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("output")
|
||||
.short("o")
|
||||
.long("output")
|
||||
.value_name("OUT_FILE")
|
||||
.help("Output file")
|
||||
.takes_value(true)
|
||||
.required(false),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("schema")
|
||||
.long("schema")
|
||||
.help("Print schema")
|
||||
.takes_value(false)
|
||||
.required(false),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("INPUT")
|
||||
.help("Input file to use")
|
||||
.required(true)
|
||||
.index(1),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("v")
|
||||
.short("v")
|
||||
.multiple(true)
|
||||
.help("Sets the level of verbosity"),
|
||||
)
|
||||
.get_matches();
|
||||
|
||||
let input = matches.value_of("INPUT").expect("INPUT must be provided");
|
||||
let output = matches.value_of("OUT_FILE").unwrap_or("");
|
||||
|
||||
let timestamp_rendering = match matches.value_of("timestamp").unwrap_or("ticks") {
|
||||
"ticks" => TimestampRendering::Ticks,
|
||||
"isostr" => TimestampRendering::IsoStr,
|
||||
"unixms" => TimestampRendering::UnixMs,
|
||||
_ => TimestampRendering::IsoStr,
|
||||
};
|
||||
|
||||
let columns: &str = matches.value_of("columns").unwrap_or("");
|
||||
|
||||
let settings = Settings {
|
||||
omit_nulls: matches.is_present("omit-nulls") || matches.is_present("prune"),
|
||||
omit_empty_bags: matches.is_present("omit-empty-bags") || matches.is_present("prune"),
|
||||
omit_empty_lists: matches.is_present("omit-empty-lists") || matches.is_present("prune"),
|
||||
timestamp_rendering: timestamp_rendering,
|
||||
columns: if columns.is_empty() { None } else { Some(columns.split(",").map(|s| s.to_string()).collect()) }
|
||||
};
|
||||
|
||||
let res = if matches.is_present("schema") {
|
||||
print_schema(input)
|
||||
} else {
|
||||
convert(&settings, input, output)
|
||||
};
|
||||
|
||||
match res {
|
||||
Ok(()) => (),
|
||||
Err(e) => {
|
||||
eprintln!("ERROR: {:?}", e);
|
||||
std::process::exit(-1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct Settings {
|
||||
omit_nulls: bool,
|
||||
omit_empty_bags: bool,
|
||||
omit_empty_lists: bool,
|
||||
timestamp_rendering: TimestampRendering,
|
||||
columns: Option<Vec<String>>
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
|
||||
enum TimestampRendering {
|
||||
Ticks,
|
||||
IsoStr,
|
||||
UnixMs,
|
||||
}
|
||||
|
||||
const WRITER_BUF_CAP: usize = 256 * 1024;
|
||||
|
||||
fn convert(settings: &Settings, input: &str, output: &str) -> Result<(), Box<dyn Error>> {
|
||||
let file = File::open(&Path::new(input))?;
|
||||
let reader = SerializedFileReader::new(file)?;
|
||||
|
||||
let mut writer = if output.is_empty() {
|
||||
Box::new(BufWriter::with_capacity(WRITER_BUF_CAP, io::stdout())) as Box<dyn Write>
|
||||
} else {
|
||||
Box::new(BufWriter::with_capacity(
|
||||
WRITER_BUF_CAP,
|
||||
File::create(&Path::new(output))?,
|
||||
)) as Box<dyn Write>
|
||||
};
|
||||
|
||||
let schema = settings.columns.as_ref()
|
||||
.map(|c| get_projected_schema(&reader, &c).unwrap());
|
||||
|
||||
let mut iter = reader.get_row_iter(schema)?;
|
||||
while let Some(record) = iter.next() {
|
||||
let value = top_level_row_to_value(settings, &record)?;
|
||||
writeln!(writer, "{}", serde_json::to_string(&value)?)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn print_schema(input: &str) -> Result<(), Box<dyn Error>> {
|
||||
let file = File::open(&Path::new(input))?;
|
||||
let reader = SerializedFileReader::new(file)?;
|
||||
let meta = reader.metadata();
|
||||
let mut output = Vec::new();
|
||||
print_parquet_metadata(&mut output, &meta);
|
||||
println!("\n\nParquet metadata");
|
||||
println!("=================================================");
|
||||
println!("{}", String::from_utf8(output)?);
|
||||
|
||||
let mut output = Vec::new();
|
||||
let file_meta = reader.metadata().file_metadata();
|
||||
print_file_metadata(&mut output, &file_meta);
|
||||
println!("\n\nFile metadata");
|
||||
println!("=================================================");
|
||||
println!("{}", String::from_utf8(output)?);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_projected_schema(reader: &SerializedFileReader<File>, columns: &Vec<String>) -> Result<SchemaType, Box<dyn Error>> {
|
||||
let file_meta = reader.metadata().file_metadata();
|
||||
let mut schema_fields = HashMap::new();
|
||||
for field in file_meta.schema().get_fields().iter() {
|
||||
schema_fields.insert(field.name().to_owned(), field);
|
||||
}
|
||||
// TODO: return error if non-existent column specified
|
||||
let mut projected_fields = columns.iter()
|
||||
.map(|c| schema_fields[c].clone())
|
||||
.collect();
|
||||
Ok(SchemaType::group_type_builder(&file_meta.schema().get_basic_info().name())
|
||||
.with_fields(&mut projected_fields)
|
||||
.build()
|
||||
.unwrap())
|
||||
}
|
||||
|
||||
macro_rules! element_to_value {
|
||||
($ft:expr, $obj:ident, $i:ident, $settings:ident) => {
|
||||
match $ft {
|
||||
FieldType::Null => Value::Null,
|
||||
FieldType::Bool => Value::Bool($obj.get_bool($i)?),
|
||||
FieldType::Byte => Value::Number($obj.get_byte($i)?.into()),
|
||||
FieldType::Short => Value::Number($obj.get_short($i)?.into()),
|
||||
FieldType::Int => Value::Number($obj.get_int($i)?.into()),
|
||||
FieldType::Long => Value::Number($obj.get_long($i)?.into()),
|
||||
FieldType::UByte => Value::Number($obj.get_ubyte($i)?.into()),
|
||||
FieldType::UShort => Value::Number($obj.get_ushort($i)?.into()),
|
||||
FieldType::UInt => Value::Number($obj.get_uint($i)?.into()),
|
||||
FieldType::ULong => Value::Number($obj.get_ulong($i)?.into()),
|
||||
FieldType::Float => float_to_value($obj.get_float($i)? as f64),
|
||||
FieldType::Double => float_to_value($obj.get_double($i)?),
|
||||
FieldType::Decimal => Value::String(decimal_to_string($obj.get_decimal($i)?)),
|
||||
FieldType::Str => Value::String($obj.get_string($i)?.to_string()),
|
||||
FieldType::Bytes => bytes_to_value($obj.get_bytes($i)?.data()),
|
||||
FieldType::Date => Value::Number($obj.get_date($i)?.into()),
|
||||
FieldType::TimestampMillis => timestamp_to_value($settings, $obj.get_timestamp_millis($i)?)?,
|
||||
FieldType::TimestampMicros => timestamp_to_value($settings, $obj.get_timestamp_micros($i).map(|ts| ts / 1000)?)?,
|
||||
FieldType::Group => row_to_value($settings, $obj.get_group($i)?)?,
|
||||
FieldType::List => list_to_value($settings, $obj.get_list($i)?)?,
|
||||
FieldType::Map => map_to_value($settings, $obj.get_map($i)?)?,
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
fn top_level_row_to_value(settings: &Settings, row: &Row) -> Result<Value, Box<dyn Error>> {
|
||||
let value = row_to_value(settings, row)?;
|
||||
if value.is_null() {
|
||||
Ok(Value::Object(serde_json::Map::default()))
|
||||
} else {
|
||||
Ok(value)
|
||||
}
|
||||
}
|
||||
|
||||
fn row_to_value(settings: &Settings, row: &Row) -> Result<Value, Box<dyn Error>> {
|
||||
let mut map = serde_json::Map::with_capacity(row.len());
|
||||
for i in 0..row.len() {
|
||||
let name = row.get_field_name(i);
|
||||
let field_type = row.get_field_type(i);
|
||||
let value = element_to_value!(field_type, row, i, settings);
|
||||
if !(settings.omit_nulls && value.is_null()) {
|
||||
map.insert(name.to_string(), value);
|
||||
}
|
||||
}
|
||||
|
||||
if settings.omit_empty_bags && map.is_empty() {
|
||||
Ok(Value::Null)
|
||||
} else {
|
||||
Ok(Value::Object(map))
|
||||
}
|
||||
}
|
||||
|
||||
fn list_to_value(settings: &Settings, list: &List) -> Result<Value, Box<dyn Error>> {
|
||||
let mut arr = Vec::<Value>::with_capacity(list.len());
|
||||
for i in 0..list.len() {
|
||||
let elt_ty = list.get_element_type(i);
|
||||
let value = element_to_value!(elt_ty, list, i, settings);
|
||||
arr.push(value);
|
||||
}
|
||||
|
||||
if settings.omit_empty_lists && arr.is_empty() {
|
||||
Ok(Value::Null)
|
||||
} else {
|
||||
Ok(Value::Array(arr))
|
||||
}
|
||||
}
|
||||
|
||||
fn map_to_value(settings: &Settings, map: &Map) -> Result<Value, Box<dyn Error>> {
|
||||
let mut jsmap = serde_json::Map::with_capacity(map.len());
|
||||
let keys = map.get_keys();
|
||||
let values = map.get_values();
|
||||
for i in 0..map.len() {
|
||||
let key_ty = keys.get_element_type(i);
|
||||
let key = match key_ty {
|
||||
FieldType::Str => keys.get_string(i)?.to_string(),
|
||||
// TODO: return error here
|
||||
_ => panic!("Non-string key"),
|
||||
};
|
||||
|
||||
let val_ty = values.get_element_type(i);
|
||||
let value = element_to_value!(val_ty, values, i, settings);
|
||||
if !(settings.omit_nulls && value.is_null()) {
|
||||
jsmap.insert(key, value);
|
||||
}
|
||||
}
|
||||
|
||||
if settings.omit_empty_bags && jsmap.is_empty() {
|
||||
Ok(Value::Null)
|
||||
} else {
|
||||
Ok(Value::Object(jsmap))
|
||||
}
|
||||
}
|
||||
|
||||
fn bytes_to_value(bytes: &[u8]) -> Value {
|
||||
let nums = bytes
|
||||
.iter()
|
||||
.map(|&b| Value::Number(b.into()))
|
||||
.collect::<Vec<_>>();
|
||||
Value::Array(nums)
|
||||
}
|
||||
|
||||
fn float_to_value(f: f64) -> Value {
|
||||
Number::from_f64(f)
|
||||
.map(|n| Value::Number(n))
|
||||
.unwrap_or_else(|| Value::Null)
|
||||
}
|
||||
|
||||
const TICKS_TILL_UNIX_TIME: u64 = 621355968000000000u64;
|
||||
|
||||
fn timestamp_to_value(settings: &Settings, ts: u64) -> Result<Value, Box<dyn Error>> {
|
||||
match settings.timestamp_rendering {
|
||||
TimestampRendering::Ticks => {
|
||||
let ticks = ts
|
||||
.checked_mul(10000)
|
||||
.and_then(|t| t.checked_add(TICKS_TILL_UNIX_TIME));
|
||||
let v = ticks
|
||||
.map(|t| Value::Number(t.into()))
|
||||
.unwrap_or(Value::Null);
|
||||
Ok(v)
|
||||
}
|
||||
TimestampRendering::IsoStr => {
|
||||
let seconds = (ts / 1000) as i64;
|
||||
let nanos = ((ts % 1000) * 1000000) as u32;
|
||||
let datetime =
|
||||
if let Some(dt) = chrono::NaiveDateTime::from_timestamp_opt(seconds, nanos) {
|
||||
dt
|
||||
} else {
|
||||
return Ok(Value::Null);
|
||||
};
|
||||
let iso_str = datetime.format("%Y-%m-%dT%H:%M:%S.%6fZ").to_string();
|
||||
Ok(Value::String(iso_str))
|
||||
}
|
||||
TimestampRendering::UnixMs => Ok(Value::Number(ts.into())),
|
||||
}
|
||||
}
|
||||
|
||||
fn decimal_to_string(decimal: &Decimal) -> String {
|
||||
assert!(decimal.scale() >= 0 && decimal.precision() > decimal.scale());
|
||||
|
||||
// Specify as signed bytes to resolve sign as part of conversion.
|
||||
let num = BigInt::from_signed_bytes_be(decimal.data());
|
||||
|
||||
// Offset of the first digit in a string.
|
||||
let negative = if num.sign() == Sign::Minus { 1 } else { 0 };
|
||||
let mut num_str = num.to_string();
|
||||
let mut point = num_str.len() as i32 - decimal.scale() - negative;
|
||||
|
||||
// Convert to string form without scientific notation.
|
||||
if point <= 0 {
|
||||
// Zeros need to be prepended to the unscaled value.
|
||||
while point < 0 {
|
||||
num_str.insert(negative as usize, '0');
|
||||
point += 1;
|
||||
}
|
||||
num_str.insert_str(negative as usize, "0.");
|
||||
} else {
|
||||
// No zeroes need to be prepended to the unscaled value, simply insert decimal
|
||||
// point.
|
||||
num_str.insert((point + negative) as usize, '.');
|
||||
}
|
||||
|
||||
num_str
|
||||
}
|
||||
use clap::{App, Arg};
|
||||
|
||||
use crate::settings::{Settings, TimestampRendering};
|
||||
|
||||
mod converter;
|
||||
mod schema;
|
||||
mod settings;
|
||||
|
||||
fn main() {
|
||||
let matches = App::new("pq2json")
|
||||
.version("0.1")
|
||||
.arg(
|
||||
Arg::with_name("omit-nulls")
|
||||
.long("omit-nulls")
|
||||
.help("Omit bag entries with null values")
|
||||
.takes_value(false)
|
||||
.required(false),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("omit-empty-bags")
|
||||
.long("omit-empty-bags")
|
||||
.help("Omit empty property bags")
|
||||
.takes_value(false)
|
||||
.required(false),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("omit-empty-lists")
|
||||
.long("omit-empty-lists")
|
||||
.help("Omit empty lists")
|
||||
.takes_value(false)
|
||||
.required(false),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("prune")
|
||||
.short("p")
|
||||
.long("prune")
|
||||
.help(
|
||||
"Omit nulls, empty bags and empty lists \
|
||||
(equivalent to a combination of the three --omit-... options)",
|
||||
)
|
||||
.takes_value(false)
|
||||
.required(false),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("csv")
|
||||
.long("csv")
|
||||
.help("Output root level fields in CSV format")
|
||||
.takes_value(false)
|
||||
.required(false),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("timestamp")
|
||||
.short("t")
|
||||
.long("timestamp")
|
||||
.possible_values(&["isostr", "ticks", "unixms"])
|
||||
.default_value("isostr")
|
||||
.help(
|
||||
"Timestamp rendering option. Either \
|
||||
ticks (100ns ticks since 01-01-01), \
|
||||
isostr (ISO8601 string) \
|
||||
or unixms (milliseconds since Unix epoch)",
|
||||
)
|
||||
.takes_value(true)
|
||||
.required(false),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("columns")
|
||||
.short("c")
|
||||
.long("columns")
|
||||
.help("Comma separated top-level columns to select")
|
||||
.takes_value(true)
|
||||
.required(false),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("output")
|
||||
.short("o")
|
||||
.long("output")
|
||||
.value_name("OUT_FILE")
|
||||
.help("Output file")
|
||||
.takes_value(true)
|
||||
.required(false),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("schema")
|
||||
.long("schema")
|
||||
.help("Print schema")
|
||||
.takes_value(false)
|
||||
.required(false),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("INPUT")
|
||||
.help("Input file to use")
|
||||
.required(true)
|
||||
.index(1),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("v")
|
||||
.short("v")
|
||||
.multiple(true)
|
||||
.help("Sets the level of verbosity"),
|
||||
)
|
||||
.get_matches();
|
||||
|
||||
let input = matches.value_of("INPUT").expect("INPUT must be provided");
|
||||
let output = matches.value_of("OUT_FILE");
|
||||
|
||||
let timestamp_rendering = match matches.value_of("timestamp").unwrap_or("ticks") {
|
||||
"ticks" => TimestampRendering::Ticks,
|
||||
"isostr" => TimestampRendering::IsoStr,
|
||||
"unixms" => TimestampRendering::UnixMs,
|
||||
_ => TimestampRendering::IsoStr,
|
||||
};
|
||||
|
||||
let settings = Settings {
|
||||
omit_nulls: matches.is_present("omit-nulls") || matches.is_present("prune"),
|
||||
omit_empty_bags: matches.is_present("omit-empty-bags") || matches.is_present("prune"),
|
||||
timestamp_rendering,
|
||||
omit_empty_lists: matches.is_present("omit-empty-lists") || matches.is_present("prune"),
|
||||
columns: matches
|
||||
.value_of("columns")
|
||||
.map(|columns| columns.split(",").map(|s| s.to_string()).collect()),
|
||||
csv: matches.is_present("csv"),
|
||||
};
|
||||
|
||||
let res = if matches.is_present("schema") {
|
||||
schema::print_schema(input)
|
||||
} else {
|
||||
converter::convert(&settings, input, output)
|
||||
};
|
||||
|
||||
match res {
|
||||
Ok(()) => (),
|
||||
Err(e) => {
|
||||
eprintln!("ERROR: {:?}", e);
|
||||
std::process::exit(-1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
use std::error::Error;
|
||||
use std::fs::File;
|
||||
use std::path::Path;
|
||||
|
||||
use parquet::file::reader::{FileReader, SerializedFileReader};
|
||||
use parquet::schema::printer::{print_file_metadata, print_parquet_metadata};
|
||||
|
||||
/// Prints Parquet file schema information
|
||||
///
|
||||
/// Arguments:
|
||||
///
|
||||
/// * `input_file` - Parquet file path
|
||||
///
|
||||
pub fn print_schema(input_file: &str) -> Result<(), Box<dyn Error>> {
|
||||
let file = File::open(&Path::new(input_file))?;
|
||||
let reader = SerializedFileReader::new(file)?;
|
||||
let meta = reader.metadata();
|
||||
let mut output = Vec::new();
|
||||
print_parquet_metadata(&mut output, &meta);
|
||||
println!("\n\nParquet metadata");
|
||||
println!("=================================================");
|
||||
println!("{}", String::from_utf8(output)?);
|
||||
|
||||
let mut output = Vec::new();
|
||||
let file_meta = reader.metadata().file_metadata();
|
||||
print_file_metadata(&mut output, &file_meta);
|
||||
println!("\n\nFile metadata");
|
||||
println!("=================================================");
|
||||
println!("{}", String::from_utf8(output)?);
|
||||
Ok(())
|
||||
}
|
|
@ -0,0 +1,16 @@
|
|||
#[derive(Debug, Clone)]
|
||||
pub struct Settings {
|
||||
pub omit_nulls: bool,
|
||||
pub omit_empty_bags: bool,
|
||||
pub omit_empty_lists: bool,
|
||||
pub timestamp_rendering: TimestampRendering,
|
||||
pub columns: Option<Vec<String>>,
|
||||
pub csv: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
|
||||
pub enum TimestampRendering {
|
||||
Ticks,
|
||||
IsoStr,
|
||||
UnixMs,
|
||||
}
|
Загрузка…
Ссылка в новой задаче