Created cplusplus feature to isolate C++ only elements (#1786)

* Created cplusplus feature to isolate C++ only elements

* cplusplus options

* PR feedback

* Deserializable and errorkind are used for CPP

* EH changes to reflect AMQP changes

* serialization and deserialization are cplusplus features
This commit is contained in:
Larry Osterman 2024-09-04 14:12:46 -07:00 коммит произвёл GitHub
Родитель 759a012e79
Коммит f54f0534ba
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
12 изменённых файлов: 1489 добавлений и 114 удалений

1
.vscode/cspell.json поставляемый
Просмотреть файл

@ -15,6 +15,7 @@
"asyncoperation", "asyncoperation",
"azsdk", "azsdk",
"azurecli", "azurecli",
"cplusplus",
"datalake", "datalake",
"datetime", "datetime",
"devicecode", "devicecode",

1220
eng/common/spelling/package-lock.json сгенерированный

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -5,6 +5,7 @@
"@cspell/cspell-bundled-dicts": "^6.12.0", "@cspell/cspell-bundled-dicts": "^6.12.0",
"@cspell/cspell-types": "^6.12.0", "@cspell/cspell-types": "^6.12.0",
"cspell": "^6.12.0", "cspell": "^6.12.0",
"cspell-lib": "^6.12.0" "cspell-lib": "^6.12.0",
"cspell-version-pin": "file:"
} }
} }

Просмотреть файл

@ -46,6 +46,7 @@ fe2o3-amqp = [
"serde_amqp", "serde_amqp",
"serde_bytes", "serde_bytes",
] ]
cplusplus = []
test_e2e = [] test_e2e = []
[package.metadata.docs.rs] [package.metadata.docs.rs]

Просмотреть файл

@ -134,38 +134,32 @@ impl From<fe2o3_amqp_types::messaging::ApplicationProperties>
impl From<fe2o3_amqp_types::messaging::Header> for AmqpMessageHeader { impl From<fe2o3_amqp_types::messaging::Header> for AmqpMessageHeader {
fn from(header: fe2o3_amqp_types::messaging::Header) -> Self { fn from(header: fe2o3_amqp_types::messaging::Header) -> Self {
AmqpMessageHeader::builder() println!("Source Header: {:?}", header);
let rv = AmqpMessageHeader::builder()
.with_durable(header.durable) .with_durable(header.durable)
.with_priority(header.priority.into()) .with_priority(header.priority.into())
.with_time_to_live(std::time::Duration::from_millis( .with_time_to_live(
header.ttl.unwrap_or(0) as u64 header
)) .ttl
.map(|t| std::time::Duration::from_millis(t as u64)),
)
.with_first_acquirer(header.first_acquirer) .with_first_acquirer(header.first_acquirer)
.with_delivery_count(header.delivery_count) .with_delivery_count(header.delivery_count)
.build() .build();
println!("Converted Header: {:?}", rv);
rv
} }
} }
impl From<AmqpMessageHeader> for fe2o3_amqp_types::messaging::Header { impl From<AmqpMessageHeader> for fe2o3_amqp_types::messaging::Header {
fn from(header: AmqpMessageHeader) -> Self { fn from(header: AmqpMessageHeader) -> Self {
let mut builder = fe2o3_amqp_types::messaging::Header::builder(); fe2o3_amqp_types::messaging::Header::builder()
.durable(header.durable())
if let Some(durable) = header.durable() { .priority(fe2o3_amqp_types::messaging::Priority(header.priority()))
builder = builder.durable(*durable); .ttl(header.time_to_live().map(|t| t.as_millis() as u32))
} .first_acquirer(header.first_acquirer())
if let Some(priority) = header.priority() { .delivery_count(header.delivery_count())
builder = builder.priority(fe2o3_amqp_types::messaging::Priority(*priority)); .build()
}
if let Some(time_to_live) = header.time_to_live() {
builder = builder.ttl(Some(time_to_live.as_millis() as u32));
}
if let Some(first_acquirer) = header.first_acquirer() {
builder = builder.first_acquirer(*first_acquirer);
}
if let Some(delivery_count) = header.delivery_count() {
builder = builder.delivery_count(*delivery_count);
}
builder.build()
} }
} }

Просмотреть файл

