azure-kusto-parquet-conv/KUSTO.patch

300 lines
11 KiB
Diff

Only in arrow-rs: Cargo.lock
diff -ru arrow-rs.orig/parquet/src/data_type.rs arrow-rs/parquet/src/data_type.rs
--- arrow-rs.orig/parquet/src/data_type.rs 2024-06-25 13:10:49.853750300 +0300
+++ arrow-rs/parquet/src/data_type.rs 2024-06-25 13:46:47.955666200 +0300
@@ -62,6 +62,14 @@
         seconds * 1_000 + nanoseconds / 1_000_000
     }
 
+    /// Converts this INT96 into an i64 representing the number of MICROSECONDS since Epoch
+    ///
+    /// Will wrap around on overflow
+    pub fn to_micros(&self) -> i64 {
+        let (seconds, nanoseconds) = self.to_seconds_and_nanos();
+        seconds.wrapping_mul(1_000_000).wrapping_add(nanoseconds / 1_000)
+    }
+
     /// Converts this INT96 into an i64 representing the number of NANOSECONDS since EPOCH
     ///
     /// Will wrap around on overflow
diff -ru arrow-rs.orig/parquet/src/record/api.rs arrow-rs/parquet/src/record/api.rs
--- arrow-rs.orig/parquet/src/record/api.rs 2024-06-25 13:10:49.870755100 +0300
+++ arrow-rs/parquet/src/record/api.rs 2024-06-25 14:03:23.583254500 +0300
@@ -132,8 +132,58 @@
     }
 }
 
+/// Type of the field within the `Row`.
+#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
+pub enum FieldType {
+    /// Null
+    Null,
+    /// Boolean value (`true`, `false`).
+    Bool,
+    /// Signed integer INT_8.
+    Byte,
+    /// Signed integer INT_16.
+    Short,
+    /// Signed integer INT_32.
+    Int,
+    /// Signed integer INT_64.
+    Long,
+    /// Unsigned integer UINT_8.
+    UByte,
+    /// Unsigned integer UINT_16.
+    UShort,
+    /// Unsigned integer UINT_32.
+    UInt,
+    /// Unsigned integer UINT_64.
+    ULong,
+    /// IEEE 32-bit floating point value (also reported for `Field::Float16` values).
+    Float,
+    /// IEEE 64-bit floating point value.
+    Double,
+    /// Decimal value.
+    Decimal,
+    /// UTF-8 encoded character string.
+    Str,
+    /// General binary value.
+    Bytes,
+    /// Date without a time of day, stores the number of days from the
+    /// Unix epoch, 1 January 1970.
+    Date,
+    /// Milliseconds from the Unix epoch, 1 January 1970.
+    TimestampMillis,
+    /// Microseconds from the Unix epoch, 1 January 1970.
+    TimestampMicros,
+    /// Struct, child elements are tuples of field-value pairs.
+    Group,
+    /// List of elements.
+    List,
+    /// List of key-value pairs.
+    Map,
+}
+
 /// Trait for type-safe convenient access to fields within a Row.
 pub trait RowAccessor {
+    fn get_field_type(&self, i: usize) -> FieldType;
+    fn get_field_name(&self, i: usize) -> &str;
     fn get_bool(&self, i: usize) -> Result<bool>;
     fn get_byte(&self, i: usize) -> Result<i8>;
     fn get_short(&self, i: usize) -> Result<i16>;
@@ -148,6 +198,7 @@
     fn get_double(&self, i: usize) -> Result<f64>;
     fn get_timestamp_millis(&self, i: usize) -> Result<i64>;
     fn get_timestamp_micros(&self, i: usize) -> Result<i64>;
+    fn get_date(&self, i: usize) -> Result<i32>;
     fn get_decimal(&self, i: usize) -> Result<&Decimal>;
     fn get_string(&self, i: usize) -> Result<&String>;
     fn get_bytes(&self, i: usize) -> Result<&ByteArray>;
@@ -220,6 +271,14 @@
 }
 
 impl RowAccessor for Row {
+    fn get_field_type(&self, i: usize) -> FieldType {
+        self.fields[i].1.get_field_type()
+    }
+
+    fn get_field_name(&self, i: usize) -> &str {
+        &self.fields[i].0
+    }
+
     row_primitive_accessor!(get_bool, Bool, bool);
 
     row_primitive_accessor!(get_byte, Byte, i8);
@@ -248,6 +307,8 @@
 
     row_primitive_accessor!(get_timestamp_micros, TimestampMicros, i64);
 
+    row_primitive_accessor!(get_date, Date, i32);
+
     row_complex_accessor!(get_decimal, Decimal, Decimal);
 
     row_complex_accessor!(get_string, Str, String);
@@ -309,6 +370,7 @@
 /// Trait for type-safe access of an index for a `List`.
 /// Note that the get_XXX methods do not do bound checking.
 pub trait ListAccessor {
+    fn get_element_type(&self, i: usize) -> FieldType;
     fn get_bool(&self, i: usize) -> Result<bool>;
     fn get_byte(&self, i: usize) -> Result<i8>;
     fn get_short(&self, i: usize) -> Result<i16>;
@@ -323,6 +385,7 @@
     fn get_double(&self, i: usize) -> Result<f64>;
     fn get_timestamp_millis(&self, i: usize) -> Result<i64>;
     fn get_timestamp_micros(&self, i: usize) -> Result<i64>;
+    fn get_date(&self, i: usize) -> Result<i32>;
     fn get_decimal(&self, i: usize) -> Result<&Decimal>;
     fn get_string(&self, i: usize) -> Result<&String>;
     fn get_bytes(&self, i: usize) -> Result<&ByteArray>;
@@ -366,6 +429,10 @@
 }
 
 impl ListAccessor for List {
+    fn get_element_type(&self, i: usize) -> FieldType {
+        self.elements[i].get_field_type()
+    }
+
     list_primitive_accessor!(get_bool, Bool, bool);
 
     list_primitive_accessor!(get_byte, Byte, i8);
@@ -394,6 +461,8 @@
 
     list_primitive_accessor!(get_timestamp_micros, TimestampMicros, i64);
 
+    list_primitive_accessor!(get_date, Date, i32);
+
     list_complex_accessor!(get_decimal, Decimal, Decimal);
 
     list_complex_accessor!(get_string, Str, String);
@@ -433,6 +502,8 @@
 
 /// Trait for type-safe access of an index for a `Map`
 pub trait MapAccessor {
+    fn key_type(&self, i: usize) -> FieldType;
+    fn value_type(&self, i: usize) -> FieldType;
     fn get_keys<'a>(&'a self) -> Box<dyn ListAccessor + 'a>;
     fn get_values<'a>(&'a self) -> Box<dyn ListAccessor + 'a>;
 }
