# Patch against the arrow-rs `parquet` crate (output of `diff -ru arrow-rs.orig arrow-rs`).
# Summary of the changes:
#   * data_type.rs     - adds Int96::to_micros().
#   * record/api.rs    - adds the FieldType enum, field type/name introspection for Row, List and
#                        Map, get_date accessors, and makes Field::convert_int96 produce
#                        TimestampMicros (negative values are clamped to 0).
#   * record/mod.rs    - re-exports FieldType.
#   * record/reader.rs - PrimitiveReader yields Field::Null when the current value is null.
Only in arrow-rs: Cargo.lock
diff -ru arrow-rs.orig/parquet/src/data_type.rs arrow-rs/parquet/src/data_type.rs
--- arrow-rs.orig/parquet/src/data_type.rs	2024-06-25 13:10:49.853750300 +0300
+++ arrow-rs/parquet/src/data_type.rs	2024-06-25 13:46:47.955666200 +0300
@@ -62,6 +62,12 @@
         seconds * 1_000 + nanoseconds / 1_000_000
     }
 
+    /// Converts this INT96 into an i64 representing the number of MICROSECONDS since Epoch
+    pub fn to_micros(&self) -> i64 {
+        let (seconds, nanoseconds) = self.to_seconds_and_nanos();
+        seconds * 1_000_000 + nanoseconds / 1_000
+    }
+
     /// Converts this INT96 into an i64 representing the number of NANOSECONDS since EPOCH
     ///
     /// Will wrap around on overflow
diff -ru arrow-rs.orig/parquet/src/record/api.rs arrow-rs/parquet/src/record/api.rs
--- arrow-rs.orig/parquet/src/record/api.rs	2024-06-25 13:10:49.870755100 +0300
+++ arrow-rs/parquet/src/record/api.rs	2024-06-25 14:03:23.583254500 +0300
@@ -132,8 +132,58 @@
     }
 }
 
