implement send/read messages
This commit is contained in:
Родитель
2ba3616bc7
Коммит
1bdf2d45ef
|
@ -0,0 +1,124 @@
|
|||
#-------------------------------------------------------------------------
|
||||
# Copyright 2012 Microsoft Corporation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#--------------------------------------------------------------------------
|
||||
module Azure
|
||||
module ServiceBus
|
||||
class BrokeredMessage
|
||||
|
||||
# Please read http://msdn.microsoft.com/en-us/library/windowsazure/hh780742
|
||||
# for more information on brokered message properties
|
||||
|
||||
# Public: Get/Set the ContentType of the message
|
||||
#
|
||||
# Returns a String
|
||||
attr_accessor :content_type
|
||||
|
||||
# Public: Get/Set the CorrelationID of the message
|
||||
#
|
||||
# Returns a String
|
||||
attr_accessor :correlation_id
|
||||
|
||||
# Public: Get/Set the SessionID of the message
|
||||
#
|
||||
# Returns a String
|
||||
attr_accessor :session_id
|
||||
|
||||
# Public: Get/Set the DeliveryCount of the message
|
||||
#
|
||||
# Returns an Integer
|
||||
attr_accessor :delivery_count
|
||||
|
||||
# Public: Get/Set the LockedUntilUtc for the message
|
||||
#
|
||||
# Returns a DateTime
|
||||
attr_accessor :locked_until_utc
|
||||
|
||||
# Public: Get/Set the LockToken of the message
|
||||
#
|
||||
# Returns a String (GUID)
|
||||
attr_accessor :lock_token
|
||||
|
||||
# Public: Get/Set the MessageID of the message
|
||||
#
|
||||
# Returns a String
|
||||
attr_accessor :message_id
|
||||
alias_method :id, :message_id
|
||||
|
||||
# Public: Get/Set the Label for the message
|
||||
#
|
||||
# Returns a String
|
||||
attr_accessor :label
|
||||
|
||||
# Public: Get/Set the ReplyTo for the message
|
||||
#
|
||||
# Returns a String
|
||||
attr_accessor :reply_to
|
||||
|
||||
# Public: Get/Set the EnqueuedTimeUtc for the message
|
||||
#
|
||||
# Returns a DateTime
|
||||
attr_accessor :enqueued_time_utc
|
||||
|
||||
# Public: Get/Set the SequenceNumber for the message
|
||||
#
|
||||
# Returns an Integer
|
||||
attr_accessor :sequence_number
|
||||
|
||||
# Public: Get/Set the TimeToLive for the message
|
||||
#
|
||||
# Returns an Integer
|
||||
attr_accessor :time_to_live
|
||||
|
||||
# Public: Get/Set the To field for the message
|
||||
#
|
||||
# Returns a String
|
||||
attr_accessor :to
|
||||
|
||||
# Public: Get/Set the ScheduledEnqueueTimeUtc for the message
|
||||
#
|
||||
# Returns a DateTime
|
||||
attr_accessor :scheduled_enqueue_time_utc
|
||||
|
||||
# Public: Get/Set the ReplyToSessionId for the message
|
||||
#
|
||||
# Returns a String
|
||||
attr_accessor :reply_to_session_id
|
||||
|
||||
# Public: Get/Set custom key-value properties of the message
|
||||
#
|
||||
# Returns a Hash
|
||||
attr_accessor :properties
|
||||
|
||||
# Public: Get/Set the body of the message
|
||||
#
|
||||
# Returns a String
|
||||
attr_accessor :body
|
||||
|
||||
# Public: Get/Set the URI of the locked message. This URI is needed to unlock or delete the message
|
||||
#
|
||||
# Returns an URI
|
||||
attr_accessor :location
|
||||
|
||||
# Public: Constructor.
|
||||
#
|
||||
# body - String. The body of the message
|
||||
# properties - Hash. The properties of the message (optional)
|
||||
def initialize(body, properties={})
|
||||
@body = body
|
||||
@properties = properties
|
||||
yield self if block_given?
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -0,0 +1,128 @@
|
|||
#-------------------------------------------------------------------------
|
||||
# Copyright 2012 Microsoft Corporation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#--------------------------------------------------------------------------
|
||||
require "rubygems"
|
||||
require "json"
|
||||
require "time"
|
||||
require "uri"
|
||||
require "extlib_lite"
|
||||
|
||||
require "azure/service_bus/brokered_message"
|
||||
|
||||
module Azure
|
||||
module ServiceBus
|
||||
class BrokeredMessageSerializer
|
||||
|
||||
PROPERTIES = [
|
||||
'ContentType',
|
||||
'CorrelationId',
|
||||
'SessionID',
|
||||
'DeliveryCount',
|
||||
'LockedUntilUtc',
|
||||
'LockToken',
|
||||
'MessageId',
|
||||
'Label',
|
||||
'ReplyTo',
|
||||
'EnqueuedTimeUtc',
|
||||
'SequenceNumber',
|
||||
'TimeToLive',
|
||||
'To',
|
||||
'ScheduledEnqueueTimeUtc',
|
||||
'ReplyToSessionId'
|
||||
].freeze
|
||||
|
||||
attr :message
|
||||
|
||||
def initialize(msg)
|
||||
@message = msg
|
||||
end
|
||||
|
||||
def self.get_from_http_response(response)
|
||||
props = JSON.parse(response.headers['brokerproperties'])
|
||||
BrokeredMessage.new(response.body) do |m|
|
||||
m.location = URI(response.headers['location']) unless response.headers['location'].nil?
|
||||
m.content_type = response.headers['content-type']
|
||||
|
||||
# String based properties
|
||||
m.lock_token = props['LockToken']
|
||||
m.message_id = props['MessageId']
|
||||
m.label = props['Label']
|
||||
m.to = props['To']
|
||||
m.session_id = props['SessionID']
|
||||
m.correlation_id = props['CorrelationId']
|
||||
m.reply_to = props['ReplyTo']
|
||||
m.reply_to = props['ReplyTo']
|
||||
m.reply_to_session_id = props['ReplyToSessionId']
|
||||
|
||||
# Time based properties
|
||||
m.locked_until_utc = Time.parse(props['LockedUntilUtc']) unless props['LockedUntilUtc'].nil?
|
||||
m.enqueued_time_utc = Time.parse(props['EnqueuedTimeUtc']) unless props['EnqueuedTimeUtc'].nil?
|
||||
m.scheduled_enqueue_time_utc = Time.parse(props['ScheduledEnqueueTimeUtc']) unless props['ScheduledEnqueueTimeUtc'].nil?
|
||||
|
||||
# Numeric based properties
|
||||
m.delivery_count = props['DeliveryCount'].to_i
|
||||
m.sequence_number = props['SequenceNumber'].to_i
|
||||
m.time_to_live = props['TimeToLive'].to_f
|
||||
|
||||
# Custom Properties
|
||||
header_names_black_list = [
|
||||
'brokerproperties',
|
||||
'date',
|
||||
'transfer-encoding',
|
||||
'location',
|
||||
'server',
|
||||
'connection',
|
||||
'content-type'
|
||||
]
|
||||
props = response.headers.reject {|k,_| header_names_black_list.include?(k.downcase) }
|
||||
props.each do |prop_name, value|
|
||||
m.properties[prop_name] = value.gsub(/"/, '')
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Serialize the message's attributes to JSON
|
||||
#
|
||||
# Returns a JSON String
|
||||
def to_json
|
||||
hash = {}
|
||||
PROPERTIES.each do |p|
|
||||
attr_name = p.underscore
|
||||
value = @message.send(attr_name)
|
||||
hash[p] = value unless value.nil?
|
||||
end
|
||||
hash.to_json
|
||||
end
|
||||
|
||||
# Build a hash based on message properties and ensure
|
||||
# the values are in a valid format for HTTP headers
|
||||
#
|
||||
# Returns a Hash
|
||||
def get_property_headers
|
||||
hash = {}
|
||||
@message.properties.each do |name, value|
|
||||
val = value
|
||||
# Check for an RFC2626 Date
|
||||
begin
|
||||
val = '"' + Time.parse(value).httpdate + '"'
|
||||
rescue ArgumentError
|
||||
val = '"' + value + '"' if value.is_a? String
|
||||
end
|
||||
hash[name] = val
|
||||
end
|
||||
hash
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -16,6 +16,8 @@ require 'azure/core/signed_service'
|
|||
require 'azure/service_bus/auth/wrap_signer'
|
||||
require 'azure/service_bus/serialization'
|
||||
|
||||
require 'azure/service_bus/brokered_message_serializer'
|
||||
|
||||
module Azure
|
||||
module ServiceBus
|
||||
class ServiceBus < Azure::Core::SignedService
|
||||
|
@ -24,6 +26,14 @@ module Azure
|
|||
super(Azure::ServiceBus::Auth::WrapSigner.new)
|
||||
@default_timeout = 90
|
||||
@host = host
|
||||
|
||||
with_filter do |req, res|
|
||||
req.headers.delete "x-ms-date"
|
||||
req.headers.delete "x-ms-version"
|
||||
req.headers.delete "DataServiceVersion"
|
||||
req.headers.delete "MaxDataServiceVersion"
|
||||
res.call
|
||||
end
|
||||
end
|
||||
|
||||
# Creates a new queue. Once created, this queue's resource manifest is immutable.
|
||||
|
@ -188,18 +198,36 @@ module Azure
|
|||
resource_list(:subscription, topic).each { |s| s.topic = topic }
|
||||
end
|
||||
|
||||
def send_topic_message(topic_name, message=nil) end
|
||||
#
|
||||
# Enqueues a message into the specified topic. The limit to the number of messages
|
||||
# which may be present in the topic is governed by the message size in MaxTopicSizeInBytes.
|
||||
# If this message causes the topic to exceed its quota, a quota exceeded error is
|
||||
# returned and the message will be rejected.
|
||||
#
|
||||
# topic_name: name of the topic.
|
||||
# message: the Message object containing message body and properties.
|
||||
#
|
||||
# topic: Either a Azure::ServiceBus::Topic instance or a string of the topic name
|
||||
# message: An Azure::ServiceBus::BrokeredMessage object containing message body and properties.
|
||||
def send_topic_message(topic, message)
|
||||
topic = _name_for(topic)
|
||||
|
||||
serializer = BrokeredMessageSerializer.new(message)
|
||||
broker_properties = serializer.to_json
|
||||
message_properties = serializer.get_property_headers
|
||||
|
||||
content_type = message.content_type || 'text/plain'
|
||||
|
||||
headers = {
|
||||
'BrokerProperties'=> broker_properties
|
||||
}
|
||||
|
||||
message_properties.each do |k,v|
|
||||
headers[k.to_s] = v
|
||||
end
|
||||
|
||||
headers["Content-Type"] = content_type
|
||||
|
||||
response = call(:post, messages_uri(topic), message.body, headers)
|
||||
response.success?
|
||||
end
|
||||
|
||||
def peek_lock_subscription_message(topic_name, subscription_name, timeout='60') end
|
||||
#
|
||||
# This operation is used to atomically retrieve and lock a message for processing.
|
||||
# The message is guaranteed not to be delivered to other receivers during the lock
|
||||
|
@ -215,8 +243,8 @@ module Azure
|
|||
# topic_name: the name of the topic
|
||||
# subscription_name: the name of the subscription
|
||||
#
|
||||
def peek_lock_subscription_message(topic_name, subscription_name, timeout='60') end
|
||||
|
||||
def unlock_subscription_message(topic_name, subscription_name, sequence_number, lock_token) end
|
||||
#
|
||||
# Unlock a message for processing by other receivers on a given subscription.
|
||||
# This operation deletes the lock object, causing the message to be unlocked.
|
||||
|
@ -230,8 +258,7 @@ module Azure
|
|||
# lock_token: The ID of the lock as returned by the Peek Message operation in
|
||||
# BrokerProperties['LockToken']
|
||||
#
|
||||
|
||||
def read_delete_subscription_message(topic_name, subscription_name, timeout='60') end
|
||||
def unlock_subscription_message(topic_name, subscription_name, sequence_number, lock_token) end
|
||||
#
|
||||
# Read and delete a message from a subscription as an atomic operation. This
|
||||
# operation should be used when a best-effort guarantee is sufficient for an
|
||||
|
@ -241,6 +268,7 @@ module Azure
|
|||
# topic_name: the name of the topic
|
||||
# subscription_name: the name of the subscription
|
||||
#
|
||||
def read_delete_subscription_message(topic_name, subscription_name, timeout='60') end
|
||||
|
||||
#
|
||||
# Completes processing on a locked message and delete it from the subscription.
|
||||
|
@ -262,10 +290,11 @@ module Azure
|
|||
# MaxTopicSizeInMegaBytes. If this message will cause the queue to exceed its
|
||||
# quota, a quota exceeded error is returned and the message will be rejected.
|
||||
#
|
||||
# queue_name: name of the queue
|
||||
# message: the Message object containing message body and properties.
|
||||
#
|
||||
def send_queue_message(queue_name, message=nil) end
|
||||
# queue: Either a Azure::ServiceBus::Queue instance or a string of the queue name
|
||||
# message: An Azure::ServiceBus::BrokeredMessage object containing message body and properties.
|
||||
def send_queue_message(queue, message)
|
||||
send_topic_message(queue, message)
|
||||
end
|
||||
|
||||
#
|
||||
# Automically retrieves and locks a message from a queue for processing. The
|
||||
|
@ -302,9 +331,14 @@ module Azure
|
|||
# that is, using this operation it is possible for messages to be lost if
|
||||
# processing fails.
|
||||
#
|
||||
# queue_name: name of the queue
|
||||
# queue: Either a Azure::ServiceBus::Queue instance or a string of the queue name
|
||||
#
|
||||
def read_delete_queue_message(queue_name, timeout=60) end
|
||||
def read_delete_queue_message(queue, timeout=60)
|
||||
uri = messages_head_uri(_name_for(queue), { "timeout"=> timeout.to_s })
|
||||
|
||||
response = call(:delete, uri)
|
||||
(response.status_code == 204) ? nil : BrokeredMessageSerializer.get_from_http_response(response)
|
||||
end
|
||||
|
||||
#
|
||||
# Completes processing on a locked message and delete it from the queue. This
|
||||
|
@ -426,6 +460,16 @@ module Azure
|
|||
Serialization.resources_from_xml(resource, response.body)
|
||||
end
|
||||
|
||||
# messages uris
|
||||
|
||||
def messages_head_uri(topic_or_queue, query={})
|
||||
generate_uri("#{topic_or_queue}/messages/head", query)
|
||||
end
|
||||
|
||||
def messages_uri(topic_or_queue, query={})
|
||||
generate_uri("#{topic_or_queue}/messages", query)
|
||||
end
|
||||
|
||||
# entry uris
|
||||
def rule_uri(topic, subscription, rule, query={})
|
||||
generate_uri("#{topic}/Subscriptions/#{subscription}/Rules/#{rule}", query)
|
||||
|
|
Загрузка…
Ссылка в новой задаче