Merge pull request #22 from dvanherbergen/mqtt-update

CQ #8246: Updated MQTT transport to match latest version in OpenHAB.
This commit is contained in:
kaikreuzer 2014-05-27 17:19:02 +02:00
Родитель 1664b908ff 072af27caa
Коммит 0a3a0d8227
4 изменённых файлов: 290 добавлений и 124 удалений

Просмотреть файл

@ -42,7 +42,7 @@ public interface MqttMessageConsumer {
/**
* Set the event publisher to use when broadcasting received messages onto
* the openHAB event bus.
* the smarthome event bus.
*
* @param eventPublisher
*/

Просмотреть файл

@ -12,15 +12,15 @@ import java.util.Enumeration;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang.StringUtils;
import org.eclipse.smarthome.io.transport.mqtt.internal.MqttBrokerConnection;
import org.eclipse.smarthome.core.events.EventPublisher;
import org.eclipse.smarthome.io.transport.mqtt.internal.MqttBrokerConnection;
import org.osgi.service.cm.ConfigurationException;
import org.osgi.service.cm.ManagedService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* MQTT Service for creating new connections to MQTT brokers from the openHAB
* MQTT Service for creating new connections to MQTT brokers from the Smart Home
* configuration file and registering message publishers and subscribers. This
* service is the main entry point for all bundles wanting to use the MQTT
* transport.
@ -36,8 +36,7 @@ public class MqttService implements ManagedService {
private EventPublisher eventPublisher;
@Override
public void updated(Dictionary<String, ?> properties)
throws ConfigurationException {
public void updated(Dictionary<String, ?> properties) throws ConfigurationException {
// load broker configurations from configuration file
if (properties == null || properties.isEmpty()) {
@ -56,9 +55,7 @@ public class MqttService implements ManagedService {
String[] subkeys = key.split("\\.");
if (subkeys.length != 2) {
logger.debug(
"MQTT Broker property '{}' should have the format 'broker.propertykey'",
key);
logger.debug("MQTT Broker property '{}' should have the format 'broker.propertykey'", key);
continue;
}
@ -93,6 +90,10 @@ public class MqttService implements ManagedService {
conn.setAsync(Boolean.parseBoolean(value));
} else if (property.equals("clientId")) {
conn.setClientId(value);
} else if (property.equals("lwt")) {
MqttWillAndTestament will = MqttWillAndTestament.fromString(value);
logger.debug("Setting last will: {}", will);
conn.setLastWill(will);
} else {
logger.warn("Unrecognized property: {}", key);
}
@ -103,8 +104,7 @@ public class MqttService implements ManagedService {
try {
con.start();
} catch (Exception e) {
logger.error("Error starting broker connection {} : {}",
con.getName(), e.getMessage());
logger.error("Error starting broker connection", e);
}
}
}
@ -125,7 +125,7 @@ public class MqttService implements ManagedService {
Enumeration<String> e = brokerConnections.keys();
while (e.hasMoreElements()) {
MqttBrokerConnection conn = brokerConnections.get(e.nextElement());
logger.info("Stopping connection {}", conn.getName());
logger.info("Stopping broker connection '{}'", conn.getName());
conn.close();
}
@ -139,10 +139,9 @@ public class MqttService implements ManagedService {
* to look for.
* @return existing connection or new one if it didn't exist yet.
*/
private MqttBrokerConnection getConnection(String brokerName) {
private synchronized MqttBrokerConnection getConnection(String brokerName) {
MqttBrokerConnection conn = brokerConnections.get(brokerName
.toLowerCase());
MqttBrokerConnection conn = brokerConnections.get(brokerName.toLowerCase());
if (conn == null) {
conn = new MqttBrokerConnection(brokerName);
brokerConnections.put(brokerName.toLowerCase(), conn);
@ -158,8 +157,7 @@ public class MqttService implements ManagedService {
* @param mqttMessageConsumer
* Consumer which will process any received message.
*/
public void registerMessageConsumer(String brokerName,
MqttMessageConsumer mqttMessageConsumer) {
public void registerMessageConsumer(String brokerName, MqttMessageConsumer mqttMessageConsumer) {
mqttMessageConsumer.setEventPublisher(eventPublisher);
getConnection(brokerName).addConsumer(mqttMessageConsumer);
@ -171,14 +169,12 @@ public class MqttService implements ManagedService {
* @param mqttMessageConsumer
* Consumer which needs to be unregistered.
*/
public void unregisterMessageConsumer(String brokerName,
MqttMessageConsumer mqttMessageConsumer) {
public void unregisterMessageConsumer(String brokerName, MqttMessageConsumer mqttMessageConsumer) {
getConnection(brokerName).removeConsumer(mqttMessageConsumer);
}
public void registerMessageProducer(String brokerName,
MqttMessageProducer commandPublisher) {
public void registerMessageProducer(String brokerName, MqttMessageProducer commandPublisher) {
getConnection(brokerName).addProducer(commandPublisher);
}
@ -192,14 +188,13 @@ public class MqttService implements ManagedService {
* @param mqttMessageProducer
* Producer which generates the messages.
*/
public void unregisterMessageProducer(String brokerName,
MqttMessageProducer commandPublisher) {
public void unregisterMessageProducer(String brokerName, MqttMessageProducer commandPublisher) {
getConnection(brokerName).removeProducer(commandPublisher);
}
/**
* Set the publisher to use for publishing openHAB updates.
* Set the publisher to use for publishing SmartHome updates.
*
* @param eventPublisher
* EventPublisher
@ -209,7 +204,7 @@ public class MqttService implements ManagedService {
}
/**
* Remove the publisher to use for publishing openHAB updates.
* Remove the publisher to use for publishing SmartHome updates.
*
* @param eventPublisher
* EventPublisher

Просмотреть файл

@ -0,0 +1,164 @@
/**
* Copyright (c) 2014 openHAB UG (haftungsbeschraenkt) and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*/
package org.eclipse.smarthome.io.transport.mqtt;
import org.apache.commons.lang.StringUtils;
/**
* Class encapsulating the last will and testament that is published after the
* client has gone offline.
*
* @author Markus Mann
*
*/
public class MqttWillAndTestament {
private String topic;
private byte[] payload;
private int qos = 0;
private boolean retain = false;
/**
* Create an instance of the last will using a string with the following
* format:<br/>
* topic:message:qos:retained <br/>
* Where
* <ul>
* <li>topic is a normal topic string (no placeholders are allowed)</li>
* <li>message the message to send</li>
* <li>qos Valid values are 0 (Deliver at most once),1 (Deliver at least
* once) or 2</li>
* <li>retain true if messages shall be retained</li>
* </ul>
*
* @param string
* the string to parse. If null, null is returned
* @return the will instance, will be null only if parameter is null
*/
public static MqttWillAndTestament fromString(String string) {
if (string == null) {
return null;
}
MqttWillAndTestament result = new MqttWillAndTestament();
String[] components = string.split(":");
for (int i = 0; i < Math.min(components.length, 4); i++) {
String value = StringUtils.trimToEmpty(components[i]);
switch (i) {
case 0:
result.setTopic(value);
break;
case 1:
result.setPayload(value.getBytes());
break;
case 2:
if (!"".equals(value)) {
result.setQos(Integer.valueOf(value));
}
break;
case 3:
result.setRetain(Boolean.valueOf(value));
break;
}
}
return result;
}
/**
* @return the topic for the last will.
*/
public String getTopic() {
return topic;
}
/**
* Set the topic for the last will.
*
* @param topic
* the topic
*/
public void setTopic(String topic) {
this.topic = topic;
}
/**
* @return the payload of the last will.
*/
public byte[] getPayload() {
return payload;
}
/**
* Set the payload of the last will.
*
* @param payload
* the payload
*/
public void setPayload(byte[] payload) {
this.payload = payload;
}
/**
* @return quality of service level.
*/
public int getQos() {
return qos;
}
/**
* Set quality of service. Valid values are 0,1,2
*
* @param qos
* level.
*/
public void setQos(int qos) {
if (qos >= 0 && qos <= 2) {
this.qos = qos;
}
}
/**
* @return true if the last will should be retained by the broker.
*/
public boolean isRetain() {
return retain;
}
/**
* Set whether the last will should be retained by the broker.
*
* @param retain
* true to retain.
*/
public void setRetain(boolean retain) {
this.retain = retain;
}
/**
* {@inheritDoc}
*/
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("[").append(getClass());
sb.append("] Send '");
if (payload != null) {
sb.append(new String(payload));
} else {
sb.append(payload);
}
sb.append("' to topic '");
sb.append(topic);
sb.append("'");
if (retain) {
sb.append(" retained");
}
sb.append(" using qos mode ").append(qos);
return sb.toString();
}
}

Просмотреть файл

@ -8,10 +8,10 @@
package org.eclipse.smarthome.io.transport.mqtt.internal;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Timer;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
@ -31,6 +31,7 @@ import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.eclipse.smarthome.io.transport.mqtt.MqttMessageConsumer;
import org.eclipse.smarthome.io.transport.mqtt.MqttMessageProducer;
import org.eclipse.smarthome.io.transport.mqtt.MqttSenderChannel;
import org.eclipse.smarthome.io.transport.mqtt.MqttWillAndTestament;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -64,15 +65,17 @@ public class MqttBrokerConnection implements MqttCallback {
private boolean async = true;
private MqttWillAndTestament lastWill;
private String clientId;
private MqttClient client;
private boolean started;
private List<MqttMessageConsumer> consumers = new ArrayList<MqttMessageConsumer>();
private List<MqttMessageConsumer> consumers = new CopyOnWriteArrayList<MqttMessageConsumer>();
private List<MqttMessageProducer> producers = new ArrayList<MqttMessageProducer>();
private List<MqttMessageProducer> producers = new CopyOnWriteArrayList<MqttMessageProducer>();
private Timer reconnectTimer;
@ -94,7 +97,7 @@ public class MqttBrokerConnection implements MqttCallback {
* @throws Exception
* If connection could not be created.
*/
public void start() throws Exception {
public synchronized void start() throws Exception {
if (StringUtils.isEmpty(url)) {
logger.debug("No url defined for MQTT broker connection '{}'. Not starting.", name);
@ -220,6 +223,14 @@ public class MqttBrokerConnection implements MqttCallback {
this.retain = retain;
}
public MqttWillAndTestament getLastWill() {
return lastWill;
}
public void setLastWill(MqttWillAndTestament lastWill) {
this.lastWill = lastWill;
}
/**
* @return true if messages are sent asynchronously.
*/
@ -257,80 +268,74 @@ public class MqttBrokerConnection implements MqttCallback {
* @throws Exception
*/
private void openConnection() throws Exception {
try {
if (client != null && client.isConnected()) {
return;
}
if (StringUtils.isBlank(url)) {
throw new Exception("Missing url.");
}
if (client == null) {
if (StringUtils.isBlank(clientId) || clientId.length() > 23) {
clientId = MqttClient.generateClientId();
}
String tmpDir = System.getProperty("java.io.tmpdir");
MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(tmpDir + "/" + name);
logger.debug("Creating new client for '{}' using id '{}' and file store '{}'", new Object[] { url, clientId,
tmpDir + "/" + name });
client = new MqttClient(url, clientId, dataStore);
client.setCallback(this);
}
MqttConnectOptions options = new MqttConnectOptions();
if (!StringUtils.isBlank(user)) {
options.setUserName(user);
}
if (!StringUtils.isBlank(password)) {
options.setPassword(password.toCharArray());
}
if (url.toLowerCase().contains("ssl")) {
if (StringUtils.isNotBlank(System.getProperty("com.ibm.ssl.protocol"))) {
// get all com.ibm.ssl properties from the system properties
// and set them as the SSL properties to use.
Properties sslProps = new Properties();
addSystemProperty("com.ibm.ssl.protocol", sslProps);
addSystemProperty("com.ibm.ssl.contextProvider", sslProps);
addSystemProperty("com.ibm.ssl.keyStore", sslProps);
addSystemProperty("com.ibm.ssl.keyStorePassword", sslProps);
addSystemProperty("com.ibm.ssl.keyStoreType", sslProps);
addSystemProperty("com.ibm.ssl.keyStoreProvider", sslProps);
addSystemProperty("com.ibm.ssl.trustStore", sslProps);
addSystemProperty("com.ibm.ssl.trustStorePassword", sslProps);
addSystemProperty("com.ibm.ssl.trustStoreType", sslProps);
addSystemProperty("com.ibm.ssl.trustStoreProvider", sslProps);
addSystemProperty("com.ibm.ssl.enabledCipherSuites", sslProps);
addSystemProperty("com.ibm.ssl.keyManager", sslProps);
addSystemProperty("com.ibm.ssl.trustManager", sslProps);
options.setSSLProperties(sslProps);
} else {
// use standard JSSE available in the runtime and
// use TLSv1.2 which is the default for a secured mosquitto
SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
sslContext.init(null, new TrustManager[] { getVeryTrustingTrustManager() }, new java.security.SecureRandom());
SSLSocketFactory socketFactory = sslContext.getSocketFactory();
options.setSocketFactory(socketFactory);
}
}
client.connect(options);
} catch (MqttException e) {
logger.error("Error connecting to broker '{}' : {} : ReasonCode {} : Cause : {}",
new Object[] { name, e.getMessage(), e.getReasonCode(), e.getCause().getMessage() });
throw e;
if (client != null && client.isConnected()) {
return;
}
if (StringUtils.isBlank(url)) {
throw new Exception("Missing url");
}
if (client == null) {
if (StringUtils.isBlank(clientId) || clientId.length() > 23) {
clientId = MqttClient.generateClientId();
}
String tmpDir = System.getProperty("java.io.tmpdir") + "/" + name;
MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(tmpDir);
logger.debug("Creating new client for '{}' using id '{}' and file store '{}'", url, clientId, tmpDir);
client = new MqttClient(url, clientId, dataStore);
client.setCallback(this);
}
MqttConnectOptions options = new MqttConnectOptions();
if (!StringUtils.isBlank(user)) {
options.setUserName(user);
}
if (!StringUtils.isBlank(password)) {
options.setPassword(password.toCharArray());
}
if (url.toLowerCase().contains("ssl")) {
if (StringUtils.isNotBlank(System.getProperty("com.ibm.ssl.protocol"))) {
// get all com.ibm.ssl properties from the system properties
// and set them as the SSL properties to use.
Properties sslProps = new Properties();
addSystemProperty("com.ibm.ssl.protocol", sslProps);
addSystemProperty("com.ibm.ssl.contextProvider", sslProps);
addSystemProperty("com.ibm.ssl.keyStore", sslProps);
addSystemProperty("com.ibm.ssl.keyStorePassword", sslProps);
addSystemProperty("com.ibm.ssl.keyStoreType", sslProps);
addSystemProperty("com.ibm.ssl.keyStoreProvider", sslProps);
addSystemProperty("com.ibm.ssl.trustStore", sslProps);
addSystemProperty("com.ibm.ssl.trustStorePassword", sslProps);
addSystemProperty("com.ibm.ssl.trustStoreType", sslProps);
addSystemProperty("com.ibm.ssl.trustStoreProvider", sslProps);
addSystemProperty("com.ibm.ssl.enabledCipherSuites", sslProps);
addSystemProperty("com.ibm.ssl.keyManager", sslProps);
addSystemProperty("com.ibm.ssl.trustManager", sslProps);
options.setSSLProperties(sslProps);
} else {
// use standard JSSE available in the runtime and
// use TLSv1.2 which is the default for a secured mosquitto
SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
sslContext.init(null, new TrustManager[] { getVeryTrustingTrustManager() }, new java.security.SecureRandom());
SSLSocketFactory socketFactory = sslContext.getSocketFactory();
options.setSocketFactory(socketFactory);
}
}
if (lastWill != null) {
options.setWill(lastWill.getTopic(), lastWill.getPayload(), lastWill.getQos(), lastWill.isRetain());
}
client.connect(options);
}
/**
@ -373,7 +378,7 @@ public class MqttBrokerConnection implements MqttCallback {
* @param publisher
* to add.
*/
public void addProducer(MqttMessageProducer publisher) {
public synchronized void addProducer(MqttMessageProducer publisher) {
producers.add(publisher);
if (started) {
startProducer(publisher);
@ -388,7 +393,7 @@ public class MqttBrokerConnection implements MqttCallback {
*/
private void startProducer(MqttMessageProducer publisher) {
logger.trace("Starting message producer for broker {}", name);
logger.trace("Starting message producer for broker '{}'", name);
publisher.setSenderChannel(new MqttSenderChannel() {
@ -409,12 +414,12 @@ public class MqttBrokerConnection implements MqttCallback {
MqttTopic mqttTopic = client.getTopic(topic);
MqttDeliveryToken deliveryToken = mqttTopic.publish(message);
logger.debug("Publishing message {} to topic {} ", deliveryToken.getMessageId(), topic);
logger.debug("Publishing message {} to topic '{}'", deliveryToken.getMessageId(), topic);
if (!async) {
// wait for publish confirmation
deliveryToken.waitForCompletion(10000);
if (!deliveryToken.isComplete()) {
logger.error("Did not receive completion message within timeout limit whilst publishing to topic {} ", topic);
logger.error("Did not receive completion message within timeout limit whilst publishing to topic '{}'", topic);
}
}
@ -429,7 +434,7 @@ public class MqttBrokerConnection implements MqttCallback {
* @param consumer
* to add.
*/
public void addConsumer(MqttMessageConsumer subscriber) {
public synchronized void addConsumer(MqttMessageConsumer subscriber) {
consumers.add(subscriber);
if (started) {
startConsumer(subscriber);
@ -445,12 +450,12 @@ public class MqttBrokerConnection implements MqttCallback {
private void startConsumer(MqttMessageConsumer subscriber) {
String topic = subscriber.getTopic();
logger.debug("Starting message consumer for broker {} on topic {}", name, topic);
logger.debug("Starting message consumer for broker '{}' on topic '{}'", name, topic);
try {
client.subscribe(topic, qos);
} catch (Exception e) {
logger.error("Error starting consumer : ", e);
logger.error("Error starting consumer", e);
}
}
@ -460,8 +465,8 @@ public class MqttBrokerConnection implements MqttCallback {
* @param publisher
* to remove.
*/
public void removeProducer(MqttMessageProducer publisher) {
logger.debug("Removing message producer for broker {}", name);
public synchronized void removeProducer(MqttMessageProducer publisher) {
logger.debug("Removing message producer for broker '{}'", name);
publisher.setSenderChannel(null);
producers.remove(publisher);
}
@ -472,14 +477,15 @@ public class MqttBrokerConnection implements MqttCallback {
* @param subscriber
* to remove.
*/
public void removeConsumer(MqttMessageConsumer subscriber) {
logger.debug("Removing message consumer for topic '{}' from '{}'", subscriber.getTopic(), name);
public synchronized void removeConsumer(MqttMessageConsumer subscriber) {
logger.debug("Unsubscribing message consumer for topic '{}' from broker '{}'", subscriber.getTopic(), name);
try {
if (started) {
client.unsubscribe(subscriber.getTopic());
}
} catch (Exception e) {
logger.error("Error unsubscribing topic '{}' from '{}'", subscriber.getTopic(), name);
logger.error("Error unsubscribing topic from broker", e);
}
consumers.remove(subscriber);
@ -488,29 +494,31 @@ public class MqttBrokerConnection implements MqttCallback {
/**
* Close the MQTT connection.
*/
public void close() {
logger.debug("Closing connection to {}", name);
public synchronized void close() {
logger.debug("Closing connection to broker '{}'", name);
try {
if (started) {
client.disconnect();
}
} catch (MqttException e) {
logger.error("Error closing connection to {}.", name, e);
logger.error("Error closing connection to broker", e);
}
started = false;
}
@Override
public void connectionLost(Throwable t) {
public synchronized void connectionLost(Throwable t) {
logger.error("MQTT connection to broker was lost", t);
if (t instanceof MqttException) {
MqttException e = (MqttException) t;
logger.error("MQTT connection to '{}' was lost: {} : ReasonCode {} : Cause : {}",
new Object[] { name, e.getMessage(), e.getReasonCode(), e.getCause().getMessage() });
} else {
logger.error("MQTT connection to '{}' was lost: {} : ReasonCode {} : Cause : {}", new Object[] { name, e.getMessage(), e.getReasonCode(),
(e.getCause() == null ? "Unknown" : e.getCause().getMessage()) });
} else {
logger.error("MQTT connection to '{}' was lost: {}", name, t.getMessage());
}
started = false;
logger.info("Starting connection helper to periodically try restore connection to broker '{}'", name);
@ -520,7 +528,6 @@ public class MqttBrokerConnection implements MqttCallback {
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
logger.trace("Message with id {} delivered.", token.getMessageId());
@ -560,10 +567,10 @@ public class MqttBrokerConnection implements MqttCallback {
regex = StringUtils.replace(regex, "#", ".*");
boolean result = source.matches(regex);
if (result) {
logger.trace("Topic match for '{}' and '{}' using regex {}", new Object[] { source, target, regex });
logger.trace("Topic match for '{}' and '{}' using regex {}", source, target, regex);
return true;
} else {
logger.trace("No topic match for '{}' and '{}' using regex {}", new Object[] { source, target, regex });
logger.trace("No topic match for '{}' and '{}' using regex {}", source, target, regex);
return false;
}
}