+/// Type of the field within the `Row`.
+#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
+pub enum FieldType {
+    /// Null
+    Null,
+    /// Boolean value (`true`, `false`).
+    Bool,
+    /// Signed integer INT_8.
+    Byte,
+    /// Signed integer INT_16.
+    Short,
+    /// Signed integer INT_32.
+    Int,
+    /// Signed integer INT_64.
+    Long,
+    /// Unsigned integer UINT_8.
+    UByte,
+    /// Unsigned integer UINT_16.
+    UShort,
+    /// Unsigned integer UINT_32.
+    UInt,
+    /// Unsigned integer UINT_64.
+    ULong,
+    /// IEEE 32-bit floating point value.
+    Float,
+    /// IEEE 64-bit floating point value.
+    Double,
+    /// Decimal value.
+    Decimal,
+    /// UTF-8 encoded character string.
+    Str,
+    /// General binary value.
+    Bytes,
+    /// Date without a time of day, stores the number of days from the
+    /// Unix epoch, 1 January 1970.
+    Date,
+    /// Milliseconds from the Unix epoch, 1 January 1970.
+    TimestampMillis,
+    /// Microseconds from the Unix epoch, 1 January 1970.
+    TimestampMicros,
+    /// Struct, child elements are tuples of field-value pairs.
+    Group,
+    /// List of elements.
+    List,
+    /// List of key-value pairs.
+    Map,
+}
+
 /// Trait for type-safe convenient access to fields within a Row.
 pub trait RowAccessor {
+    fn get_field_type(&self, i: usize) -> FieldType;
+    fn get_field_name(&self, i: usize) -> &str;
     fn get_bool(&self, i: usize) -> Result<bool>;
     fn get_byte(&self, i: usize) -> Result<i8>;
     fn get_short(&self, i: usize) -> Result<i16>;
@@ -148,6 +198,7 @@
     fn get_double(&self, i: usize) -> Result<f64>;
     fn get_timestamp_millis(&self, i: usize) -> Result<i64>;
     fn get_timestamp_micros(&self, i: usize) -> Result<i64>;
+    fn get_date(&self, i: usize) -> Result<i32>;
     fn get_decimal(&self, i: usize) -> Result<&Decimal>;
     fn get_string(&self, i: usize) -> Result<&String>;
     fn get_bytes(&self, i: usize) -> Result<&ByteArray>;
@@ -220,6 +271,14 @@
 }
 
 impl RowAccessor for Row {
+    fn get_field_type(&self, i: usize) -> FieldType {
+        self.fields[i].1.get_field_type()
+    }
+
+    fn get_field_name(&self, i: usize) -> &str {
+        &self.fields[i].0
+    }
+
     row_primitive_accessor!(get_bool, Bool, bool);
 
     row_primitive_accessor!(get_byte, Byte, i8);
@@ -248,6 +307,8 @@
 
     row_primitive_accessor!(get_timestamp_micros, TimestampMicros, i64);
 
+    row_primitive_accessor!(get_date, Date, i32);
+
     row_complex_accessor!(get_decimal, Decimal, Decimal);
 
     row_complex_accessor!(get_string, Str, String);
@@ -309,6 +370,7 @@
 /// Trait for type-safe access of an index for a `List`.
 /// Note that the get_XXX methods do not do bound checking.
 pub trait ListAccessor {
+    fn get_element_type(&self, i: usize) -> FieldType;
     fn get_bool(&self, i: usize) -> Result<bool>;
     fn get_byte(&self, i: usize) -> Result<i8>;
     fn get_short(&self, i: usize) -> Result<i16>;
@@ -323,6 +385,7 @@
     fn get_double(&self, i: usize) -> Result<f64>;
     fn get_timestamp_millis(&self, i: usize) -> Result<i64>;
     fn get_timestamp_micros(&self, i: usize) -> Result<i64>;
+    fn get_date(&self, i: usize) -> Result<i32>;
     fn get_decimal(&self, i: usize) -> Result<&Decimal>;
     fn get_string(&self, i: usize) -> Result<&String>;
     fn get_bytes(&self, i: usize) -> Result<&ByteArray>;
@@ -366,6 +429,10 @@
 }
 
 impl ListAccessor for List {
+    fn get_element_type(&self, i: usize) -> FieldType {
+        self.elements[i].get_field_type()
+    }
+
     list_primitive_accessor!(get_bool, Bool, bool);
 
     list_primitive_accessor!(get_byte, Byte, i8);
@@ -394,6 +461,8 @@
 
     list_primitive_accessor!(get_timestamp_micros, TimestampMicros, i64);
 
+    list_primitive_accessor!(get_date, Date, i32);
+
     list_complex_accessor!(get_decimal, Decimal, Decimal);
 
     list_complex_accessor!(get_string, Str, String);
@@ -433,6 +502,8 @@
 
 /// Trait for type-safe access of an index for a `Map`
 pub trait MapAccessor {
+    fn key_type(&self, i: usize) -> FieldType;
+    fn value_type(&self, i: usize) -> FieldType;
     fn get_keys<'a>(&'a self) -> Box<dyn ListAccessor + 'a>;
     fn get_values<'a>(&'a self) -> Box<dyn ListAccessor + 'a>;
 }
@@ -459,6 +530,10 @@
 }
 
 impl<'a> ListAccessor for MapList<'a> {
+    fn get_element_type(&self, i: usize) -> FieldType {
+        self.elements[i].get_field_type()
+    }
+
     map_list_primitive_accessor!(get_bool, Bool, bool);
 
     map_list_primitive_accessor!(get_byte, Byte, i8);
@@ -487,6 +562,8 @@
 
     map_list_primitive_accessor!(get_timestamp_micros, TimestampMicros, i64);
 
+    map_list_primitive_accessor!(get_date, Date, i32);
+
     list_complex_accessor!(get_decimal, Decimal, Decimal);
 
     list_complex_accessor!(get_string, Str, String);
@@ -501,6 +578,14 @@
 }
 
 impl MapAccessor for Map {
+    fn key_type(&self, i: usize) -> FieldType {
+        self.entries[i].0.get_field_type()
+    }
+
+    fn value_type(&self, i: usize) -> FieldType {
+        self.entries[i].1.get_field_type()
+    }
+
     fn get_keys<'a>(&'a self) -> Box<dyn ListAccessor + 'a> {
         let map_list = MapList {
             elements: self.entries.iter().map(|v| &v.0).collect(),
@@ -599,6 +684,33 @@
         }
     }
 
