diff --git a/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttMessageConsumer.java b/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttMessageConsumer.java index f7c17b8b..95ccc9cd 100644 --- a/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttMessageConsumer.java +++ b/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttMessageConsumer.java @@ -42,7 +42,7 @@ public interface MqttMessageConsumer { /** * Set the event publisher to use when broadcasting received messages onto - * the openHAB event bus. + * the smarthome event bus. * * @param eventPublisher */ diff --git a/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttService.java b/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttService.java index 58abee9d..7b0742a2 100644 --- a/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttService.java +++ b/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttService.java @@ -12,15 +12,15 @@ import java.util.Enumeration; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.lang.StringUtils; -import org.eclipse.smarthome.io.transport.mqtt.internal.MqttBrokerConnection; import org.eclipse.smarthome.core.events.EventPublisher; +import org.eclipse.smarthome.io.transport.mqtt.internal.MqttBrokerConnection; import org.osgi.service.cm.ConfigurationException; import org.osgi.service.cm.ManagedService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * MQTT Service for creating new connections to MQTT brokers from the openHAB + * MQTT Service for creating new connections to MQTT brokers from the Smart Home * configuration file and registering message publishers and subscribers. This * service is the main entry point for all bundles wanting to use the MQTT * transport. @@ -36,8 +36,7 @@ public class MqttService implements ManagedService { private EventPublisher eventPublisher; @Override - public void updated(Dictionary properties) - throws ConfigurationException { + public void updated(Dictionary properties) throws ConfigurationException { // load broker configurations from configuration file if (properties == null || properties.isEmpty()) { @@ -56,9 +55,7 @@ public class MqttService implements ManagedService { String[] subkeys = key.split("\\."); if (subkeys.length != 2) { - logger.debug( - "MQTT Broker property '{}' should have the format 'broker.propertykey'", - key); + logger.debug("MQTT Broker property '{}' should have the format 'broker.propertykey'", key); continue; } @@ -93,6 +90,10 @@ public class MqttService implements ManagedService { conn.setAsync(Boolean.parseBoolean(value)); } else if (property.equals("clientId")) { conn.setClientId(value); + } else if (property.equals("lwt")) { + MqttWillAndTestament will = MqttWillAndTestament.fromString(value); + logger.debug("Setting last will: {}", will); + conn.setLastWill(will); } else { logger.warn("Unrecognized property: {}", key); } @@ -103,8 +104,7 @@ public class MqttService implements ManagedService { try { con.start(); } catch (Exception e) { - logger.error("Error starting broker connection {} : {}", - con.getName(), e.getMessage()); + logger.error("Error starting broker connection", e); } } } @@ -125,7 +125,7 @@ public class MqttService implements ManagedService { Enumeration e = brokerConnections.keys(); while (e.hasMoreElements()) { MqttBrokerConnection conn = brokerConnections.get(e.nextElement()); - logger.info("Stopping connection {}", conn.getName()); + logger.info("Stopping broker connection '{}'", conn.getName()); conn.close(); } @@ -139,10 +139,9 @@ public class MqttService implements ManagedService { * to look for. * @return existing connection or new one if it didn't exist yet. */ - private MqttBrokerConnection getConnection(String brokerName) { + private synchronized MqttBrokerConnection getConnection(String brokerName) { - MqttBrokerConnection conn = brokerConnections.get(brokerName - .toLowerCase()); + MqttBrokerConnection conn = brokerConnections.get(brokerName.toLowerCase()); if (conn == null) { conn = new MqttBrokerConnection(brokerName); brokerConnections.put(brokerName.toLowerCase(), conn); @@ -158,8 +157,7 @@ public class MqttService implements ManagedService { * @param mqttMessageConsumer * Consumer which will process any received message. */ - public void registerMessageConsumer(String brokerName, - MqttMessageConsumer mqttMessageConsumer) { + public void registerMessageConsumer(String brokerName, MqttMessageConsumer mqttMessageConsumer) { mqttMessageConsumer.setEventPublisher(eventPublisher); getConnection(brokerName).addConsumer(mqttMessageConsumer); @@ -171,14 +169,12 @@ public class MqttService implements ManagedService { * @param mqttMessageConsumer * Consumer which needs to be unregistered. */ - public void unregisterMessageConsumer(String brokerName, - MqttMessageConsumer mqttMessageConsumer) { + public void unregisterMessageConsumer(String brokerName, MqttMessageConsumer mqttMessageConsumer) { getConnection(brokerName).removeConsumer(mqttMessageConsumer); } - public void registerMessageProducer(String brokerName, - MqttMessageProducer commandPublisher) { + public void registerMessageProducer(String brokerName, MqttMessageProducer commandPublisher) { getConnection(brokerName).addProducer(commandPublisher); } @@ -192,14 +188,13 @@ public class MqttService implements ManagedService { * @param mqttMessageProducer * Producer which generates the messages. */ - public void unregisterMessageProducer(String brokerName, - MqttMessageProducer commandPublisher) { + public void unregisterMessageProducer(String brokerName, MqttMessageProducer commandPublisher) { getConnection(brokerName).removeProducer(commandPublisher); } /** - * Set the publisher to use for publishing openHAB updates. + * Set the publisher to use for publishing SmartHome updates. * * @param eventPublisher * EventPublisher @@ -209,7 +204,7 @@ public class MqttService implements ManagedService { } /** - * Remove the publisher to use for publishing openHAB updates. + * Remove the publisher to use for publishing SmartHome updates. * * @param eventPublisher * EventPublisher diff --git a/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttWillAndTestament.java b/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttWillAndTestament.java new file mode 100644 index 00000000..1ff983d2 --- /dev/null +++ b/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttWillAndTestament.java @@ -0,0 +1,164 @@ +/** + * Copyright (c) 2014 openHAB UG (haftungsbeschraenkt) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + */ +package org.eclipse.smarthome.io.transport.mqtt; + +import org.apache.commons.lang.StringUtils; + +/** + * Class encapsulating the last will and testament that is published after the + * client has gone offline. + * + * @author Markus Mann + * + */ +public class MqttWillAndTestament { + private String topic; + private byte[] payload; + private int qos = 0; + private boolean retain = false; + + /** + * Create an instance of the last will using a string with the following + * format:
+ * topic:message:qos:retained
+ * Where + *
    + *
  • topic is a normal topic string (no placeholders are allowed)
  • + *
  • message the message to send
  • + *
  • qos Valid values are 0 (Deliver at most once),1 (Deliver at least + * once) or 2
  • + *
  • retain true if messages shall be retained
  • + *
+ * + * @param string + * the string to parse. If null, null is returned + * @return the will instance, will be null only if parameter is null + */ + public static MqttWillAndTestament fromString(String string) { + if (string == null) { + return null; + } + MqttWillAndTestament result = new MqttWillAndTestament(); + String[] components = string.split(":"); + for (int i = 0; i < Math.min(components.length, 4); i++) { + String value = StringUtils.trimToEmpty(components[i]); + switch (i) { + case 0: + result.setTopic(value); + break; + case 1: + result.setPayload(value.getBytes()); + break; + case 2: + if (!"".equals(value)) { + result.setQos(Integer.valueOf(value)); + } + break; + case 3: + result.setRetain(Boolean.valueOf(value)); + break; + } + } + return result; + + } + + /** + * @return the topic for the last will. + */ + public String getTopic() { + return topic; + } + + /** + * Set the topic for the last will. + * + * @param topic + * the topic + */ + public void setTopic(String topic) { + this.topic = topic; + } + + /** + * @return the payload of the last will. + */ + public byte[] getPayload() { + return payload; + } + + /** + * Set the payload of the last will. + * + * @param payload + * the payload + */ + public void setPayload(byte[] payload) { + this.payload = payload; + } + + /** + * @return quality of service level. + */ + public int getQos() { + return qos; + } + + /** + * Set quality of service. Valid values are 0,1,2 + * + * @param qos + * level. + */ + public void setQos(int qos) { + if (qos >= 0 && qos <= 2) { + this.qos = qos; + } + } + + /** + * @return true if the last will should be retained by the broker. + */ + public boolean isRetain() { + return retain; + } + + /** + * Set whether the last will should be retained by the broker. + * + * @param retain + * true to retain. + */ + public void setRetain(boolean retain) { + this.retain = retain; + } + + /** + * {@inheritDoc} + */ + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("[").append(getClass()); + sb.append("] Send '"); + if (payload != null) { + sb.append(new String(payload)); + } else { + sb.append(payload); + } + sb.append("' to topic '"); + sb.append(topic); + sb.append("'"); + if (retain) { + sb.append(" retained"); + } + sb.append(" using qos mode ").append(qos); + return sb.toString(); + } + +} diff --git a/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/MqttBrokerConnection.java b/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/MqttBrokerConnection.java index a131a28d..bb660311 100644 --- a/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/MqttBrokerConnection.java +++ b/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/MqttBrokerConnection.java @@ -8,10 +8,10 @@ package org.eclipse.smarthome.io.transport.mqtt.internal; import java.security.cert.X509Certificate; -import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.Timer; +import java.util.concurrent.CopyOnWriteArrayList; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocketFactory; @@ -31,6 +31,7 @@ import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; import org.eclipse.smarthome.io.transport.mqtt.MqttMessageConsumer; import org.eclipse.smarthome.io.transport.mqtt.MqttMessageProducer; import org.eclipse.smarthome.io.transport.mqtt.MqttSenderChannel; +import org.eclipse.smarthome.io.transport.mqtt.MqttWillAndTestament; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,15 +65,17 @@ public class MqttBrokerConnection implements MqttCallback { private boolean async = true; + private MqttWillAndTestament lastWill; + private String clientId; private MqttClient client; private boolean started; - private List consumers = new ArrayList(); + private List consumers = new CopyOnWriteArrayList(); - private List producers = new ArrayList(); + private List producers = new CopyOnWriteArrayList(); private Timer reconnectTimer; @@ -94,7 +97,7 @@ public class MqttBrokerConnection implements MqttCallback { * @throws Exception * If connection could not be created. */ - public void start() throws Exception { + public synchronized void start() throws Exception { if (StringUtils.isEmpty(url)) { logger.debug("No url defined for MQTT broker connection '{}'. Not starting.", name); @@ -220,6 +223,14 @@ public class MqttBrokerConnection implements MqttCallback { this.retain = retain; } + public MqttWillAndTestament getLastWill() { + return lastWill; + } + + public void setLastWill(MqttWillAndTestament lastWill) { + this.lastWill = lastWill; + } + /** * @return true if messages are sent asynchronously. */ @@ -257,80 +268,74 @@ public class MqttBrokerConnection implements MqttCallback { * @throws Exception */ private void openConnection() throws Exception { - - try { - if (client != null && client.isConnected()) { - return; - } - - if (StringUtils.isBlank(url)) { - throw new Exception("Missing url."); - } - - if (client == null) { - if (StringUtils.isBlank(clientId) || clientId.length() > 23) { - clientId = MqttClient.generateClientId(); - } - - String tmpDir = System.getProperty("java.io.tmpdir"); - MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(tmpDir + "/" + name); - logger.debug("Creating new client for '{}' using id '{}' and file store '{}'", new Object[] { url, clientId, - tmpDir + "/" + name }); - client = new MqttClient(url, clientId, dataStore); - client.setCallback(this); - } - - MqttConnectOptions options = new MqttConnectOptions(); - - if (!StringUtils.isBlank(user)) { - options.setUserName(user); - } - if (!StringUtils.isBlank(password)) { - options.setPassword(password.toCharArray()); - } - if (url.toLowerCase().contains("ssl")) { - - if (StringUtils.isNotBlank(System.getProperty("com.ibm.ssl.protocol"))) { - - // get all com.ibm.ssl properties from the system properties - // and set them as the SSL properties to use. - - Properties sslProps = new Properties(); - addSystemProperty("com.ibm.ssl.protocol", sslProps); - addSystemProperty("com.ibm.ssl.contextProvider", sslProps); - addSystemProperty("com.ibm.ssl.keyStore", sslProps); - addSystemProperty("com.ibm.ssl.keyStorePassword", sslProps); - addSystemProperty("com.ibm.ssl.keyStoreType", sslProps); - addSystemProperty("com.ibm.ssl.keyStoreProvider", sslProps); - addSystemProperty("com.ibm.ssl.trustStore", sslProps); - addSystemProperty("com.ibm.ssl.trustStorePassword", sslProps); - addSystemProperty("com.ibm.ssl.trustStoreType", sslProps); - addSystemProperty("com.ibm.ssl.trustStoreProvider", sslProps); - addSystemProperty("com.ibm.ssl.enabledCipherSuites", sslProps); - addSystemProperty("com.ibm.ssl.keyManager", sslProps); - addSystemProperty("com.ibm.ssl.trustManager", sslProps); - - options.setSSLProperties(sslProps); - - } else { - - // use standard JSSE available in the runtime and - // use TLSv1.2 which is the default for a secured mosquitto - SSLContext sslContext = SSLContext.getInstance("TLSv1.2"); - sslContext.init(null, new TrustManager[] { getVeryTrustingTrustManager() }, new java.security.SecureRandom()); - SSLSocketFactory socketFactory = sslContext.getSocketFactory(); - options.setSocketFactory(socketFactory); - } - } - - client.connect(options); - - } catch (MqttException e) { - logger.error("Error connecting to broker '{}' : {} : ReasonCode {} : Cause : {}", - new Object[] { name, e.getMessage(), e.getReasonCode(), e.getCause().getMessage() }); - throw e; + if (client != null && client.isConnected()) { + return; } + if (StringUtils.isBlank(url)) { + throw new Exception("Missing url"); + } + + if (client == null) { + if (StringUtils.isBlank(clientId) || clientId.length() > 23) { + clientId = MqttClient.generateClientId(); + } + + String tmpDir = System.getProperty("java.io.tmpdir") + "/" + name; + MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(tmpDir); + logger.debug("Creating new client for '{}' using id '{}' and file store '{}'", url, clientId, tmpDir); + client = new MqttClient(url, clientId, dataStore); + client.setCallback(this); + } + + MqttConnectOptions options = new MqttConnectOptions(); + + if (!StringUtils.isBlank(user)) { + options.setUserName(user); + } + if (!StringUtils.isBlank(password)) { + options.setPassword(password.toCharArray()); + } + if (url.toLowerCase().contains("ssl")) { + + if (StringUtils.isNotBlank(System.getProperty("com.ibm.ssl.protocol"))) { + + // get all com.ibm.ssl properties from the system properties + // and set them as the SSL properties to use. + + Properties sslProps = new Properties(); + addSystemProperty("com.ibm.ssl.protocol", sslProps); + addSystemProperty("com.ibm.ssl.contextProvider", sslProps); + addSystemProperty("com.ibm.ssl.keyStore", sslProps); + addSystemProperty("com.ibm.ssl.keyStorePassword", sslProps); + addSystemProperty("com.ibm.ssl.keyStoreType", sslProps); + addSystemProperty("com.ibm.ssl.keyStoreProvider", sslProps); + addSystemProperty("com.ibm.ssl.trustStore", sslProps); + addSystemProperty("com.ibm.ssl.trustStorePassword", sslProps); + addSystemProperty("com.ibm.ssl.trustStoreType", sslProps); + addSystemProperty("com.ibm.ssl.trustStoreProvider", sslProps); + addSystemProperty("com.ibm.ssl.enabledCipherSuites", sslProps); + addSystemProperty("com.ibm.ssl.keyManager", sslProps); + addSystemProperty("com.ibm.ssl.trustManager", sslProps); + + options.setSSLProperties(sslProps); + + } else { + + // use standard JSSE available in the runtime and + // use TLSv1.2 which is the default for a secured mosquitto + SSLContext sslContext = SSLContext.getInstance("TLSv1.2"); + sslContext.init(null, new TrustManager[] { getVeryTrustingTrustManager() }, new java.security.SecureRandom()); + SSLSocketFactory socketFactory = sslContext.getSocketFactory(); + options.setSocketFactory(socketFactory); + } + } + + if (lastWill != null) { + options.setWill(lastWill.getTopic(), lastWill.getPayload(), lastWill.getQos(), lastWill.isRetain()); + } + + client.connect(options); } /** @@ -373,7 +378,7 @@ public class MqttBrokerConnection implements MqttCallback { * @param publisher * to add. */ - public void addProducer(MqttMessageProducer publisher) { + public synchronized void addProducer(MqttMessageProducer publisher) { producers.add(publisher); if (started) { startProducer(publisher); @@ -388,7 +393,7 @@ public class MqttBrokerConnection implements MqttCallback { */ private void startProducer(MqttMessageProducer publisher) { - logger.trace("Starting message producer for broker {}", name); + logger.trace("Starting message producer for broker '{}'", name); publisher.setSenderChannel(new MqttSenderChannel() { @@ -409,12 +414,12 @@ public class MqttBrokerConnection implements MqttCallback { MqttTopic mqttTopic = client.getTopic(topic); MqttDeliveryToken deliveryToken = mqttTopic.publish(message); - logger.debug("Publishing message {} to topic {} ", deliveryToken.getMessageId(), topic); + logger.debug("Publishing message {} to topic '{}'", deliveryToken.getMessageId(), topic); if (!async) { // wait for publish confirmation deliveryToken.waitForCompletion(10000); if (!deliveryToken.isComplete()) { - logger.error("Did not receive completion message within timeout limit whilst publishing to topic {} ", topic); + logger.error("Did not receive completion message within timeout limit whilst publishing to topic '{}'", topic); } } @@ -429,7 +434,7 @@ public class MqttBrokerConnection implements MqttCallback { * @param consumer * to add. */ - public void addConsumer(MqttMessageConsumer subscriber) { + public synchronized void addConsumer(MqttMessageConsumer subscriber) { consumers.add(subscriber); if (started) { startConsumer(subscriber); @@ -445,12 +450,12 @@ public class MqttBrokerConnection implements MqttCallback { private void startConsumer(MqttMessageConsumer subscriber) { String topic = subscriber.getTopic(); - logger.debug("Starting message consumer for broker {} on topic {}", name, topic); + logger.debug("Starting message consumer for broker '{}' on topic '{}'", name, topic); try { client.subscribe(topic, qos); } catch (Exception e) { - logger.error("Error starting consumer : ", e); + logger.error("Error starting consumer", e); } } @@ -460,8 +465,8 @@ public class MqttBrokerConnection implements MqttCallback { * @param publisher * to remove. */ - public void removeProducer(MqttMessageProducer publisher) { - logger.debug("Removing message producer for broker {}", name); + public synchronized void removeProducer(MqttMessageProducer publisher) { + logger.debug("Removing message producer for broker '{}'", name); publisher.setSenderChannel(null); producers.remove(publisher); } @@ -472,14 +477,15 @@ public class MqttBrokerConnection implements MqttCallback { * @param subscriber * to remove. */ - public void removeConsumer(MqttMessageConsumer subscriber) { - logger.debug("Removing message consumer for topic '{}' from '{}'", subscriber.getTopic(), name); + public synchronized void removeConsumer(MqttMessageConsumer subscriber) { + logger.debug("Unsubscribing message consumer for topic '{}' from broker '{}'", subscriber.getTopic(), name); + try { if (started) { client.unsubscribe(subscriber.getTopic()); } } catch (Exception e) { - logger.error("Error unsubscribing topic '{}' from '{}'", subscriber.getTopic(), name); + logger.error("Error unsubscribing topic from broker", e); } consumers.remove(subscriber); @@ -488,29 +494,31 @@ public class MqttBrokerConnection implements MqttCallback { /** * Close the MQTT connection. */ - public void close() { - logger.debug("Closing connection to {}", name); + public synchronized void close() { + logger.debug("Closing connection to broker '{}'", name); try { if (started) { client.disconnect(); } } catch (MqttException e) { - logger.error("Error closing connection to {}.", name, e); + logger.error("Error closing connection to broker", e); } started = false; } @Override - public void connectionLost(Throwable t) { - + public synchronized void connectionLost(Throwable t) { + + logger.error("MQTT connection to broker was lost", t); + if (t instanceof MqttException) { MqttException e = (MqttException) t; - logger.error("MQTT connection to '{}' was lost: {} : ReasonCode {} : Cause : {}", - new Object[] { name, e.getMessage(), e.getReasonCode(), e.getCause().getMessage() }); - } else { + logger.error("MQTT connection to '{}' was lost: {} : ReasonCode {} : Cause : {}", new Object[] { name, e.getMessage(), e.getReasonCode(), + (e.getCause() == null ? "Unknown" : e.getCause().getMessage()) }); + } else { logger.error("MQTT connection to '{}' was lost: {}", name, t.getMessage()); } - + started = false; logger.info("Starting connection helper to periodically try restore connection to broker '{}'", name); @@ -520,7 +528,6 @@ public class MqttBrokerConnection implements MqttCallback { } - @Override public void deliveryComplete(IMqttDeliveryToken token) { logger.trace("Message with id {} delivered.", token.getMessageId()); @@ -560,10 +567,10 @@ public class MqttBrokerConnection implements MqttCallback { regex = StringUtils.replace(regex, "#", ".*"); boolean result = source.matches(regex); if (result) { - logger.trace("Topic match for '{}' and '{}' using regex {}", new Object[] { source, target, regex }); + logger.trace("Topic match for '{}' and '{}' using regex {}", source, target, regex); return true; } else { - logger.trace("No topic match for '{}' and '{}' using regex {}", new Object[] { source, target, regex }); + logger.trace("No topic match for '{}' and '{}' using regex {}", source, target, regex); return false; } }