@@ -459,6 +530,10 @@
 }
 
 impl<'a> ListAccessor for MapList<'a> {
+    fn get_element_type(&self, i: usize) -> FieldType {
+        self.elements[i].get_field_type()
+    }
+
     map_list_primitive_accessor!(get_bool, Bool, bool);
 
     map_list_primitive_accessor!(get_byte, Byte, i8);
@@ -487,6 +562,8 @@
 
     map_list_primitive_accessor!(get_timestamp_micros, TimestampMicros, i64);
 
+    map_list_primitive_accessor!(get_date, Date, i32);
+
     list_complex_accessor!(get_decimal, Decimal, Decimal);
 
     list_complex_accessor!(get_string, Str, String);
@@ -501,6 +578,14 @@
 }
 
 impl MapAccessor for Map {
+    fn key_type(&self, i: usize) -> FieldType {
+        self.entries[i].0.get_field_type()
+    }
+
+    fn value_type(&self, i: usize) -> FieldType {
+        self.entries[i].1.get_field_type()
+    }
+
     fn get_keys<'a>(&'a self) -> Box<dyn ListAccessor + 'a> {
         let map_list = MapList {
             elements: self.entries.iter().map(|v| &v.0).collect(),
@@ -599,6 +684,33 @@
         }
     }
 
