Merge pull request #37 from mariniss/BUG-423516-REFACTOR-WATCHSERVICE

Bug 423516 - Rules of type "System started" should start after Item Initalization
This commit is contained in:
kaikreuzer 2014-07-01 10:36:24 +02:00
Родитель f06bb550a4 40f0e2f38f
Коммит b79ef80b37
1 изменённых файлов: 322 добавлений и 167 удалений

Просмотреть файл

@ -7,70 +7,71 @@
*/
package org.eclipse.smarthome.model.core.internal.folder;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
import static java.nio.file.StandardWatchEventKinds.OVERFLOW;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.Date;
import java.nio.file.ClosedWatchServiceException;
import java.nio.file.FileSystems;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.WatchEvent;
import java.nio.file.WatchEvent.Kind;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Dictionary;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.ArrayUtils;
import org.eclipse.smarthome.model.core.ModelCoreConstants;
import org.eclipse.smarthome.model.core.ModelRepository;
import org.eclipse.smarthome.model.core.internal.util.MathUtils;
import org.apache.commons.lang.StringUtils;
import org.eclipse.smarthome.config.core.ConfigDispatcher;
import org.eclipse.smarthome.model.core.ModelRepository;
import org.osgi.service.cm.ConfigurationException;
import org.osgi.service.cm.ManagedService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
/**
* This class is able to observe multiple folders for changes. It checks the
* last modified date in a configurable frequency and notifies the model repository
* about every change, so that it can update itself.
*
* This logic is run as a separate thread, so that it can always detect changes.
* This class is able to observe multiple folders for changes and notifies the
* model repository about every change, so that it can update itself.
*
* @author Kai Kreuzer - Initial contribution and API
* @author Fabio Marini - Refactoring to use WatchService
*
*/
public class FolderObserver extends Thread implements ManagedService {
public class FolderObserver implements ManagedService {
private static final Logger logger = LoggerFactory
.getLogger(FolderObserver.class);
/* map that lists all foldernames that should be observed and the frequency for checks in seconds */
private final Map<String, Integer> folderRefreshMap = new ConcurrentHashMap<String, Integer>();
/* map that stores a list of valid file extensions for each folder */
private final Map<String, String[]> folderFileExtMap = new ConcurrentHashMap<String, String[]>();
/* map that stores the time of the last check of a filename in milliseconds */
private Map<String, Long> lastCheckedMap = new ConcurrentHashMap<String, Long>();
/* map that remembers all filenames of the last check, so that it can detect file deletions */
private Map<String, Set<String>> lastFileNames = new ConcurrentHashMap<String, Set<String>>();
/* the greatest common divisor of all folder refresh rates */
private int gcdRefresh = 1;
/* the least common multiple of all folder refresh rates */
private int lcmRefresh = 1;
/* a counter to know which folders need to be refreshed when waking up */
private int refreshCount = 0;
/* the watch service */
private WatchService watchService;
/* the model repository is provided as a service */
private ModelRepository modelRepo = null;
public FolderObserver() {
super("FolderObserver");
}
/* map that stores a list of valid file extensions for each folder */
private final Map<String, String[]> folderFileExtMap = new ConcurrentHashMap<String, String[]>();
public void setModelRepository(ModelRepository modelRepo) {
this.modelRepo = modelRepo;
@ -80,162 +81,316 @@ public class FolderObserver extends Thread implements ManagedService {
this.modelRepo = null;
}
@Override
public void run() {
while(!folderRefreshMap.isEmpty()) { // keep the thread running as long as there are folders to observe
public void activate() {
}
public void deactivate() {
stopWatchService();
}
private void initializeWatchService() {
if (watchService != null) {
try {
for(String foldername : folderRefreshMap.keySet()) {
// if folder has been checked at least once and it is not time yet to refresh, skip
if( lastFileNames.get(foldername) != null &&
(refreshCount % folderRefreshMap.get(foldername) > 0)) {
logger.debug("skipping refresh of folder '{}' folderRefreshMap={}",
foldername, folderRefreshMap.get(foldername));
continue;
}
logger.debug("Refreshing folder '{}'", foldername);
checkFolder(foldername);
}
// increase the counter and set it to 0, if it reaches the max value
refreshCount = (refreshCount + gcdRefresh) % lcmRefresh;
} catch(Throwable e) {
logger.error("An unexpected exception has occured", e);
watchService.close();
} catch (IOException e) {
logger.warn("Cannot deactivate folder watcher", e);
}
}
String pathToWatch = ConfigDispatcher.getConfigFolder();
if (StringUtils.isNotBlank(pathToWatch)
&& MapUtils.isNotEmpty(folderFileExtMap)) {
try {
if(gcdRefresh <= 0) break;
synchronized(FolderObserver.this) {
wait(gcdRefresh * 1000L);
}
} catch (InterruptedException e) {
break;
watchService = FileSystems.getDefault().newWatchService();
Files.walkFileTree(Paths.get(pathToWatch),
new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult preVisitDirectory(Path dir,
BasicFileAttributes attrs)
throws IOException {
String folderName = dir.getFileName()
.toString();
if (folderFileExtMap.containsKey(folderName)) {
dir.register(watchService, ENTRY_CREATE,
ENTRY_DELETE, ENTRY_MODIFY);
}
return FileVisitResult.CONTINUE;
}
});
WatchQueueReader reader = new WatchQueueReader(watchService,
folderFileExtMap, modelRepo);
Thread qr = new Thread(reader, "Model Dir Watcher");
qr.start();
} catch (IOException e) {
logger.error("Cannot activate folder watcher for folder ", e);
}
}
}
private void checkFolder(String foldername) {
File folder = getFolder(foldername);
if(!folder.exists()) {
return;
private void stopWatchService() {
try {
watchService.close();
} catch (IOException e) {
logger.warn("Cannot deactivate folder watcher", e);
}
String[] extensions = folderFileExtMap.get(foldername);
watchService = null;
}
// check current files and add or refresh them accordingly
Set<String> currentFileNames = new HashSet<String>();
for(File file : folder.listFiles()) {
if(file.isDirectory()) continue;
if(!file.getName().contains(".")) continue;
if(file.getName().startsWith(".")) continue;
private static class WatchQueueReader implements Runnable {
// if there is an extension filter defined, continue if the file has a different extension
String fileExt = getExtension(file.getName());
if(extensions!=null && extensions.length>0 && !ArrayUtils.contains(extensions, fileExt)) continue;
private WatchService watchService;
currentFileNames.add(file.getName());
Long timeLastCheck = lastCheckedMap.get(file.getName());
if(timeLastCheck==null) timeLastCheck = 0L;
if(FileUtils.isFileNewer(file, timeLastCheck)) {
if(modelRepo!=null) {
private Map<String, String[]> folderFileExtMap = new ConcurrentHashMap<String, String[]>();
private ModelRepository modelRepo = null;
public WatchQueueReader(WatchService watchService,
Map<String, String[]> folderFileExtMap,
ModelRepository modelRepo) {
super();
this.watchService = watchService;
this.folderFileExtMap = folderFileExtMap;
this.modelRepo = modelRepo;
}
@SuppressWarnings("unchecked")
@Override
public void run() {
try {
for (;;) {
WatchKey key = null;
try {
if(modelRepo.addOrRefreshModel(file.getName(), FileUtils.openInputStream(file))) {
lastCheckedMap.put(file.getName(), new Date().getTime());
key = watchService.take();
} catch (InterruptedException e) {
return;
}
for (WatchEvent<?> event : key.pollEvents()) {
WatchEvent.Kind<?> kind = event.kind();
if (kind == OVERFLOW) {
continue;
}
WatchEvent<Path> ev = (WatchEvent<Path>) event;
Path name = ev.context();
File toCheck = getFileByFileExtMap(folderFileExtMap,
name.toString());
if (toCheck != null) {
checkFile(modelRepo, toCheck, kind);
}
} catch (IOException e) {
logger.warn("Cannot open file '"+ file.getAbsolutePath() + "' for reading.", e);
}
key.reset();
}
} catch (ClosedWatchServiceException ecx) {
logger.debug(
"ClosedWatchServiceException catched! {}. \n{} Stopping ",
ecx.getLocalizedMessage(), Thread.currentThread()
.getName());
return;
}
}
// check for files that have been deleted meanwhile
if(lastFileNames.get(foldername)!=null) {;
for(String fileName : lastFileNames.get(foldername)) {
if(!currentFileNames.contains(fileName)) {
logger.info("File '{}' has been deleted", fileName);
if(modelRepo!=null) {
modelRepo.removeModel(fileName);
}
}
}
}
lastFileNames.put(foldername, currentFileNames);
}
private String getExtension(String filename) {
String fileExt = filename.substring(filename.lastIndexOf(".") + 1);
return fileExt;
}
@SuppressWarnings("rawtypes")
public void updated(Dictionary config) throws ConfigurationException {
public synchronized void updated(Dictionary config)
throws ConfigurationException {
if (config != null) {
// necessary to check removed models
Map<String, String[]> previousFolderFileExtMap = new ConcurrentHashMap<String, String[]>(
folderFileExtMap);
// make sure to clear the caches first
lastFileNames.clear();
lastCheckedMap.clear();
folderFileExtMap.clear();
folderRefreshMap.clear();
Enumeration keys = config.keys();
while (keys.hasMoreElements()) {
String foldername = (String) keys.nextElement();
if(foldername.equals("service.pid")) continue;
String[] values = ((String) config.get(foldername)).split(",");
try {
Integer refreshValue = Integer.valueOf(values[0]);
String[] fileExts = (String[]) ArrayUtils.remove(values, 0);
File folder = getFolder(foldername);
if (folder.exists() && folder.isDirectory()) {
folderFileExtMap.put(foldername, fileExts);
if (refreshValue > 0) {
folderRefreshMap.put(foldername, refreshValue);
if(!this.isAlive()) {
// seems we have the first folder to observe, so let's start the thread
this.start();
} else {
// make sure that we notify the sleeping thread and directly refresh the folders
synchronized (FolderObserver.this) {
notify();
checkFolder(foldername);
}
}
} else {
// deactivate the refresh for this folder
folderRefreshMap.remove(foldername);
checkFolder(foldername);
}
} else {
logger.warn(
"Directory '{}' does not exist in '{}'. Please check your configuration settings!",
foldername, ConfigDispatcher.getConfigFolder());
}
if (foldername.equals("service.pid"))
continue;
// now update the refresh information for the thread
Integer[] refreshValues = folderRefreshMap.values().toArray(new Integer[0]);
if(refreshValues.length>0) {
gcdRefresh = MathUtils.gcd(refreshValues);
lcmRefresh = MathUtils.lcm(refreshValues);
}
refreshCount = 0;
} catch (NumberFormatException e) {
String[] fileExts = ((String) config.get(foldername))
.split(",");
File folder = getFile(foldername);
if (folder.exists() && folder.isDirectory()) {
folderFileExtMap.put(foldername, fileExts);
} else {
logger.warn(
"Invalid value '{}' for configuration '{}'. Integer value expected!",
values[0], ModelCoreConstants.SERVICE_PID + ":"
+ foldername);
"Directory '{}' does not exist in '{}'. Please check your configuration settings!",
foldername, ConfigDispatcher.getConfigFolder());
}
}
notifyUpdateToModelRepo(previousFolderFileExtMap);
initializeWatchService();
}
}
private void notifyUpdateToModelRepo(
Map<String, String[]> previousFolderFileExtMap) {
checkDeletedModels(previousFolderFileExtMap);
if (MapUtils.isNotEmpty(folderFileExtMap)) {
Iterator<String> iterator = folderFileExtMap.keySet().iterator();
while (iterator.hasNext()) {
String folderName = iterator.next();
final String[] validExtension = folderFileExtMap
.get(folderName);
if (validExtension != null && validExtension.length > 0) {
File folder = getFile(folderName);
File[] files = folder.listFiles(new FileExtensionsFilter(
validExtension));
if (files != null && files.length > 0) {
for (File file : files) {
checkFile(modelRepo, file, ENTRY_CREATE);
}
}
}
}
}
}
private void checkDeletedModels(
Map<String, String[]> previousFolderFileExtMap) {
if (MapUtils.isNotEmpty(previousFolderFileExtMap)) {
List<String> modelsToRemove = new LinkedList<String>();
if (MapUtils.isNotEmpty(folderFileExtMap)) {
Set<String> folders = previousFolderFileExtMap.keySet();
for (String folder : folders) {
if (!folderFileExtMap.containsKey(folder)) {
Iterable<String> models = modelRepo
.getAllModelNamesOfType(folder);
if (models != null) {
modelsToRemove.addAll(Lists.newLinkedList(models));
}
}
}
} else {
Set<String> folders = previousFolderFileExtMap.keySet();
for (String folder : folders) {
synchronized (FolderObserver.class) {
Iterable<String> models = modelRepo
.getAllModelNamesOfType(folder);
if (models != null) {
modelsToRemove.addAll(Lists.newLinkedList(models));
}
}
}
}
if (CollectionUtils.isNotEmpty(modelsToRemove)) {
for (String modelToRemove : modelsToRemove) {
synchronized (FolderObserver.class) {
modelRepo.removeModel(modelToRemove);
}
}
}
}
}
protected class FileExtensionsFilter implements FilenameFilter {
private String[] validExtensions;
public FileExtensionsFilter(String[] validExtensions) {
this.validExtensions = validExtensions;
}
@Override
public boolean accept(File dir, String name) {
if (validExtensions != null && validExtensions.length > 0) {
for (String extension : validExtensions) {
if (name.toLowerCase().endsWith("." + extension)) {
return true;
}
}
}
return false;
}
}
@SuppressWarnings("rawtypes")
private static void checkFile(ModelRepository modelRepo, File file,
Kind kind) {
if (modelRepo != null && file != null) {
try {
synchronized (FolderObserver.class) {
if ((kind == ENTRY_CREATE || kind == ENTRY_MODIFY)
&& file != null) {
modelRepo.addOrRefreshModel(file.getName(),
FileUtils.openInputStream(file));
} else if (kind == ENTRY_DELETE) {
modelRepo.removeModel(file.getName());
}
}
} catch (IOException e) {
logger.warn("Cannot open file '" + file.getAbsolutePath()
+ "' for reading.", e);
}
}
}
private static File getFileByFileExtMap(
Map<String, String[]> folderFileExtMap, String filename) {
if (StringUtils.isNotBlank(filename)
&& MapUtils.isNotEmpty(folderFileExtMap)) {
String extension = getExtension(filename);
if (StringUtils.isNotBlank(extension)) {
Set<Entry<String, String[]>> entries = folderFileExtMap
.entrySet();
Iterator<Entry<String, String[]>> iterator = entries.iterator();
while (iterator.hasNext()) {
Entry<String, String[]> entry = iterator.next();
if (ArrayUtils.contains(entry.getValue(), extension)) {
return new File(getFile(entry.getKey())
+ File.separator + filename);
}
}
}
}
return null;
}
/**
* returns the {@link File} object for a given foldername
* @param foldername the foldername to get the {@link File} for
* Returns the {@link File} object for the given filename. <br />
* It must be contained in the configuration folder
*
* @param filename
* the file name to get the {@link File} for
* @return the corresponding {@link File}
*/
private File getFolder(String foldername) {
public static File getFile(String filename) {
File folder = new File(ConfigDispatcher.getConfigFolder()
+ File.separator + foldername);
+ File.separator + filename);
return folder;
}
/**
* Returns the extension of the given file
*
* @param filename
* the file name to get the extension
* @return the file's extension
*/
public static String getExtension(String filename) {
String fileExt = filename.substring(filename.lastIndexOf(".") + 1);
return fileExt;
}
}