+    pub fn get_field_type(&self) -> FieldType {
+        match self {
+            Field::Null => FieldType::Null,
+            Field::Bool(_) => FieldType::Bool,
+            Field::Byte(_) => FieldType::Byte,
+            Field::Short(_) => FieldType::Short,
+            Field::Int(_) => FieldType::Int,
+            Field::Long(_) => FieldType::Long,
+            Field::UByte(_) => FieldType::UByte,
+            Field::UShort(_) => FieldType::UShort,
+            Field::UInt(_) => FieldType::UInt,
+            Field::ULong(_) => FieldType::ULong,
+            Field::Float(_) => FieldType::Float,
+            Field::Float16(_) => FieldType::Float,
+            Field::Double(_) => FieldType::Double,
+            Field::Decimal(_) => FieldType::Decimal,
+            Field::Str(_) => FieldType::Str,
+            Field::Bytes(_) => FieldType::Bytes,
+            Field::TimestampMillis(_) => FieldType::TimestampMillis,
+            Field::TimestampMicros(_) => FieldType::TimestampMicros,
+            Field::Date(_) => FieldType::Date,
+            Field::Group(_) => FieldType::Group,
+            Field::ListInternal(_) => FieldType::List,
+            Field::MapInternal(_) => FieldType::Map,
+        }
+    }
+
     /// Determines if this Row represents a primitive value.
     pub fn is_primitive(&self) -> bool {
         !matches!(
@@ -654,7 +766,13 @@
     /// `Timestamp` value.
     #[inline]
     pub fn convert_int96(_descr: &ColumnDescPtr, value: Int96) -> Self {
-        Field::TimestampMillis(value.to_i64())
+        let micros = value.to_micros();
+        if micros < 0 {
+            // XXX: Temporary workaround for negative timestamps -- return 1970-01-01T00:00:00Z
+            Field::TimestampMicros(0)
+        } else {
+            Field::TimestampMicros(micros)
+        }
     }
 
     /// Converts Parquet FLOAT type with logical type into `f32` value.
@@ -1050,8 +1168,23 @@
         let value = Int96::from(vec![4165425152, 13, 2454923]);
         let row = Field::convert_int96(&descr, value);
-        assert_eq!(row, Field::TimestampMillis(1238544060000));
+        assert_eq!(row, Field::TimestampMicros(1238544060000000));
+
+        // Negative int96
+        let value = Int96::from(vec![0, 0, 0]);
+        let row = Field::convert_int96(&descr, value);
+        assert_eq!(row, Field::TimestampMicros(0));
     }
 
+    //#[test]
+    //#[should_panic(expected = "Expected non-negative milliseconds when converting Int96")]
+    //fn test_row_convert_int96_invalid() {
+    // INT96 value does not depend on logical type
+    //    let descr = make_column_descr![PhysicalType::INT96, LogicalType::NONE];
+
+    //    let value = Int96::from(vec![0, 0, 0]);
+    //    Field::convert_int96(&descr, value);
+    //}
+
     #[test]
     fn test_row_convert_float() {
         // FLOAT value does not depend on logical type
diff -ru arrow-rs.orig/parquet/src/record/mod.rs arrow-rs/parquet/src/record/mod.rs
--- arrow-rs.orig/parquet/src/record/mod.rs	2024-06-25 13:10:49.870755100 +0300
+++ arrow-rs/parquet/src/record/mod.rs	2024-06-25 12:49:09.422781500 +0300
@@ -25,7 +25,8 @@
 
 pub use self::{
     api::{
-        Field, List, ListAccessor, Map, MapAccessor, Row, RowAccessor, RowColumnIter, RowFormatter,
+        Field, FieldType, List, ListAccessor, Map, MapAccessor, Row, RowAccessor, RowColumnIter,
+        RowFormatter,
     },
     record_reader::RecordReader,
     record_writer::RecordWriter,
diff -ru arrow-rs.orig/parquet/src/record/reader.rs arrow-rs/parquet/src/record/reader.rs
--- arrow-rs.orig/parquet/src/record/reader.rs	2024-06-25 13:10:49.871756000 +0300
+++ arrow-rs/parquet/src/record/reader.rs	2024-06-25 12:50:25.274021200 +0300
@@ -400,9 +400,15 @@
     fn read_field(&mut self) -> Result<Field> {
         let field = match *self {
             Reader::PrimitiveReader(_, ref mut column) => {
-                let value = column.current_value()?;
-                column.read_next()?;
-                value
+                // Some byte-array (string) columns reach this PrimitiveReader with a null current
+                // value; map that to Field::Null. NOTE(review): no read_next() here -- confirm.
+                if column.is_null() {
+                    Field::Null
+                } else {
+                    let value = column.current_value()?;
+                    column.read_next()?;
+                    value
+                }
             }
             Reader::OptionReader(def_level, ref mut reader) => {
                 if reader.current_def_level() > def_level {