NIFIREG-160 - Making event hooks asynchronous

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #117.
This commit is contained in:
Bryan Bende 2018-05-08 13:38:33 -04:00 коммит произвёл Pierre Villard
Родитель 872a5b5dc8
Коммит 51b6c78b6f
40 изменённых файлов: 1094 добавлений и 317 удалений

Просмотреть файл

@ -170,7 +170,7 @@ The following binary components are provided under the Apache Software License v
(ASLv2) Spring Security
The following NOTICE information applies:
Spring Framework 5.0.0.RELEASE
Spring Framework 5.0.5.RELEASE
Copyright (c) 2002-2017 Pivotal, Inc.
This product includes software developed by Spring Security

Просмотреть файл

@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry.event;
import org.apache.nifi.registry.bucket.Bucket;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.hook.Event;
import org.apache.nifi.registry.hook.EventFieldName;
import org.apache.nifi.registry.hook.EventType;
import org.apache.nifi.registry.security.authorization.user.NiFiUserUtils;
/**
* Factory to create Events from domain objects.
*/
public class EventFactory {
public static Event bucketCreated(final Bucket bucket) {
return new StandardEvent.Builder()
.eventType(EventType.CREATE_BUCKET)
.addField(EventFieldName.BUCKET_ID, bucket.getIdentifier())
.addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity())
.build();
}
public static Event bucketUpdated(final Bucket bucket) {
return new StandardEvent.Builder()
.eventType(EventType.UPDATE_BUCKET)
.addField(EventFieldName.BUCKET_ID, bucket.getIdentifier())
.addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity())
.build();
}
public static Event bucketDeleted(final Bucket bucket) {
return new StandardEvent.Builder()
.eventType(EventType.DELETE_BUCKET)
.addField(EventFieldName.BUCKET_ID, bucket.getIdentifier())
.addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity())
.build();
}
public static Event flowCreated(final VersionedFlow versionedFlow) {
return new StandardEvent.Builder()
.eventType(EventType.CREATE_FLOW)
.addField(EventFieldName.BUCKET_ID, versionedFlow.getBucketIdentifier())
.addField(EventFieldName.FLOW_ID, versionedFlow.getIdentifier())
.addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity())
.build();
}
public static Event flowUpdated(final VersionedFlow versionedFlow) {
return new StandardEvent.Builder()
.eventType(EventType.UPDATE_FLOW)
.addField(EventFieldName.BUCKET_ID, versionedFlow.getBucketIdentifier())
.addField(EventFieldName.FLOW_ID, versionedFlow.getIdentifier())
.addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity())
.build();
}
public static Event flowDeleted(final VersionedFlow versionedFlow) {
return new StandardEvent.Builder()
.eventType(EventType.DELETE_FLOW)
.addField(EventFieldName.BUCKET_ID, versionedFlow.getBucketIdentifier())
.addField(EventFieldName.FLOW_ID, versionedFlow.getIdentifier())
.addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity())
.build();
}
public static Event flowVersionCreated(final VersionedFlowSnapshot versionedFlowSnapshot) {
return new StandardEvent.Builder()
.eventType(EventType.CREATE_FLOW_VERSION)
.addField(EventFieldName.BUCKET_ID, versionedFlowSnapshot.getSnapshotMetadata().getBucketIdentifier())
.addField(EventFieldName.FLOW_ID, versionedFlowSnapshot.getSnapshotMetadata().getFlowIdentifier())
.addField(EventFieldName.VERSION, String.valueOf(versionedFlowSnapshot.getSnapshotMetadata().getVersion()))
.addField(EventFieldName.USER, versionedFlowSnapshot.getSnapshotMetadata().getAuthor())
.addField(EventFieldName.COMMENT, versionedFlowSnapshot.getSnapshotMetadata().getComments())
.build();
}
}

Просмотреть файл

@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry.event;
import org.apache.nifi.registry.hook.Event;
import org.apache.nifi.registry.hook.EventHookProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* Service used for publishing events and passing events to the hook providers.
*/
@Service
public class EventService implements DisposableBean {
private static final Logger LOGGER = LoggerFactory.getLogger(EventService.class);
// Should only be a few events in the queue at a time, but setting a capacity just so it isn't unbounded
static final int EVENT_QUEUE_SIZE = 10_000;
private final BlockingQueue<Event> eventQueue;
private final ExecutorService scheduledExecutorService;
private final List<EventHookProvider> eventHookProviders;
@Autowired
public EventService(final List<EventHookProvider> eventHookProviders) {
this.eventQueue = new LinkedBlockingQueue<>(EVENT_QUEUE_SIZE);
this.scheduledExecutorService = Executors.newSingleThreadExecutor();
this.eventHookProviders = new ArrayList<>(eventHookProviders);
}
@PostConstruct
public void postConstruct() {
LOGGER.info("Starting event consumer...");
this.scheduledExecutorService.execute(() -> {
while (!Thread.interrupted()) {
try {
final Event event = eventQueue.poll(1000, TimeUnit.MILLISECONDS);
if (event == null) {
continue;
}
// event was available so notify each provider, contain errors per-provider
for(final EventHookProvider provider : eventHookProviders) {
try {
provider.handle(event);
} catch (Exception e) {
LOGGER.error("Error handling event hook", e);
}
}
} catch (InterruptedException e) {
LOGGER.warn("Interrupted while polling event queue");
return;
}
}
});
LOGGER.info("Event consumer started!");
}
@Override
public void destroy() throws Exception {
LOGGER.info("Shutting down event consumer...");
this.scheduledExecutorService.shutdownNow();
LOGGER.info("Event consumer shutdown!");
}
public void publish(final Event event) {
if (event == null) {
return;
}
try {
event.validate();
final boolean queued = eventQueue.offer(event);
if (!queued) {
LOGGER.error("Unable to queue event because queue is full");
}
} catch (IllegalStateException e) {
LOGGER.error("Invalid event due to: " + e.getMessage(), e);
}
}
}

Просмотреть файл

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry.event;
import org.apache.commons.lang3.Validate;
import org.apache.nifi.registry.hook.Event;
import org.apache.nifi.registry.hook.EventField;
import org.apache.nifi.registry.hook.EventFieldName;
import org.apache.nifi.registry.hook.EventType;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
/**
* Standard implementation of Event.
*/
public class StandardEvent implements Event {
private final EventType eventType;
private final List<EventField> eventFields;
private StandardEvent(final Builder builder) {
this.eventType = builder.eventType;
this.eventFields = Collections.unmodifiableList(builder.eventFields == null
? Collections.emptyList() : new ArrayList<>(builder.eventFields));
Validate.notNull(this.eventType);
}
@Override
public EventType getEventType() {
return eventType;
}
@Override
public List<EventField> getFields() {
return eventFields;
}
@Override
public EventField getField(final EventFieldName fieldName) {
if (fieldName == null) {
return null;
}
return eventFields.stream().filter(e -> fieldName.equals(e.getName())).findFirst().orElse(null);
}
@Override
public void validate() throws IllegalStateException {
final int numProvidedFields = eventFields.size();
final int numRequiredFields = eventType.getFieldNames().size();
if (numProvidedFields != numRequiredFields) {
throw new IllegalStateException(numRequiredFields + " fields were required, but only " + numProvidedFields + " were provided");
}
for (int i=0; i < numRequiredFields; i++) {
final EventFieldName required = eventType.getFieldNames().get(i);
final EventFieldName provided = eventFields.get(i).getName();
if (!required.equals(provided)) {
throw new IllegalStateException("Expected " + required.name() + ", but found " + provided.name());
}
}
}
/**
* Builder for Events.
*/
public static class Builder {
private EventType eventType;
private List<EventField> eventFields = new ArrayList<>();
public Builder eventType(final EventType eventType) {
this.eventType = eventType;
return this;
}
public Builder addField(final EventFieldName name, final String value) {
this.eventFields.add(new StandardEventField(name, value));
return this;
}
public Builder addField(final EventField arg) {
if (arg != null) {
this.eventFields.add(arg);
}
return this;
}
public Builder addFields(final Collection<EventField> fields) {
if (fields != null) {
this.eventFields.addAll(fields);
}
return this;
}
public Builder clearFields() {
this.eventFields.clear();
return this;
}
public Event build() {
return new StandardEvent(this);
}
}
}