@ -57,22 +57,21 @@ where
let mut amqp_message_builder = AmqpMessage::builder(); let mut amqp_message_builder = AmqpMessage::builder();
if let Some(application_properties) = message.application_properties { if let Some(application_properties) = message.application_properties {
amqp_message_builder =
amqp_message_builder.with_application_properties(application_properties.into()); amqp_message_builder.with_application_properties(application_properties.into());
} }
let body = message.body; let body = message.body;
if body.is_empty() { if body.is_empty() {
let body = AmqpMessageBody::Empty; let body = AmqpMessageBody::Empty;
amqp_message_builder = amqp_message_builder.with_body(body); amqp_message_builder.with_body(body);
} else if body.is_data() { } else if body.is_data() {
let data = body.try_into_data().unwrap(); let data = body.try_into_data().unwrap();
let body = AmqpMessageBody::Binary(data.map(|x| x.to_vec()).collect()); let body = AmqpMessageBody::Binary(data.map(|x| x.to_vec()).collect());
amqp_message_builder = amqp_message_builder.with_body(body); amqp_message_builder.with_body(body);
} else if body.is_value() { } else if body.is_value() {
let value = body.try_into_value().unwrap(); let value = body.try_into_value().unwrap();
let value = value.try_into().unwrap(); let value = value.try_into().unwrap();
amqp_message_builder = amqp_message_builder.with_body(AmqpMessageBody::Value(value)); amqp_message_builder.with_body(AmqpMessageBody::Value(value));
} else if body.is_sequence() { } else if body.is_sequence() {
let sequence = body.try_into_sequence().unwrap(); let sequence = body.try_into_sequence().unwrap();
let body = AmqpMessageBody::Sequence( let body = AmqpMessageBody::Sequence(
@ -87,29 +86,27 @@ where
}) })
.collect(), .collect(),
); );
amqp_message_builder = amqp_message_builder.with_body(body); amqp_message_builder.with_body(body);
} }
if let Some(header) = message.header { if let Some(header) = message.header {
amqp_message_builder = amqp_message_builder.with_header(header.into()); amqp_message_builder.with_header(header.into());
} }
if let Some(properties) = message.properties { if let Some(properties) = message.properties {
amqp_message_builder = amqp_message_builder.with_properties(properties); amqp_message_builder.with_properties(properties);
} }
if let Some(delivery_annotations) = message.delivery_annotations { if let Some(delivery_annotations) = message.delivery_annotations {
amqp_message_builder =
amqp_message_builder.with_delivery_annotations(delivery_annotations.0.into()); amqp_message_builder.with_delivery_annotations(delivery_annotations.0.into());
} }
if let Some(message_annotations) = message.message_annotations { if let Some(message_annotations) = message.message_annotations {
amqp_message_builder =
amqp_message_builder.with_message_annotations(message_annotations.0.into()); amqp_message_builder.with_message_annotations(message_annotations.0.into());
} }
if let Some(footer) = message.footer { if let Some(footer) = message.footer {
amqp_message_builder = amqp_message_builder.with_footer(footer.0.into()); amqp_message_builder.with_footer(footer.0.into());
} }
amqp_message_builder.build() amqp_message_builder.build()
@ -173,34 +170,33 @@ impl
fe2o3_amqp_types::messaging::Body<fe2o3_amqp_types::messaging::message::EmptyBody>, fe2o3_amqp_types::messaging::Body<fe2o3_amqp_types::messaging::message::EmptyBody>,
>, >,
) -> Self { ) -> Self {
let mut amqp_message_builder = AmqpMessage::builder().with_body(AmqpMessageBody::Empty); let mut amqp_message_builder = AmqpMessage::builder();
amqp_message_builder.with_body(AmqpMessageBody::Empty);
if let Some(application_properties) = message.application_properties { if let Some(application_properties) = message.application_properties {
amqp_message_builder =
amqp_message_builder.with_application_properties(application_properties.into()); amqp_message_builder.with_application_properties(application_properties.into());
} }
if let Some(header) = message.header { if let Some(header) = message.header {
amqp_message_builder = amqp_message_builder.with_header(header.into()); amqp_message_builder.with_header(header.into());
} }
if let Some(properties) = message.properties { if let Some(properties) = message.properties {
info!("Converting properties to AmqpMessageProperties"); info!("Converting properties to AmqpMessageProperties");
amqp_message_builder = amqp_message_builder.with_properties(properties); amqp_message_builder.with_properties(properties);
} }
if let Some(delivery_annotations) = message.delivery_annotations { if let Some(delivery_annotations) = message.delivery_annotations {
amqp_message_builder =
amqp_message_builder.with_delivery_annotations(delivery_annotations.0.into()); amqp_message_builder.with_delivery_annotations(delivery_annotations.0.into());
} }
if let Some(message_annotations) = message.message_annotations { if let Some(message_annotations) = message.message_annotations {
amqp_message_builder =
amqp_message_builder.with_message_annotations(message_annotations.0.into()); amqp_message_builder.with_message_annotations(message_annotations.0.into());
} }
if let Some(footer) = message.footer { if let Some(footer) = message.footer {
amqp_message_builder = amqp_message_builder.with_footer(footer.0.into()); amqp_message_builder.with_footer(footer.0.into());
} }
amqp_message_builder.build() amqp_message_builder.build()
@ -488,7 +484,7 @@ mod tests {
.with_delivery_count(95) .with_delivery_count(95)
.with_first_acquirer(true) .with_first_acquirer(true)
.with_durable(true) .with_durable(true)
.with_time_to_live(std::time::Duration::from_millis(1000)) .with_time_to_live(Some(std::time::Duration::from_millis(1000)))
.with_priority(3) .with_priority(3)
.build(), .build(),
) )