+    /// Returns the [`FieldType`] tag of this value; `Float16` is reported as `FieldType::Float`.
+    pub fn get_field_type(&self) -> FieldType {
+        match self {
+            Field::Null => FieldType::Null,
+            Field::Bool(_) => FieldType::Bool,
+            Field::Byte(_) => FieldType::Byte,
+            Field::Short(_) => FieldType::Short,
+            Field::Int(_) => FieldType::Int,
+            Field::Long(_) => FieldType::Long,
+            Field::UByte(_) => FieldType::UByte,
+            Field::UShort(_) => FieldType::UShort,
+            Field::UInt(_) => FieldType::UInt,
+            Field::ULong(_) => FieldType::ULong,
+            Field::Float(_) | Field::Float16(_) => FieldType::Float,
+            Field::Double(_) => FieldType::Double,
+            Field::Decimal(_) => FieldType::Decimal,
+            Field::Str(_) => FieldType::Str,
+            Field::Bytes(_) => FieldType::Bytes,
+            Field::TimestampMillis(_) => FieldType::TimestampMillis,
+            Field::TimestampMicros(_) => FieldType::TimestampMicros,
+            Field::Date(_) => FieldType::Date,
+            Field::Group(_) => FieldType::Group,
+            Field::ListInternal(_) => FieldType::List,
+            Field::MapInternal(_) => FieldType::Map,
+        }
+    }
+
     /// Determines if this Row represents a primitive value.
     pub fn is_primitive(&self) -> bool {
         !matches!(
@@ -654,7 +766,13 @@
     /// `Timestamp` value.
     #[inline]
     pub fn convert_int96(_descr: &ColumnDescPtr, value: Int96) -> Self {
-        Field::TimestampMillis(value.to_i64())
+        let micros = value.to_micros();
+        if micros < 0 {
+            // XXX: temporary workaround -- pre-epoch timestamps are clamped to 1970-01-01T00:00:00Z
+            Field::TimestampMicros(0)
+        } else {
+            Field::TimestampMicros(micros)
+        }
     }
 
     /// Converts Parquet FLOAT type with logical type into `f32` value.
@@ -1045,13 +1163,18 @@
 
         let value = Int96::from(vec![0, 0, 2454923]);
         let row = Field::convert_int96(&descr, value);
-        assert_eq!(row, Field::TimestampMillis(1238544000000));
+        assert_eq!(row, Field::TimestampMicros(1238544000000000));
 
         let value = Int96::from(vec![4165425152, 13, 2454923]);
         let row = Field::convert_int96(&descr, value);
-        assert_eq!(row, Field::TimestampMillis(1238544060000));
+        assert_eq!(row, Field::TimestampMicros(1238544060000000));
+
+        // Negative (pre-epoch) INT96 timestamps are clamped to the Unix epoch
+        let value = Int96::from(vec![0, 0, 0]);
+        let row = Field::convert_int96(&descr, value);
+        assert_eq!(row, Field::TimestampMicros(0));
     }
 
     #[test]
     fn test_row_convert_float() {
         // FLOAT value does not depend on logical type
diff -ru arrow-rs.orig/parquet/src/record/mod.rs arrow-rs/parquet/src/record/mod.rs
--- arrow-rs.orig/parquet/src/record/mod.rs 2024-06-25 13:10:49.870755100 +0300
+++ arrow-rs/parquet/src/record/mod.rs 2024-06-25 12:49:09.422781500 +0300
@@ -25,7 +25,8 @@
 
 pub use self::{
     api::{
-        Field, List, ListAccessor, Map, MapAccessor, Row, RowAccessor, RowColumnIter, RowFormatter,
+        Field, FieldType, List, ListAccessor, Map, MapAccessor, Row, RowAccessor, RowColumnIter,
+        RowFormatter,
     },
     record_reader::RecordReader,
     record_writer::RecordWriter,
diff -ru arrow-rs.orig/parquet/src/record/reader.rs arrow-rs/parquet/src/record/reader.rs
--- arrow-rs.orig/parquet/src/record/reader.rs 2024-06-25 13:10:49.871756000 +0300
+++ arrow-rs/parquet/src/record/reader.rs 2024-06-25 12:50:25.274021200 +0300
@@ -400,9 +400,15 @@
     fn read_field(&mut self) -> Result<Field> {
         let field = match *self {
             Reader::PrimitiveReader(_, ref mut column) => {
-                let value = column.current_value()?;
-                column.read_next()?;
-                value
+                // Some nullable leaves (seen with byte-array/string columns) reach this reader
+                // unwrapped; yield Null for them, but always consume the current triplet.
+                let value = if column.is_null() {
+                    Field::Null
+                } else {
+                    column.current_value()?
+                };
+                column.read_next()?;
+                value
             }
             Reader::OptionReader(def_level, ref mut reader) => {
                 if reader.current_def_level() > def_level {