Просмотреть файл

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry.event;
import org.apache.commons.lang3.Validate;
import org.apache.nifi.registry.hook.EventField;
import org.apache.nifi.registry.hook.EventFieldName;
/**
* Standard implementation of EventField.
*/
public class StandardEventField implements EventField {
private final EventFieldName name;
private final String value;
public StandardEventField(final EventFieldName name, final String value) {
this.name = name;
this.value = value;
Validate.notNull(this.name);
Validate.notBlank(this.value);
}
@Override
public EventFieldName getName() {
return name;
}
@Override
public String getValue() {
return value;
}
}

Просмотреть файл

@ -17,12 +17,12 @@
package org.apache.nifi.registry.extension;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.registry.hook.EventHookProvider;
import org.apache.nifi.registry.security.authentication.IdentityProvider;
import org.apache.nifi.registry.security.authorization.AccessPolicyProvider;
import org.apache.nifi.registry.security.authorization.Authorizer;
import org.apache.nifi.registry.security.authorization.UserGroupProvider;
import org.apache.nifi.registry.flow.FlowPersistenceProvider;
import org.apache.nifi.registry.hook.FlowHookProvider;
import org.apache.nifi.registry.properties.NiFiRegistryProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -57,7 +57,7 @@ public class ExtensionManager {
classes.add(AccessPolicyProvider.class);
classes.add(Authorizer.class);
classes.add(IdentityProvider.class);
classes.add(FlowHookProvider.class);
classes.add(EventHookProvider.class);
EXTENSION_CLASSES = Collections.unmodifiableList(classes);
}

Просмотреть файл

@ -19,7 +19,7 @@ package org.apache.nifi.registry.provider;
import java.util.List;
import org.apache.nifi.registry.flow.FlowPersistenceProvider;
import org.apache.nifi.registry.hook.FlowHookProvider;
import org.apache.nifi.registry.hook.EventHookProvider;
/**
* A factory for obtaining the configured providers.
@ -41,6 +41,6 @@ public interface ProviderFactory {
/**
* @return the configured FlowHookProviders
*/
List<FlowHookProvider> getFlowHookProviders();
List<EventHookProvider> getEventHookProviders();
}

Просмотреть файл