Просмотреть файл

@ -185,6 +185,13 @@ impl From<AmqpValue> for fe2o3_amqp_types::primitives::Value {
AmqpValue::Array(a) => fe2o3_amqp_types::primitives::Value::Array( AmqpValue::Array(a) => fe2o3_amqp_types::primitives::Value::Array(
a.into_iter().map(|v| v.into()).collect(), a.into_iter().map(|v| v.into()).collect(),
), ),
// An AMQP Composite type is essentially a Described type with a specific descriptor which
// indicates which AMQP performative it is.
//
// Iron Oxide does not directly support Composite types (they're handled via macros), so when a C++
// component attempts to convert an AMQP Composite type to Iron Oxide, we convert it to a Described type
#[cfg(feature = "cplusplus")]
AmqpValue::Composite(d) => fe2o3_amqp_types::primitives::Value::Described(Box::new( AmqpValue::Composite(d) => fe2o3_amqp_types::primitives::Value::Described(Box::new(
serde_amqp::described::Described { serde_amqp::described::Described {
descriptor: d.descriptor.clone().into(), descriptor: d.descriptor.clone().into(),
@ -356,7 +363,12 @@ impl PartialEq<AmqpValue> for fe2o3_amqp_types::primitives::Value {
_ => false, _ => false,
}, },
AmqpValue::Composite(_) => panic!("Composite values are not supported in Fe2o3"), // An AMQP Composite type is essentially a Described type with a specific descriptor which
// indicates which AMQP performative it is.
//
// Iron Oxide does not directly support Composite types (they're handled via macros), so we always return false.
#[cfg(feature = "cplusplus")]
AmqpValue::Composite(_) => false,
AmqpValue::Unknown => todo!(), AmqpValue::Unknown => todo!(),
} }

Просмотреть файл

@ -43,12 +43,14 @@ pub enum ReceiverSettleMode {
Second = AMQP_RECEIVER_SETTLE_MODE_SECOND, Second = AMQP_RECEIVER_SETTLE_MODE_SECOND,
} }
#[cfg(feature = "cplusplus")]
pub trait Serializable { pub trait Serializable {
fn serialize(&self, buffer: &mut [u8]) -> azure_core::Result<()>; fn serialize(&self, buffer: &mut [u8]) -> azure_core::Result<()>;
fn encoded_size(&self) -> usize; fn encoded_size(&self) -> usize;
} }
#[cfg(feature = "cplusplus")]
pub trait Deserializable<T> { pub trait Deserializable<T> {
fn decode(data: &[u8]) -> azure_core::Result<T>; fn decode(data: &[u8]) -> azure_core::Result<T>;
} }

Просмотреть файл

@ -3,6 +3,10 @@
//cspell: words amqp SMALLUINT SMALLULONG //cspell: words amqp SMALLUINT SMALLULONG
use super::value::{AmqpList, AmqpOrderedMap, AmqpSymbol, AmqpTimestamp, AmqpValue}; use super::value::{AmqpList, AmqpOrderedMap, AmqpSymbol, AmqpTimestamp, AmqpValue};
#[cfg(feature = "cplusplus")]
use crate::Deserializable;
#[cfg(feature = "cplusplus")]
use azure_core::error::ErrorKind;
use azure_core::error::Result; use azure_core::error::Result;
#[derive(Debug, Clone, PartialEq)] #[derive(Debug, Clone, PartialEq)]
@ -224,6 +228,7 @@ impl From<String> for AmqpTarget {
} }
} }
#[cfg(feature = "cplusplus")]
impl From<AmqpList> for AmqpTarget { impl From<AmqpList> for AmqpTarget {
fn from(list: AmqpList) -> Self { fn from(list: AmqpList) -> Self {
let mut builder = AmqpTarget::builder(); let mut builder = AmqpTarget::builder();
@ -285,6 +290,7 @@ impl From<AmqpList> for AmqpTarget {
} }
} }
#[cfg(feature = "cplusplus")]
impl From<AmqpTarget> for AmqpList { impl From<AmqpTarget> for AmqpList {
fn from(target: AmqpTarget) -> Self { fn from(target: AmqpTarget) -> Self {
let mut list = vec![AmqpValue::Null; 7]; let mut list = vec![AmqpValue::Null; 7];
@ -368,6 +374,7 @@ impl AmqpSource {
} }
} }
#[cfg(feature = "cplusplus")]
impl From<AmqpList> for AmqpSource { impl From<AmqpList> for AmqpSource {
fn from(list: AmqpList) -> Self { fn from(list: AmqpList) -> Self {
let mut builder = AmqpSource::builder(); let mut builder = AmqpSource::builder();
@ -453,6 +460,7 @@ impl From<AmqpList> for AmqpSource {
} }
} }
#[cfg(feature = "cplusplus")]
impl From<AmqpSource> for AmqpList { impl From<AmqpSource> for AmqpList {
fn from(source: AmqpSource) -> Self { fn from(source: AmqpSource) -> Self {
let mut list = vec![AmqpValue::Null; 11]; let mut list = vec![AmqpValue::Null; 11];
@ -507,13 +515,25 @@ impl From<AmqpSource> for AmqpList {
} }
} }
#[derive(Debug, Clone, PartialEq, Default)] #[derive(Debug, Clone, PartialEq)]
pub struct AmqpMessageHeader { pub struct AmqpMessageHeader {
durable: Option<bool>, durable: bool,
priority: Option<u8>, priority: u8,
time_to_live: Option<std::time::Duration>, time_to_live: Option<std::time::Duration>,
first_acquirer: Option<bool>, first_acquirer: bool,
delivery_count: Option<u32>, delivery_count: u32,
}
impl Default for AmqpMessageHeader {
fn default() -> Self {
Self {
durable: false,
priority: 4,
time_to_live: None,
first_acquirer: false,
delivery_count: 0,
}
}
} }
impl AmqpMessageHeader { impl AmqpMessageHeader {
@ -521,24 +541,24 @@ impl AmqpMessageHeader {
builders::AmqpMessageHeaderBuilder::new() builders::AmqpMessageHeaderBuilder::new()
} }
pub fn durable(&self) -> Option<&bool> { pub fn durable(&self) -> bool {
self.durable.as_ref() self.durable
} }
pub fn priority(&self) -> Option<&u8> { pub fn priority(&self) -> u8 {
self.priority.as_ref() self.priority
} }
pub fn time_to_live(&self) -> Option<&std::time::Duration> { pub fn time_to_live(&self) -> Option<&std::time::Duration> {
self.time_to_live.as_ref() self.time_to_live.as_ref()
} }
pub fn first_acquirer(&self) -> Option<&bool> { pub fn first_acquirer(&self) -> bool {
self.first_acquirer.as_ref() self.first_acquirer
} }
pub fn delivery_count(&self) -> Option<&u32> { pub fn delivery_count(&self) -> u32 {
self.delivery_count.as_ref() self.delivery_count
} }
} }
@ -550,6 +570,7 @@ impl AmqpMessageHeader {
/// See also [Amqp Header](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-header) for more information /// See also [Amqp Header](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-header) for more information
/// ///
/// ///
#[cfg(feature = "cplusplus")]
impl From<AmqpList> for AmqpMessageHeader { impl From<AmqpList> for AmqpMessageHeader {
fn from(list: AmqpList) -> Self { fn from(list: AmqpList) -> Self {
let mut builder = AmqpMessageHeader::builder(); let mut builder = AmqpMessageHeader::builder();
@ -566,7 +587,9 @@ impl From<AmqpList> for AmqpMessageHeader {
} }
if field_count >= 3 { if field_count >= 3 {
if let Some(AmqpValue::UInt(time_to_live)) = list.0.get(2) { if let Some(AmqpValue::UInt(time_to_live)) = list.0.get(2) {
builder.with_time_to_live(std::time::Duration::from_millis(*time_to_live as u64)); builder.with_time_to_live(Some(std::time::Duration::from_millis(
*time_to_live as u64,
)));
} }
} }
if field_count >= 4 { if field_count >= 4 {
@ -583,6 +606,7 @@ impl From<AmqpList> for AmqpMessageHeader {
} }
} }
#[cfg(feature = "cplusplus")]
impl From<AmqpMessageHeader> for AmqpList { impl From<AmqpMessageHeader> for AmqpList {
fn from(header: AmqpMessageHeader) -> AmqpList { fn from(header: AmqpMessageHeader) -> AmqpList {
let mut list = vec![AmqpValue::Null; 5]; let mut list = vec![AmqpValue::Null; 5];
@ -590,17 +614,21 @@ impl From<AmqpMessageHeader> for AmqpList {
// Serialize the current value, if it exists. Otherwise serialize a null // Serialize the current value, if it exists. Otherwise serialize a null
// value if there are other values to serialize. // value if there are other values to serialize.
list[0] = header.durable.map_or(AmqpValue::Null, AmqpValue::Boolean); if header.durable {
list[1] = header.priority.map_or(AmqpValue::Null, AmqpValue::UByte); list[0] = AmqpValue::Boolean(header.durable())
};
if header.priority != 4 {
list[1] = AmqpValue::UByte(header.priority)
};
list[2] = header.time_to_live.map_or(AmqpValue::Null, |ttl| { list[2] = header.time_to_live.map_or(AmqpValue::Null, |ttl| {
AmqpValue::UInt(ttl.as_millis() as u32) AmqpValue::UInt(ttl.as_millis() as u32)
}); });
list[3] = header if header.first_acquirer {
.first_acquirer list[3] = AmqpValue::Boolean(header.first_acquirer)
.map_or(AmqpValue::Null, AmqpValue::Boolean); };
list[4] = header if header.delivery_count != 0 {
.delivery_count list[4] = AmqpValue::UInt(header.delivery_count)
.map_or(AmqpValue::Null, AmqpValue::UInt); };
let mut trailing_nulls = 0; let mut trailing_nulls = 0;
for val in list.iter().rev() { for val in list.iter().rev() {
@ -701,6 +729,7 @@ impl AmqpMessageProperties {
/// See also [Amqp Header](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties) for more information /// See also [Amqp Header](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties) for more information
/// ///
/// ///
#[cfg(feature = "cplusplus")]
impl From<AmqpList> for AmqpMessageProperties { impl From<AmqpList> for AmqpMessageProperties {
fn from(list: AmqpList) -> Self { fn from(list: AmqpList) -> Self {
let mut builder = AmqpMessageProperties::builder(); let mut builder = AmqpMessageProperties::builder();
@ -799,6 +828,7 @@ impl From<AmqpList> for AmqpMessageProperties {
} }
} }
#[cfg(feature = "cplusplus")]
impl From<AmqpMessageProperties> for AmqpList { impl From<AmqpMessageProperties> for AmqpList {
fn from(properties: AmqpMessageProperties) -> AmqpList { fn from(properties: AmqpMessageProperties) -> AmqpList {
let mut list = vec![AmqpValue::Null; 13]; let mut list = vec![AmqpValue::Null; 13];
@ -1162,6 +1192,23 @@ impl From<AmqpList> for AmqpMessage {
} }
} }
} }
#[cfg(feature = "cplusplus")]
impl Deserializable<AmqpMessage> for AmqpMessage {
fn decode(data: &[u8]) -> azure_core::Result<AmqpMessage> {
#[cfg(all(feature = "fe2o3-amqp", not(target_arch = "wasm32")))]
{
let value = serde_amqp::de::from_slice::<
fe2o3_amqp_types::messaging::message::__private::Deserializable<
fe2o3_amqp_types::messaging::Message<
fe2o3_amqp_types::messaging::Body<fe2o3_amqp_types::primitives::Value>,
>,
>,
>(data)
.map_err(|e| azure_core::error::Error::new(ErrorKind::Other, e))?;
Ok(value.0.into())
}
}
}
pub mod builders { pub mod builders {
use super::*; use super::*;
@ -1305,26 +1352,26 @@ pub mod builders {
} }
} }
pub fn with_durable(&mut self, durable: bool) -> &mut Self { pub fn with_durable(&mut self, durable: bool) -> &mut Self {
self.header.durable = Some(durable); self.header.durable = durable;
self self
} }
pub fn with_priority(&mut self, priority: u8) -> &mut Self { pub fn with_priority(&mut self, priority: u8) -> &mut Self {
self.header.priority = Some(priority); self.header.priority = priority;
self self
} }
pub fn with_time_to_live( pub fn with_time_to_live(
&mut self, &mut self,
time_to_live: impl Into<std::time::Duration>, time_to_live: Option<std::time::Duration>,
) -> &mut Self { ) -> &mut Self {
self.header.time_to_live = Some(time_to_live.into()); self.header.time_to_live = time_to_live;
self self
} }
pub fn with_first_acquirer(&mut self, first_acquirer: bool) -> &mut Self { pub fn with_first_acquirer(&mut self, first_acquirer: bool) -> &mut Self {
self.header.first_acquirer = Some(first_acquirer); self.header.first_acquirer = first_acquirer;
self self
} }
pub fn with_delivery_count(&mut self, delivery_count: u32) -> &mut Self { pub fn with_delivery_count(&mut self, delivery_count: u32) -> &mut Self {
self.header.delivery_count = Some(delivery_count); self.header.delivery_count = delivery_count;
self self
} }
} }
@ -1413,34 +1460,34 @@ pub mod builders {
} }
impl AmqpMessageBuilder { impl AmqpMessageBuilder {
pub fn build(self) -> AmqpMessage { pub fn build(&mut self) -> AmqpMessage {
self.message self.message.clone()
} }
pub(super) fn new() -> AmqpMessageBuilder { pub(super) fn new() -> AmqpMessageBuilder {
AmqpMessageBuilder { AmqpMessageBuilder {
message: Default::default(), message: Default::default(),
} }
} }
pub fn with_body(mut self, body: impl Into<AmqpMessageBody>) -> Self { pub fn with_body(&mut self, body: impl Into<AmqpMessageBody>) -> &mut Self {
self.message.body = body.into(); self.message.body = body.into();
self self
} }
pub fn with_header(mut self, header: AmqpMessageHeader) -> Self { pub fn with_header(&mut self, header: AmqpMessageHeader) -> &mut Self {
self.message.header = Some(header); self.message.header = Some(header);
self self
} }
pub fn with_application_properties( pub fn with_application_properties(
mut self, &mut self,
application_properties: AmqpApplicationProperties, application_properties: AmqpApplicationProperties,
) -> Self { ) -> &mut Self {
self.message.application_properties = Some(application_properties); self.message.application_properties = Some(application_properties);
self self
} }
pub fn add_application_property( pub fn add_application_property(
mut self, &mut self,
key: impl Into<String>, key: impl Into<String>,
value: impl Into<AmqpValue>, value: impl Into<AmqpValue>,
) -> Self { ) -> &mut Self {
if let Some(application_properties) = &mut self.message.application_properties { if let Some(application_properties) = &mut self.message.application_properties {
application_properties.0.insert(key.into(), value.into()); application_properties.0.insert(key.into(), value.into());
} else { } else {
@ -1451,22 +1498,28 @@ pub mod builders {
} }
self self
} }
pub fn with_message_annotations(mut self, message_annotations: AmqpAnnotations) -> Self { pub fn with_message_annotations(
&mut self,
message_annotations: AmqpAnnotations,
) -> &mut Self {
self.message.message_annotations = Some(message_annotations); self.message.message_annotations = Some(message_annotations);
self self
} }
pub fn with_delivery_annotations(mut self, delivery_annotations: AmqpAnnotations) -> Self { pub fn with_delivery_annotations(
&mut self,
delivery_annotations: AmqpAnnotations,
) -> &mut Self {
self.message.delivery_annotations = Some(delivery_annotations); self.message.delivery_annotations = Some(delivery_annotations);
self self
} }
pub fn with_properties<T>(mut self, properties: T) -> Self pub fn with_properties<T>(&mut self, properties: T) -> &mut Self
where where
T: Into<AmqpMessageProperties>, T: Into<AmqpMessageProperties>,
{ {
self.message.properties = Some(properties.into()); self.message.properties = Some(properties.into());
self self
} }
pub fn with_footer(mut self, footer: AmqpAnnotations) -> Self { pub fn with_footer(&mut self, footer: AmqpAnnotations) -> &mut Self {
self.message.footer = Some(footer); self.message.footer = Some(footer);
self self
} }
@ -1475,30 +1528,53 @@ pub mod builders {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::time::SystemTime;
use uuid::Uuid;
use super::*; use super::*;
use fe2o3_amqp_types::messaging::Priority;
use std::time::SystemTime;
use uuid::Uuid;
#[test] #[test]
fn test_amqp_message_header_builder() { fn test_amqp_message_header_builder() {
let header = AmqpMessageHeader::builder() let header = AmqpMessageHeader::builder()
.with_durable(true) .with_durable(true)
.with_priority(5) .with_priority(5)
.with_time_to_live(std::time::Duration::from_millis(1000)) .with_time_to_live(Some(std::time::Duration::from_millis(1000)))
.with_first_acquirer(false) .with_first_acquirer(false)
.with_delivery_count(3) .with_delivery_count(3)
.build(); .build();
assert_eq!(header.durable, Some(true)); assert_eq!(header.durable, true);
assert_eq!(header.priority, Some(5)); assert_eq!(header.priority, 5);
assert_eq!( assert_eq!(
header.time_to_live, header.time_to_live,
Some(std::time::Duration::from_millis(1000)) Some(std::time::Duration::from_millis(1000))
); );
assert_eq!(header.first_acquirer, Some(false)); assert_eq!(header.first_acquirer, false);
assert_eq!(header.delivery_count, Some(3)); assert_eq!(header.delivery_count, 3);
}
#[test]
fn test_header_serialization() {
{
let c_serialized = vec![0x00, 0x53, 0x70, 0xc0, 0x04, 0x02, 0x40, 0x50, 0x05];
let deserialized_from_c: fe2o3_amqp_types::messaging::Header =
serde_amqp::de::from_slice(&c_serialized.as_slice()).unwrap();
let header = fe2o3_amqp_types::messaging::Header::builder()
.priority(Priority::from(5))
.build();
let serialized = serde_amqp::ser::to_vec(&header).unwrap();
assert_eq!(c_serialized, serialized);
let deserialized: fe2o3_amqp_types::messaging::Header =
serde_amqp::de::from_slice(&serialized.as_slice()).unwrap();
assert_eq!(c_serialized, serialized);
assert_eq!(header, deserialized);
assert_eq!(header, deserialized_from_c);
}
// assert_eq!(header, deserialized);
} }
#[test] #[test]
@ -1717,7 +1793,7 @@ mod tests {
let message = AmqpMessage::builder() let message = AmqpMessage::builder()
.with_header( .with_header(
AmqpMessageHeader::builder() AmqpMessageHeader::builder()
.with_time_to_live(std::time::Duration::from_millis(23)) .with_time_to_live(Some(std::time::Duration::from_millis(23)))
.build(), .build(),
) )
.build(); .build();
@ -1887,4 +1963,75 @@ mod tests {
); );
} }
} }
#[test]
fn test_message_with_header_serialization() {
let message = AmqpMessage::builder()
.with_header(AmqpMessageHeader::builder().with_priority(5).build())
.with_body(AmqpValue::from("String Value Body."))
.with_properties(
AmqpMessageProperties::builder()
.with_message_id("12345")
.build(),
)
.build();
let serialized = AmqpMessage::serialize(message.clone()).unwrap();
assert_eq!(
serialized,
vec![
0x00, 0x53, 0x70, 0xc0, 0x04, 0x02, 0x40, 0x50, 0x05, 0x00, 0x53, 0x73, 0xc0, 0x08,
0x01, 0xa1, 0x05, 0x31, 0x32, 0x33, 0x34, 0x35, 0x00, 0x53, 0x77, 0xa1, 0x12, 0x53,
0x74, 0x72, 0x69, 0x6e, 0x67, 0x20, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x20, 0x42, 0x6f,
0x64, 0x79, 0x2e
]
);
#[cfg(all(feature = "fe2o3-amqp", not(target_arch = "wasm32")))]
{
let amqp_message = fe2o3_amqp_types::messaging::message::Builder::new()
.header(
fe2o3_amqp_types::messaging::Header::builder()
.priority(5)
.build(),
)
.properties(
fe2o3_amqp_types::messaging::Properties::builder()
.message_id("12345".to_string())
.build(),
)
.body(fe2o3_amqp_types::messaging::Body::Value::<
fe2o3_amqp_types::primitives::Value,
>(fe2o3_amqp_types::messaging::AmqpValue(
fe2o3_amqp_types::primitives::Value::String("String Value Body.".to_string()),
)))
.build();
let serialized_fe2o3 = serde_amqp::ser::to_vec(
&fe2o3_amqp_types::messaging::message::__private::Serializable(
amqp_message.clone(),
),
)
.unwrap();
assert_eq!(serialized, serialized_fe2o3);
// Now deserialize the message and verify that it matches the original.
let value = serde_amqp::de::from_slice::<
fe2o3_amqp_types::messaging::message::__private::Deserializable<
fe2o3_amqp_types::messaging::Message<
fe2o3_amqp_types::messaging::Body<fe2o3_amqp_types::primitives::Value>,
>,
>,
>(serialized_fe2o3.as_slice())
.unwrap();
assert_eq!(amqp_message, value.0);
}
#[cfg(feature = "cplusplus")]
{
let deserialized = AmqpMessage::decode(&serialized).unwrap();
assert_eq!(deserialized, message);
}
}
} }

Просмотреть файл

@ -2,8 +2,10 @@
// Licensed under the MIT license. // Licensed under the MIT license.
// cspell: words amqp // cspell: words amqp
#[cfg(feature = "cplusplus")]
use azure_core::Result; use azure_core::Result;
#[cfg(feature = "cplusplus")]
use crate::{Deserializable, Serializable}; use crate::{Deserializable, Serializable};
#[derive(Debug, PartialEq, Clone, Default, Eq)] #[derive(Debug, PartialEq, Clone, Default, Eq)]
@ -176,11 +178,13 @@ pub enum AmqpValue {
List(AmqpList), List(AmqpList),
Map(AmqpOrderedMap<AmqpValue, AmqpValue>), Map(AmqpOrderedMap<AmqpValue, AmqpValue>),
Array(Vec<AmqpValue>), Array(Vec<AmqpValue>),
Composite(Box<AmqpComposite>),
Described(Box<AmqpDescribed>), Described(Box<AmqpDescribed>),
#[cfg(feature = "cplusplus")]
Composite(Box<AmqpComposite>),
Unknown, Unknown,
} }
#[cfg(feature = "cplusplus")]
impl Serializable for AmqpValue { impl Serializable for AmqpValue {
fn encoded_size(&self) -> usize { fn encoded_size(&self) -> usize {
#[cfg(all(feature = "fe2o3-amqp", not(target_arch = "wasm32")))] #[cfg(all(feature = "fe2o3-amqp", not(target_arch = "wasm32")))]
@ -213,6 +217,7 @@ impl Serializable for AmqpValue {
} }
} }
#[cfg(feature = "cplusplus")]
impl Deserializable<AmqpValue> for AmqpValue { impl Deserializable<AmqpValue> for AmqpValue {
#[allow(unused_variables)] #[allow(unused_variables)]
fn decode(data: &[u8]) -> azure_core::Result<AmqpValue> { fn decode(data: &[u8]) -> azure_core::Result<AmqpValue> {

Просмотреть файл

@ -387,16 +387,15 @@ pub mod models {
if let Some(message_id) = event_data.message_id { if let Some(message_id) = event_data.message_id {
message_properties_builder.with_message_id(message_id); message_properties_builder.with_message_id(message_id);
} }
message_builder =
message_builder.with_properties(message_properties_builder.build()); message_builder.with_properties(message_properties_builder.build());
} }
if let Some(properties) = event_data.properties { if let Some(properties) = event_data.properties {
for (key, value) in properties { for (key, value) in properties {
message_builder = message_builder.add_application_property(key, value); message_builder.add_application_property(key, value);
} }
} }
if let Some(event_body) = event_data.body { if let Some(event_body) = event_data.body {
message_builder =
message_builder.with_body(AmqpMessageBody::Binary(vec![event_body.to_vec()])); message_builder.with_body(AmqpMessageBody::Binary(vec![event_body.to_vec()]));
} }
message_builder.build() message_builder.build()

Просмотреть файл

@ -310,25 +310,24 @@ impl<'a> EventDataBatch<'a> {
let mut batch_builder = AmqpMessage::builder(); let mut batch_builder = AmqpMessage::builder();
if message.header().is_some() { if message.header().is_some() {
batch_builder = batch_builder.with_header(message.header().unwrap().clone()); batch_builder.with_header(message.header().unwrap().clone());
} }
if message.properties().is_some() { if message.properties().is_some() {
batch_builder = batch_builder.with_properties(message.properties().unwrap().clone()); batch_builder.with_properties(message.properties().unwrap().clone());
} }
if message.application_properties().is_some() { if message.application_properties().is_some() {
batch_builder = batch_builder batch_builder
.with_application_properties(message.application_properties().unwrap().clone()); .with_application_properties(message.application_properties().unwrap().clone());
} }
if message.delivery_annotations().is_some() { if message.delivery_annotations().is_some() {
batch_builder = batch_builder batch_builder
.with_delivery_annotations(message.delivery_annotations().unwrap().clone()); .with_delivery_annotations(message.delivery_annotations().unwrap().clone());
} }
if message.message_annotations().is_some() { if message.message_annotations().is_some() {
batch_builder = batch_builder batch_builder.with_message_annotations(message.message_annotations().unwrap().clone());
.with_message_annotations(message.message_annotations().unwrap().clone());
} }
if message.footer().is_some() { if message.footer().is_some() {
batch_builder = batch_builder.with_footer(message.footer().unwrap().clone()); batch_builder.with_footer(message.footer().unwrap().clone());
} }
batch_builder.build() batch_builder.build()