@ -18,7 +18,7 @@ package org.apache.nifi.registry.provider;
import org.apache.nifi.registry.extension.ExtensionManager;
import org.apache.nifi.registry.flow.FlowPersistenceProvider;
import org.apache.nifi.registry.hook.FlowHookProvider;
import org.apache.nifi.registry.hook.EventHookProvider;
import org.apache.nifi.registry.properties.NiFiRegistryProperties;
import org.apache.nifi.registry.provider.generated.Property;
import org.apache.nifi.registry.provider.generated.Providers;
@ -74,7 +74,7 @@ public class StandardProviderFactory implements ProviderFactory {
private final AtomicReference<Providers> providersHolder = new AtomicReference<>(null);
private FlowPersistenceProvider flowPersistenceProvider;
private List<FlowHookProvider> flowHookProviders;
private List<EventHookProvider> eventHookProviders;
@Autowired
public StandardProviderFactory(final NiFiRegistryProperties properties, final ExtensionManager extensionManager) {
@ -156,52 +156,52 @@ public class StandardProviderFactory implements ProviderFactory {
@Bean
@Override
public List<FlowHookProvider> getFlowHookProviders() {
if (flowHookProviders == null) {
flowHookProviders = new ArrayList<FlowHookProvider>();
public List<EventHookProvider> getEventHookProviders() {
if (eventHookProviders == null) {
eventHookProviders = new ArrayList<>();
if (providersHolder.get() == null) {
throw new ProviderFactoryException("ProviderFactory must be initialized before obtaining a Provider");
}
final Providers providers = providersHolder.get();
final List<org.apache.nifi.registry.provider.generated.Provider> jaxbFlowHookProvider = providers.getFlowHookProvider();
final List<org.apache.nifi.registry.provider.generated.Provider> jaxbHookProvider = providers.getEventHookProvider();
if(jaxbFlowHookProvider == null || jaxbFlowHookProvider.isEmpty()) {
if(jaxbHookProvider == null || jaxbHookProvider.isEmpty()) {
// no hook provided
return flowHookProviders;
return eventHookProviders;
}
for (org.apache.nifi.registry.provider.generated.Provider flowHookProvider : jaxbFlowHookProvider) {
for (org.apache.nifi.registry.provider.generated.Provider hookProvider : jaxbHookProvider) {
final String flowHookProviderClassName = flowHookProvider.getClazz();
FlowHookProvider hook;
final String hookProviderClassName = hookProvider.getClazz();
EventHookProvider hook;
try {
final ClassLoader classLoader = extensionManager.getExtensionClassLoader(flowHookProviderClassName);
final ClassLoader classLoader = extensionManager.getExtensionClassLoader(hookProviderClassName);
if (classLoader == null) {
throw new IllegalStateException("Extension not found in any of the configured class loaders: " + flowHookProviderClassName);
throw new IllegalStateException("Extension not found in any of the configured class loaders: " + hookProviderClassName);
}
final Class<?> rawFlowHookProviderClass = Class.forName(flowHookProviderClassName, true, classLoader);
final Class<? extends FlowHookProvider> flowHookProviderClass = rawFlowHookProviderClass.asSubclass(FlowHookProvider.class);
final Class<?> rawHookProviderClass = Class.forName(hookProviderClassName, true, classLoader);
final Class<? extends EventHookProvider> hookProviderClass = rawHookProviderClass.asSubclass(EventHookProvider.class);
final Constructor constructor = flowHookProviderClass.getConstructor();
hook = (FlowHookProvider) constructor.newInstance();
final Constructor constructor = hookProviderClass.getConstructor();
hook = (EventHookProvider) constructor.newInstance();
LOGGER.info("Instantiated FlowHookProvider with class name {}", new Object[] {flowHookProviderClassName});
LOGGER.info("Instantiated EventHookProvider with class name {}", new Object[] {hookProviderClassName});
} catch (Exception e) {
throw new ProviderFactoryException("Error creating FlowHookProvider with class name: " + flowHookProviderClassName, e);
throw new ProviderFactoryException("Error creating EventHookProvider with class name: " + hookProviderClassName, e);
}
final ProviderConfigurationContext configurationContext = createConfigurationContext(flowHookProvider.getProperty());
final ProviderConfigurationContext configurationContext = createConfigurationContext(hookProvider.getProperty());
hook.onConfigured(configurationContext);
flowHookProviders.add(hook);
LOGGER.info("Configured FlowHookProvider with class name {}", new Object[] {flowHookProviderClassName});
eventHookProviders.add(hook);
LOGGER.info("Configured EventHookProvider with class name {}", new Object[] {hookProviderClassName});
}
}
return flowHookProviders;
return eventHookProviders;
}
private ProviderConfigurationContext createConfigurationContext(final List<Property> configProperties) {

Просмотреть файл

@ -35,7 +35,6 @@ public class StandardFlowSnapshotContext implements FlowSnapshotContext {
private final String comments;
private final String author;
private final long snapshotTimestamp;
private final String author;
private StandardFlowSnapshotContext(final Builder builder) {
this.bucketId = builder.bucketId;
@ -46,7 +45,6 @@ public class StandardFlowSnapshotContext implements FlowSnapshotContext {
this.comments = builder.comments;
this.author = builder.author;
this.snapshotTimestamp = builder.snapshotTimestamp;
this.author = builder.author;
Validate.notBlank(bucketId);
Validate.notBlank(bucketName);
@ -109,7 +107,6 @@ public class StandardFlowSnapshotContext implements FlowSnapshotContext {
private String comments;
private String author;
private long snapshotTimestamp;
private String author;
public Builder() {
@ -124,7 +121,6 @@ public class StandardFlowSnapshotContext implements FlowSnapshotContext {
comments(snapshotMetadata.getComments());
author(snapshotMetadata.getAuthor());
snapshotTimestamp(snapshotMetadata.getTimestamp());
author(snapshotMetadata.getAuthor());
}
public Builder bucketId(final String bucketId) {
@ -167,11 +163,6 @@ public class StandardFlowSnapshotContext implements FlowSnapshotContext {
return this;
}
public Builder author(final String author) {
this.author = author;
return this;
}
public StandardFlowSnapshotContext build() {
return new StandardFlowSnapshotContext(this);
}

Просмотреть файл

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry.provider.hook;
import org.apache.nifi.registry.hook.Event;
import org.apache.nifi.registry.hook.EventField;
import org.apache.nifi.registry.hook.EventHookException;
import org.apache.nifi.registry.hook.EventHookProvider;
import org.apache.nifi.registry.provider.ProviderConfigurationContext;
import org.apache.nifi.registry.provider.ProviderCreationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LoggingEventHookProvider implements EventHookProvider {
static final Logger LOGGER = LoggerFactory.getLogger(LoggingEventHookProvider.class);
@Override
public void onConfigured(final ProviderConfigurationContext configurationContext) throws ProviderCreationException {
// nothing to do
}
@Override
public void handle(final Event event) throws EventHookException {
final StringBuilder builder = new StringBuilder()
.append(event.getEventType())
.append(" [");
int count = 0;
for (final EventField argument : event.getFields()) {
if (count > 0) {
builder.append(", ");
}
builder.append(argument.getName()).append("=").append(argument.getValue());
count++;
}
builder.append("] ");
LOGGER.info(builder.toString());
}
}

Просмотреть файл

@ -23,10 +23,9 @@ import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.registry.flow.FlowSnapshotContext;
import org.apache.nifi.registry.hook.FlowHookEvent;
import org.apache.nifi.registry.hook.FlowHookException;
import org.apache.nifi.registry.hook.FlowHookProvider;
import org.apache.nifi.registry.hook.Event;
import org.apache.nifi.registry.hook.EventField;
import org.apache.nifi.registry.hook.EventHookProvider;
import org.apache.nifi.registry.provider.ProviderConfigurationContext;
import org.apache.nifi.registry.provider.ProviderCreationException;
import org.apache.nifi.registry.util.FileUtils;
@ -34,77 +33,24 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A FlowHookProvider that is used to execute a script before a flow snapshot version is committed.
* A EventHookProvider that is used to execute a script to handle the event.
*/
public class ScriptFlowHookProvider implements FlowHookProvider {
public class ScriptEventHookProvider implements EventHookProvider {
static final Logger LOGGER = LoggerFactory.getLogger(ScriptFlowHookProvider.class);
static final Logger LOGGER = LoggerFactory.getLogger(ScriptEventHookProvider.class);
static final String SCRIPT_PATH_PROP = "Script Path";
static final String SCRIPT_WORKDIR_PROP = "Working Directory";
private File scriptFile;
private File workDirFile;
@Override
public void postCreateBucket(String bucketId) throws FlowHookException {
this.executeScript(FlowHookEvent.CREATE_BUCKET, bucketId, null, null, null, null);
}
@Override
public void postCreateFlow(String bucketId, String flowId) throws FlowHookException {
this.executeScript(FlowHookEvent.CREATE_FLOW, bucketId, flowId, null, null, null);
}
@Override
public void postDeleteBucket(String bucketId) throws FlowHookException {
this.executeScript(FlowHookEvent.DELETE_BUCKET, bucketId, null, null, null, null);
}
@Override
public void postDeleteFlow(String bucketId, String flowId) throws FlowHookException {
this.executeScript(FlowHookEvent.DELETE_FLOW, bucketId, flowId, null, null, null);
}
@Override
public void postDeleteFlowVersion(String bucketId, String flowId, int version) throws FlowHookException {
this.executeScript(FlowHookEvent.DELETE_FLOW, bucketId, flowId, Integer.toString(version), null, null);
}
@Override
public void postUpdateBucket(String bucketId) throws FlowHookException {
this.executeScript(FlowHookEvent.UPDATE_BUCKET, bucketId, null, null, null, null);
}
@Override
public void postUpdateFlow(String bucketId, String flowId) throws FlowHookException {
this.executeScript(FlowHookEvent.UPDATE_FLOW, bucketId, flowId, null, null, null);
}
@Override
public void postCreateFlowVersion(final FlowSnapshotContext flowSnapshotContext) throws FlowHookException {
this.executeScript(FlowHookEvent.CREATE_VERSION, flowSnapshotContext.getBucketId(), flowSnapshotContext.getFlowId(),
Integer.toString(flowSnapshotContext.getVersion()), flowSnapshotContext.getComments(), flowSnapshotContext.getAuthor());
}
private void executeScript(final FlowHookEvent eventType, String bucketId, String flowId, String version, String comment, String author) {
public void handle(final Event event) {
List<String> command = new ArrayList<String>();
command.add(scriptFile.getAbsolutePath());
command.add(eventType.name());
command.add(bucketId);
command.add(event.getEventType().name());
if(flowId != null) {
command.add(flowId);
}
if(version != null) {
command.add(version);
}
if(comment != null) {
command.add(comment);
}
if(author != null) {
command.add(author);
for (EventField arg : event.getFields()) {
command.add(arg.getValue());
}
final String commandString = StringUtils.join(command, " ");
@ -143,7 +89,7 @@ public class ScriptFlowHookProvider implements FlowHookProvider {
scriptFile = new File(scripPath);
if(scriptFile.isFile() && scriptFile.canExecute()) {
LOGGER.info("Configured ScriptFlowHookProvider with script {}", new Object[] {scriptFile.getAbsolutePath()});
LOGGER.info("Configured ScriptEventHookProvider with script {}", new Object[] {scriptFile.getAbsolutePath()});
} else {
throw new ProviderCreationException("The script file " + scriptFile.getAbsolutePath() + " cannot be executed.");
}

Просмотреть файл

@ -42,7 +42,6 @@ import org.apache.nifi.registry.flow.diff.FlowComparison;
import org.apache.nifi.registry.flow.diff.FlowDifference;
import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow;
import org.apache.nifi.registry.flow.diff.StandardFlowComparator;
import org.apache.nifi.registry.hook.FlowHookProvider;
import org.apache.nifi.registry.provider.flow.StandardFlowSnapshotContext;
import org.apache.nifi.registry.serialization.Serializer;
import org.slf4j.Logger;
@ -85,7 +84,6 @@ public class RegistryService {
private final MetadataService metadataService;
private final FlowPersistenceProvider flowPersistenceProvider;
private final List<FlowHookProvider> flowHookProviders;
private final Serializer<VersionedProcessGroup> processGroupSerializer;
private final Validator validator;
@ -96,12 +94,10 @@ public class RegistryService {
@Autowired
public RegistryService(final MetadataService metadataService,
final FlowPersistenceProvider flowPersistenceProvider,
final List<FlowHookProvider> flowHookProviders,
final Serializer<VersionedProcessGroup> processGroupSerializer,
final Validator validator) {
this.metadataService = metadataService;
this.flowPersistenceProvider = flowPersistenceProvider;
this.flowHookProviders = flowHookProviders;
this.processGroupSerializer = processGroupSerializer;
this.validator = validator;
Validate.notNull(this.metadataService);
@ -138,17 +134,6 @@ public class RegistryService {
}
final BucketEntity createdBucket = metadataService.createBucket(DataModelMapper.map(bucket));
// call the post-event hook
for(FlowHookProvider flowHookProvider : flowHookProviders) {
try {
flowHookProvider.postCreateBucket(createdBucket.getId());
} catch (Exception e) {
// we don't want to throw anything here, hook are provided on best effort
LOGGER.error("Error while calling post-event hook", e);
}
}
return DataModelMapper.map(createdBucket);
} finally {
writeLock.unlock();
@ -240,17 +225,6 @@ public class RegistryService {
// perform the actual update
final BucketEntity updatedBucket = metadataService.updateBucket(existingBucketById);
// call the post-event hook
for(FlowHookProvider flowHookProvider : flowHookProviders) {
try {
flowHookProvider.postUpdateBucket(existingBucketById.getId());
} catch (Exception e) {
// we don't want to throw anything here, hook are provided on best effort
LOGGER.error("Error while calling post-event hook", e);
}
}
return DataModelMapper.map(updatedBucket);
} finally {
writeLock.unlock();
@ -279,16 +253,6 @@ public class RegistryService {
// now delete the bucket from the metadata provider, which deletes all flows referencing it
metadataService.deleteBucket(existingBucket);
// call the post-event hook
for(FlowHookProvider flowHookProvider : flowHookProviders) {
try {
flowHookProvider.postDeleteBucket(existingBucket.getId());
} catch (Exception e) {
// we don't want to throw anything here, hook are provided on best effort
LOGGER.error("Error while calling post-event hook", e);
}
}
return DataModelMapper.map(existingBucket);
} finally {
writeLock.unlock();
@ -392,17 +356,6 @@ public class RegistryService {
// persist the flow and return the created entity
final FlowEntity createdFlow = metadataService.createFlow(flowEntity);
// call the post-event hook
for(FlowHookProvider flowHookProvider : flowHookProviders) {
try {
flowHookProvider.postCreateFlow(existingBucket.getId(), createdFlow.getId());
} catch (Exception e) {
// we don't want to throw anything here, hook are provided on best effort
LOGGER.error("Error while calling post-event hook", e);
}
}
return DataModelMapper.map(existingBucket, createdFlow);
} finally {
writeLock.unlock();
@ -544,17 +497,6 @@ public class RegistryService {
// perform the actual update
final FlowEntity updatedFlow = metadataService.updateFlow(existingFlow);
// call the post-event hook
for(FlowHookProvider flowHookProvider : flowHookProviders) {
try {
flowHookProvider.postUpdateFlow(existingFlow.getBucketId(), existingFlow.getId());
} catch (Exception e) {
// we don't want to throw anything here, hook are provided on best effort
LOGGER.error("Error while calling post-event hook", e);
}
}
return DataModelMapper.map(existingBucket, updatedFlow);
} finally {
writeLock.unlock();
@ -595,16 +537,6 @@ public class RegistryService {
// now delete the flow from the metadata provider
metadataService.deleteFlow(existingFlow);
// call the post-event hook
for(FlowHookProvider flowHookProvider : flowHookProviders) {
try {
flowHookProvider.postDeleteFlow(existingFlow.getBucketId(), existingFlow.getId());
} catch (Exception e) {
// we don't want to throw anything here, hook are provided on best effort
LOGGER.error("Error while calling post-event hook", e);
}
}
return DataModelMapper.map(existingBucket, existingFlow);
} finally {
writeLock.unlock();
@ -698,16 +630,6 @@ public class RegistryService {
}
final VersionedFlow updatedVersionedFlow = DataModelMapper.map(existingBucket, updatedFlow);
// call the post-event hook
for(FlowHookProvider flowHookProvider : flowHookProviders) {
try {
flowHookProvider.postCreateFlowVersion(context);
} catch (Exception e) {
// we don't want to throw anything here, hook are provided on best effort
LOGGER.error("Error while calling post-event hook", e);
}
}
flowSnapshot.setBucket(bucket);
flowSnapshot.setFlow(updatedVersionedFlow);
return flowSnapshot;
@ -957,17 +879,6 @@ public class RegistryService {
// delete the snapshot itself
metadataService.deleteFlowSnapshot(snapshotEntity);
// call the post-event hook
for(FlowHookProvider flowHookProvider : flowHookProviders) {
try {
flowHookProvider.postDeleteFlowVersion(bucketIdentifier, flowIdentifier, version);
} catch (Exception e) {
// we don't want to throw anything here, hook are provided on best effort
LOGGER.error("Error while calling post-event hook", e);
}
}
return DataModelMapper.map(existingBucket, snapshotEntity);
} finally {
writeLock.unlock();

Просмотреть файл

@ -12,4 +12,5 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.registry.provider.hook.ScriptFlowHookProvider
org.apache.nifi.registry.provider.hook.ScriptEventHookProvider
org.apache.nifi.registry.provider.hook.LoggingEventHookProvider

Просмотреть файл

@ -43,7 +43,7 @@
<xs:complexType>
<xs:sequence>
<xs:element name="flowPersistenceProvider" type="Provider" minOccurs="1" maxOccurs="1" />
<xs:element name="flowHookProvider" type="Provider" minOccurs="0" maxOccurs="unbounded" />
<xs:element name="eventHookProvider" type="Provider" minOccurs="0" maxOccurs="unbounded" />
</xs:sequence>
</xs:complexType>
</xs:element>

Просмотреть файл

@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry.event;
import org.apache.nifi.registry.bucket.Bucket;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.hook.Event;
import org.apache.nifi.registry.hook.EventFieldName;
import org.apache.nifi.registry.hook.EventType;
import org.junit.Before;
import org.junit.Test;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
public class TestEventFactory {
private Bucket bucket;
private VersionedFlow versionedFlow;
private VersionedFlowSnapshot versionedFlowSnapshot;
@Before
public void setup() {
bucket = new Bucket();
bucket.setName("Bucket1");
bucket.setIdentifier(UUID.randomUUID().toString());
bucket.setCreatedTimestamp(System.currentTimeMillis());
versionedFlow = new VersionedFlow();
versionedFlow.setIdentifier(UUID.randomUUID().toString());
versionedFlow.setName("Flow 1");
versionedFlow.setBucketIdentifier(bucket.getIdentifier());
versionedFlow.setBucketName(bucket.getName());
VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata();
metadata.setAuthor("user1");
metadata.setComments("This is flow 1");
metadata.setVersion(1);
metadata.setBucketIdentifier(bucket.getIdentifier());
metadata.setFlowIdentifier(versionedFlow.getIdentifier());
versionedFlowSnapshot = new VersionedFlowSnapshot();
versionedFlowSnapshot.setSnapshotMetadata(metadata);
versionedFlowSnapshot.setFlowContents(new VersionedProcessGroup());
}
@Test
public void testBucketCreatedEvent() {
final Event event = EventFactory.bucketCreated(bucket);
event.validate();
assertEquals(EventType.CREATE_BUCKET, event.getEventType());
assertEquals(2, event.getFields().size());
assertEquals(bucket.getIdentifier(), event.getField(EventFieldName.BUCKET_ID).getValue());
assertEquals("unknown", event.getField(EventFieldName.USER).getValue());
}
@Test
public void testBucketUpdatedEvent() {
final Event event = EventFactory.bucketUpdated(bucket);
event.validate();
assertEquals(EventType.UPDATE_BUCKET, event.getEventType());
assertEquals(2, event.getFields().size());
assertEquals(bucket.getIdentifier(), event.getField(EventFieldName.BUCKET_ID).getValue());
assertEquals("unknown", event.getField(EventFieldName.USER).getValue());
}
@Test
public void testBucketDeletedEvent() {
final Event event = EventFactory.bucketDeleted(bucket);
event.validate();
assertEquals(EventType.DELETE_BUCKET, event.getEventType());
assertEquals(2, event.getFields().size());
assertEquals(bucket.getIdentifier(), event.getField(EventFieldName.BUCKET_ID).getValue());
assertEquals("unknown", event.getField(EventFieldName.USER).getValue());
}
@Test
public void testFlowCreated() {
final Event event = EventFactory.flowCreated(versionedFlow);
event.validate();
assertEquals(EventType.CREATE_FLOW, event.getEventType());
assertEquals(3, event.getFields().size());
assertEquals(bucket.getIdentifier(), event.getField(EventFieldName.BUCKET_ID).getValue());
assertEquals(versionedFlow.getIdentifier(), event.getField(EventFieldName.FLOW_ID).getValue());
assertEquals("unknown", event.getField(EventFieldName.USER).getValue());
}
@Test
public void testFlowUpdated() {
final Event event = EventFactory.flowUpdated(versionedFlow);
event.validate();
assertEquals(EventType.UPDATE_FLOW, event.getEventType());
assertEquals(3, event.getFields().size());
assertEquals(bucket.getIdentifier(), event.getField(EventFieldName.BUCKET_ID).getValue());
assertEquals(versionedFlow.getIdentifier(), event.getField(EventFieldName.FLOW_ID).getValue());
assertEquals("unknown", event.getField(EventFieldName.USER).getValue());
}
@Test
public void testFlowDeleted() {
final Event event = EventFactory.flowDeleted(versionedFlow);
event.validate();
assertEquals(EventType.DELETE_FLOW, event.getEventType());
assertEquals(3, event.getFields().size());
assertEquals(bucket.getIdentifier(), event.getField(EventFieldName.BUCKET_ID).getValue());
assertEquals(versionedFlow.getIdentifier(), event.getField(EventFieldName.FLOW_ID).getValue());
assertEquals("unknown", event.getField(EventFieldName.USER).getValue());
}
@Test
public void testFlowVersionedCreated() {
final Event event = EventFactory.flowVersionCreated(versionedFlowSnapshot);
event.validate();
assertEquals(EventType.CREATE_FLOW_VERSION, event.getEventType());
assertEquals(5, event.getFields().size());
assertEquals(bucket.getIdentifier(), event.getField(EventFieldName.BUCKET_ID).getValue());
assertEquals(versionedFlow.getIdentifier(), event.getField(EventFieldName.FLOW_ID).getValue());
assertEquals(String.valueOf(versionedFlowSnapshot.getSnapshotMetadata().getVersion()),
event.getField(EventFieldName.VERSION).getValue());
assertEquals(versionedFlowSnapshot.getSnapshotMetadata().getAuthor(),
event.getField(EventFieldName.USER).getValue());
assertEquals(versionedFlowSnapshot.getSnapshotMetadata().getComments(),
event.getField(EventFieldName.COMMENT).getValue());
}
}

Просмотреть файл

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry.event;
import org.apache.nifi.registry.bucket.Bucket;
import org.apache.nifi.registry.hook.Event;
import org.apache.nifi.registry.hook.EventHookException;
import org.apache.nifi.registry.hook.EventHookProvider;
import org.apache.nifi.registry.provider.ProviderConfigurationContext;
import org.apache.nifi.registry.provider.ProviderCreationException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
public class TestEventService {
private CapturingEventHook eventHook;
private EventService eventService;
@Before
public void setup() {
eventHook = new CapturingEventHook();
eventService = new EventService(Collections.singletonList(eventHook));
eventService.postConstruct();
}
@After
public void teardown() throws Exception {
eventService.destroy();
}
@Test
public void testPublishConsume() throws InterruptedException {
final Bucket bucket = new Bucket();
bucket.setIdentifier(UUID.randomUUID().toString());
final Event bucketCreatedEvent = EventFactory.bucketCreated(bucket);
eventService.publish(bucketCreatedEvent);
final Event bucketDeletedEvent = EventFactory.bucketDeleted(bucket);
eventService.publish(bucketDeletedEvent);
Thread.sleep(1000);
final List<Event> events = eventHook.getEvents();
Assert.assertEquals(2, events.size());
final Event firstEvent = events.get(0);
Assert.assertEquals(bucketCreatedEvent.getEventType(), firstEvent.getEventType());
final Event secondEvent = events.get(1);
Assert.assertEquals(bucketDeletedEvent.getEventType(), secondEvent.getEventType());
}
/**
* Simple implementation of EventHookProvider that captures event for later verification.
*/
private class CapturingEventHook implements EventHookProvider {
private List<Event> events = new ArrayList<>();
@Override
public void onConfigured(ProviderConfigurationContext configurationContext) throws ProviderCreationException {
}
@Override
public void handle(Event event) throws EventHookException {
events.add(event);
}
public List<Event> getEvents() {
return events;
}
}
}

Просмотреть файл

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry.event;
import org.apache.nifi.registry.hook.Event;
import org.apache.nifi.registry.hook.EventField;
import org.apache.nifi.registry.hook.EventFieldName;
import org.apache.nifi.registry.hook.EventType;
import org.junit.Assert;
import org.junit.Test;
public class TestStandardEvent {
@Test(expected = IllegalStateException.class)
public void testInvalidEvent() {
final Event event = new StandardEvent.Builder()
.eventType(EventType.CREATE_BUCKET)
.build();
event.validate();
}
@Test
public void testGetFieldWhenDoesNotExist() {
final Event event = new StandardEvent.Builder()
.eventType(EventType.CREATE_BUCKET)
.build();
final EventField field = event.getField(EventFieldName.BUCKET_ID);
Assert.assertNull(field);
}
}

Просмотреть файл

@ -27,7 +27,7 @@ import org.apache.nifi.registry.provider.StandardProviderFactory;
import org.junit.Test;
import org.mockito.Mockito;
public class TestScriptFlowHookProvider {
public class TestScriptEventHookProvider {
@Test(expected = ProviderCreationException.class)
public void testBadScriptProvider() {
@ -39,7 +39,7 @@ public class TestScriptFlowHookProvider {
final ProviderFactory providerFactory = new StandardProviderFactory(props, extensionManager);
providerFactory.initialize();
providerFactory.getFlowHookProviders();
providerFactory.getEventHookProviders();
}
}

Просмотреть файл

@ -30,7 +30,6 @@ import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.VersionedProcessor;
import org.apache.nifi.registry.hook.FlowHookProvider;
import org.apache.nifi.registry.serialization.Serializer;
import org.apache.nifi.registry.serialization.VersionedProcessGroupSerializer;
import org.junit.Assert;
@ -73,7 +72,6 @@ public class TestRegistryService {
private MetadataService metadataService;
private FlowPersistenceProvider flowPersistenceProvider;
private FlowHookProvider flowHookProvider;
private Serializer<VersionedProcessGroup> snapshotSerializer;
private Validator validator;
@ -83,13 +81,12 @@ public class TestRegistryService {
public void setup() {
metadataService = mock(MetadataService.class);
flowPersistenceProvider = mock(FlowPersistenceProvider.class);
// flowHookProvider = mock(FlowHookProvider.class);
snapshotSerializer = mock(VersionedProcessGroupSerializer.class);
final ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory();
validator = validatorFactory.getValidator();
registryService = new RegistryService(metadataService, flowPersistenceProvider, new ArrayList<FlowHookProvider>(), snapshotSerializer, validator);
registryService = new RegistryService(metadataService, flowPersistenceProvider, snapshotSerializer, validator);
}
// ---------------------- Test Bucket methods ---------------------------------------------

Просмотреть файл

@ -21,10 +21,10 @@
<property name="Flow Property 2">flow bar</property>
</flowPersistenceProvider>
<flowHookProvider>
<class>org.apache.nifi.registry.provider.hook.ScriptFlowHookProvider</class>
<eventHookProvider>
<class>org.apache.nifi.registry.provider.hook.ScriptEventHookProvider</class>
<property name="Script Path"></property>
<property name="Working Directory"></property>
</flowHookProvider>
</eventHookProvider>
</providers>

Просмотреть файл

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry.hook;
import java.util.List;
/**
* An event that will be passed to EventHookProviders.
*/
public interface Event {
/**
* @return the type of the event
*/
EventType getEventType();
/**
* @return the fields of the event in the order they were added to the event
*/
List<EventField> getFields();
/**
* @param fieldName the name of the field to return
* @return the EventField with the given name, or null if it does not exist
*/
EventField getField(EventFieldName fieldName);
/**
* Will be called before publishing the event to ensure the event contains the required
* fields for the given event type in the order specified by the type.
*
* @throws IllegalStateException if the event does not contain the required fields
*/
void validate() throws IllegalStateException;
}

Просмотреть файл

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry.hook;
/**
* A field for an event.
*/
public interface EventField {
/**
* @return the name of the field
*/
EventFieldName getName();
/**
* @return the value of the field
*/
String getValue();
}

Просмотреть файл

@ -14,18 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry.hook;
public enum FlowHookEvent {
CREATE_BUCKET,
CREATE_FLOW,
CREATE_VERSION,
/**
* Enumeration of possible field names for an EventField.
*/
public enum EventFieldName {
UPDATE_BUCKET,
UPDATE_FLOW,
BUCKET_ID,
FLOW_ID,
VERSION,
USER,
COMMENT;
DELETE_BUCKET,
DELETE_FLOW,
DELETE_VERSION;
}

Просмотреть файл

@ -17,15 +17,15 @@
package org.apache.nifi.registry.hook;
/**
* An Exception for errors encountered when a FlowHookProvider executes an action before/after a commit.
* An Exception for errors encountered when a EventHookProvider executes an action before/after a commit.
*/
public class FlowHookException extends RuntimeException {
public class EventHookException extends RuntimeException {
public FlowHookException(String message) {
public EventHookException(String message) {
super(message);
}
public FlowHookException(String message, Throwable cause) {
public EventHookException(String message, Throwable cause) {
super(message, cause);
}
}

Просмотреть файл

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry.hook;
import org.apache.nifi.registry.provider.Provider;
/**
* An extension point that will be passed events produced by actions take in the registry.
*
* The list of event types can be found in {@link org.apache.nifi.registry.hook.EventType}.
*
* NOTE: Although this interface is intended to be an extension point, it is not yet considered stable and thus may
* change across releases until the registry matures.
*/
public interface EventHookProvider extends Provider {
/**
* Handles the given event.
*
* @param event the event to handle
* @throws EventHookException if an error occurs handling the event
*/
void handle(Event event) throws EventHookException;
}

Просмотреть файл

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry.hook;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
* Enumeration of possible EventTypes with the expected fields for each event.
*
* Producers of events must produce events with the fields in the same order specified here.
*/
public enum EventType {
CREATE_BUCKET(
EventFieldName.BUCKET_ID,
EventFieldName.USER),
CREATE_FLOW(
EventFieldName.BUCKET_ID,
EventFieldName.FLOW_ID,
EventFieldName.USER),
CREATE_FLOW_VERSION(
EventFieldName.BUCKET_ID,
EventFieldName.FLOW_ID,
EventFieldName.VERSION,
EventFieldName.USER,
EventFieldName.COMMENT),
UPDATE_BUCKET(
EventFieldName.BUCKET_ID,
EventFieldName.USER),
UPDATE_FLOW(
EventFieldName.BUCKET_ID,
EventFieldName.FLOW_ID,
EventFieldName.USER),
DELETE_BUCKET(
EventFieldName.BUCKET_ID,
EventFieldName.USER),
DELETE_FLOW(
EventFieldName.BUCKET_ID,
EventFieldName.FLOW_ID,
EventFieldName.USER);
private List<EventFieldName> fieldNames;
EventType(EventFieldName... fieldNames) {
this.fieldNames = Collections.unmodifiableList(Arrays.asList(fieldNames));
}
public List<EventFieldName> getFieldNames() {
return this.fieldNames;
}
}

Просмотреть файл

@ -1,81 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry.hook;
import org.apache.nifi.registry.flow.FlowSnapshotContext;
import org.apache.nifi.registry.provider.Provider;
/**
* A service that defines post event action hook
*
* NOTE: Although this interface is intended to be an extension point, it is not yet considered stable and thus may
* change across releases until the registry matures.
*/
public interface FlowHookProvider extends Provider {
/**
* @param bucketId
* @throws FlowHookException
*/
default void postCreateBucket(String bucketId) throws FlowHookException { }
/**
* @param bucketId
* @param flowId
* @throws FlowHookException
*/
default void postCreateFlow(String bucketId, String flowId) throws FlowHookException { }
/**
* @param flowSnapshotContext
* @throws FlowHookException
*/
default void postCreateFlowVersion(FlowSnapshotContext flowSnapshotContext) throws FlowHookException { }
/**
* @param bucketId
* @throws FlowHookException
*/
default void postDeleteBucket(String bucketId) throws FlowHookException { }
/**
* @param bucketId
* @param flowId
* @throws FlowHookException
*/
default void postDeleteFlow(String bucketId, String flowId) throws FlowHookException { }
/**
* @param flowSnapshotContext
* @throws FlowHookException
*/
default void postDeleteFlowVersion(String bucketIdentifier, String flowIdentifier, int version) throws FlowHookException { }
/**
* @param bucketId
* @throws FlowHookException
*/
default void postUpdateBucket(String bucketId) throws FlowHookException { }
/**
* @param bucketId
* @param flowId
* @throws FlowHookException
*/
default void postUpdateFlow(String bucketId, String flowId) throws FlowHookException { }
}

Просмотреть файл

@ -58,6 +58,24 @@
</encoder>
</appender>
<appender name="EVENTS_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${org.apache.nifi.registry.bootstrap.config.log.dir}/nifi-registry-event.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!--
For daily rollover, use 'user_%d.log'.
For hourly rollover, use 'user_%d{yyyy-MM-dd_HH}.log'.
To GZIP rolled files, replace '.log' with '.log.gz'.
To ZIP rolled files, replace '.log' with '.log.zip'.
-->
<fileNamePattern>${org.apache.nifi.registry.bootstrap.config.log.dir}/nifi-registry-event_%d.log</fileNamePattern>
<!-- keep 5 log files worth of history -->
<maxHistory>5</maxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%date ## %msg%n</pattern>
</encoder>
</appender>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%date %level [%thread] %logger{40} %msg%n</pattern>
@ -94,6 +112,11 @@
<appender-ref ref="BOOTSTRAP_FILE" />
</logger>
<!-- This will log all events to a separate file when the LoggingEventHookProvider is enabled in providers.xml -->
<logger name="org.apache.nifi.registry.provider.hook.LoggingEventHookProvider" level="INFO" additivity="false">
<appender-ref ref="EVENTS_FILE" />
</logger>
<root level="INFO">
<appender-ref ref="APP_FILE"/>
</root>

Просмотреть файл

@ -31,11 +31,18 @@
-->
<!--
<flowHookProvider>
<class>org.apache.nifi.registry.provider.hook.ScriptFlowHookProvider</class>
<eventHookProvider>
<class>org.apache.nifi.registry.provider.hook.ScriptEventHookProvider</class>
<property name="Script Path"></property>
<property name="Working Directory"></property>
</flowHookProvider>
</eventHookProvider>
-->
<!-- This will log all events to a separate file specified by the EVENT_APPENDER in logback.xml -->
<!--
<eventHookProvider>
<class>org.apache.nifi.registry.provider.hook.LoggingEventHookProvider</class>
</eventHookProvider>
-->
</providers>

Просмотреть файл

@ -27,6 +27,7 @@ import io.swagger.annotations.ExtensionProperty;
import org.apache.nifi.registry.authorization.AccessPolicy;
import org.apache.nifi.registry.authorization.AccessPolicySummary;
import org.apache.nifi.registry.authorization.Resource;
import org.apache.nifi.registry.event.EventService;
import org.apache.nifi.registry.exception.ResourceNotFoundException;
import org.apache.nifi.registry.security.authorization.Authorizer;
import org.apache.nifi.registry.security.authorization.AuthorizerCapabilityDetection;
@ -73,8 +74,9 @@ public class AccessPolicyResource extends AuthorizableApplicationResource {
@Autowired
public AccessPolicyResource(
Authorizer authorizer,
AuthorizationService authorizationService) {
super(authorizationService);
AuthorizationService authorizationService,
EventService eventService) {
super(authorizationService, eventService);
this.authorizer = authorizer;
}

Просмотреть файл

@ -24,6 +24,7 @@ import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.registry.authorization.CurrentUser;
import org.apache.nifi.registry.event.EventService;
import org.apache.nifi.registry.exception.AdministrationException;
import org.apache.nifi.registry.properties.NiFiRegistryProperties;
import org.apache.nifi.registry.security.authentication.AuthenticationRequest;
@ -86,7 +87,9 @@ public class AccessResource extends ApplicationResource {
JwtService jwtService,
X509IdentityProvider x509IdentityProvider,
@Nullable KerberosSpnegoIdentityProvider kerberosSpnegoIdentityProvider,
@Nullable IdentityProvider identityProvider) {
@Nullable IdentityProvider identityProvider,
EventService eventService) {
super(eventService);
this.properties = properties;
this.jwtService = jwtService;
this.x509IdentityProvider = x509IdentityProvider;

Просмотреть файл

@ -17,6 +17,9 @@
package org.apache.nifi.registry.web.api;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.apache.nifi.registry.event.EventService;
import org.apache.nifi.registry.hook.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -52,6 +55,17 @@ public class ApplicationResource {
@Context
private UriInfo uriInfo;
private final EventService eventService;
public ApplicationResource(final EventService eventService) {
this.eventService = eventService;
Validate.notNull(this.eventService);
}
protected void publish(final Event event) {
eventService.publish(event);
}
protected String generateResourceUri(final String... path) {
final UriBuilder uriBuilder = uriInfo.getBaseUriBuilder();
uriBuilder.segment(path);

Просмотреть файл

@ -18,6 +18,7 @@ package org.apache.nifi.registry.web.api;
import org.apache.nifi.registry.authorization.Resource;
import org.apache.nifi.registry.bucket.BucketItem;
import org.apache.nifi.registry.event.EventService;
import org.apache.nifi.registry.security.authorization.AuthorizableLookup;
import org.apache.nifi.registry.security.authorization.RequestAction;
import org.apache.nifi.registry.security.authorization.resource.Authorizable;
@ -38,7 +39,9 @@ public class AuthorizableApplicationResource extends ApplicationResource {
protected final AuthorizableLookup authorizableLookup;
protected AuthorizableApplicationResource(
AuthorizationService authorizationService) {
AuthorizationService authorizationService,
EventService eventService) {
super(eventService);
this.authorizationService = authorizationService;
this.authorizableLookup = authorizationService.getAuthorizableLookup();
}

Просмотреть файл

@ -27,6 +27,8 @@ import io.swagger.annotations.ExtensionProperty;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.registry.bucket.BucketItem;
import org.apache.nifi.registry.diff.VersionedFlowDifference;
import org.apache.nifi.registry.event.EventFactory;
import org.apache.nifi.registry.event.EventService;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
@ -76,8 +78,9 @@ public class BucketFlowResource extends AuthorizableApplicationResource {
final RegistryService registryService,
final LinkService linkService,
final PermissionsService permissionsService,
final AuthorizationService authorizationService) {
super(authorizationService);
final AuthorizationService authorizationService,
final EventService eventService) {
super(authorizationService, eventService);
this.registryService = registryService;
this.linkService = linkService;
this.permissionsService =permissionsService;
@ -111,7 +114,10 @@ public class BucketFlowResource extends AuthorizableApplicationResource {
authorizeBucketAccess(RequestAction.WRITE, bucketId);
verifyPathParamsMatchBody(bucketId, flow);
final VersionedFlow createdFlow = registryService.createFlow(bucketId, flow);
publish(EventFactory.flowCreated(createdFlow));
permissionsService.populateItemPermissions(createdFlow);
linkService.populateFlowLinks(createdFlow);
return Response.status(Response.Status.OK).entity(createdFlow).build();
@ -222,6 +228,7 @@ public class BucketFlowResource extends AuthorizableApplicationResource {
setBucketItemMetadataIfMissing(bucketId, flowId, flow);
final VersionedFlow updatedFlow = registryService.updateFlow(flow);
publish(EventFactory.flowUpdated(updatedFlow));
permissionsService.populateItemPermissions(updatedFlow);
linkService.populateFlowLinks(updatedFlow);
@ -256,6 +263,7 @@ public class BucketFlowResource extends AuthorizableApplicationResource {
authorizeBucketAccess(RequestAction.DELETE, bucketId);
final VersionedFlow deletedFlow = registryService.deleteFlow(bucketId, flowId);
publish(EventFactory.flowDeleted(deletedFlow));
return Response.status(Response.Status.OK).entity(deletedFlow).build();
}
@ -300,6 +308,8 @@ public class BucketFlowResource extends AuthorizableApplicationResource {
snapshot.getSnapshotMetadata().setAuthor(userIdentity);
final VersionedFlowSnapshot createdSnapshot = registryService.createFlowSnapshot(snapshot);
publish(EventFactory.flowVersionCreated(createdSnapshot));
if (createdSnapshot.getSnapshotMetadata() != null) {
linkService.populateSnapshotLinks(createdSnapshot.getSnapshotMetadata());
}

Просмотреть файл

@ -27,6 +27,8 @@ import io.swagger.annotations.ExtensionProperty;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.registry.bucket.Bucket;
import org.apache.nifi.registry.bucket.BucketItem;
import org.apache.nifi.registry.event.EventFactory;
import org.apache.nifi.registry.event.EventService;
import org.apache.nifi.registry.field.Fields;
import org.apache.nifi.registry.security.authorization.RequestAction;
import org.apache.nifi.registry.security.authorization.exception.AccessDeniedException;
@ -83,8 +85,9 @@ public class BucketResource extends AuthorizableApplicationResource {
final RegistryService registryService,
final LinkService linkService,
final PermissionsService permissionsService,
final AuthorizationService authorizationService) {
super(authorizationService);
final AuthorizationService authorizationService,
final EventService eventService) {
super(authorizationService, eventService);
this.registryService = registryService;
this.linkService = linkService;
this.permissionsService = permissionsService;
@ -110,7 +113,10 @@ public class BucketResource extends AuthorizableApplicationResource {
@ApiParam(value = "The bucket to create", required = true)
final Bucket bucket) {
authorizeAccess(RequestAction.WRITE);
final Bucket createdBucket = registryService.createBucket(bucket);
publish(EventFactory.bucketCreated(createdBucket));
permissionsService.populateBucketPermissions(createdBucket);
linkService.populateBucketLinks(createdBucket);
return Response.status(Response.Status.OK).entity(createdBucket).build();
@ -224,6 +230,8 @@ public class BucketResource extends AuthorizableApplicationResource {
authorizeBucketAccess(RequestAction.WRITE, bucketId);
final Bucket updatedBucket = registryService.updateBucket(bucket);
publish(EventFactory.bucketUpdated(updatedBucket));
permissionsService.populateBucketPermissions(updatedBucket);
linkService.populateBucketLinks(updatedBucket);
return Response.status(Response.Status.OK).entity(updatedBucket).build();
@ -256,7 +264,10 @@ public class BucketResource extends AuthorizableApplicationResource {
throw new BadRequestException("Bucket id cannot be blank");
}
authorizeBucketAccess(RequestAction.DELETE, bucketId);
final Bucket deletedBucket = registryService.deleteBucket(bucketId);
publish(EventFactory.bucketDeleted(deletedBucket));
return Response.status(Response.Status.OK).entity(deletedBucket).build();
}

Просмотреть файл

@ -25,6 +25,7 @@ import io.swagger.annotations.Authorization;
import io.swagger.annotations.Extension;
import io.swagger.annotations.ExtensionProperty;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.registry.event.EventService;
import org.apache.nifi.registry.field.Fields;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
@ -65,8 +66,9 @@ public class FlowResource extends AuthorizableApplicationResource {
public FlowResource(final RegistryService registryService,
final LinkService linkService,
final PermissionsService permissionsService,
final AuthorizationService authorizationService) {
super(authorizationService);
final AuthorizationService authorizationService,
final EventService eventService) {
super(authorizationService, eventService);
this.registryService = registryService;
this.linkService = linkService;
this.permissionsService = permissionsService;

Просмотреть файл

@ -25,6 +25,7 @@ import io.swagger.annotations.Authorization;
import io.swagger.annotations.Extension;
import io.swagger.annotations.ExtensionProperty;
import org.apache.nifi.registry.bucket.BucketItem;
import org.apache.nifi.registry.event.EventService;
import org.apache.nifi.registry.field.Fields;
import org.apache.nifi.registry.security.authorization.RequestAction;
import org.apache.nifi.registry.service.AuthorizationService;
@ -73,8 +74,9 @@ public class ItemResource extends AuthorizableApplicationResource {
final RegistryService registryService,
final LinkService linkService,
final PermissionsService permissionsService,
final AuthorizationService authorizationService) {
super(authorizationService);
final AuthorizationService authorizationService,
final EventService eventService) {
super(authorizationService, eventService);
this.registryService = registryService;
this.linkService = linkService;
this.permissionsService = permissionsService;

Просмотреть файл

@ -27,6 +27,7 @@ import io.swagger.annotations.ExtensionProperty;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.registry.authorization.User;
import org.apache.nifi.registry.authorization.UserGroup;
import org.apache.nifi.registry.event.EventService;
import org.apache.nifi.registry.exception.ResourceNotFoundException;
import org.apache.nifi.registry.security.authorization.Authorizer;
import org.apache.nifi.registry.security.authorization.AuthorizerCapabilityDetection;
@ -70,8 +71,8 @@ public class TenantResource extends AuthorizableApplicationResource {
private Authorizer authorizer;
@Autowired
public TenantResource(AuthorizationService authorizationService) {
super(authorizationService);
public TenantResource(AuthorizationService authorizationService, EventService eventService) {
super(authorizationService, eventService);
authorizer = authorizationService.getAuthorizer();
}

Просмотреть файл

@ -66,7 +66,7 @@ The following binary components are provided under the Apache Software License v
(ASLv2) Spring Security
The following NOTICE information applies:
Spring Framework 5.0.0.RELEASE
Spring Framework 5.0.5.RELEASE
Copyright (c) 2002-2017 Pivotal, Inc.
This product includes software developed by Spring Security

Просмотреть файл

@ -110,8 +110,8 @@
<jax.rs.api.version>2.1</jax.rs.api.version>
<jersey.version>2.26</jersey.version>
<jackson.version>2.9.2</jackson.version>
<spring.boot.version>2.0.0.M7</spring.boot.version>
<spring.security.version>5.0.0.RELEASE</spring.security.version>
<spring.boot.version>2.0.2.RELEASE</spring.boot.version>
<spring.security.version>5.0.5.RELEASE</spring.security.version>
<flyway.version>4.2.0</flyway.version>
</properties>