* Adding Kafka Trigger

* Add methods for windows support

* remove CustomBindings
This commit is contained in:
Tsuyoshi Ushio 2020-07-06 19:13:18 -07:00 коммит произвёл GitHub
Родитель 3c4e141c57
Коммит b12f2d4f70
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
5 изменённых файлов: 272 добавлений и 1 удалений

5
.gitignore поставляемый
Просмотреть файл

@ -35,4 +35,7 @@ hs_err_pid*
/azure-functions-java-worker/
#IDE
*.iml
*.iml
#OSX
.DS_Store

Просмотреть файл

@ -0,0 +1,25 @@
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for
* license information.
*/
package com.microsoft.azure.functions;
/**
* Defines the broker authentication modes
*/
public enum BrokerAuthenticationMode {
NOTSET(-1),
GSSAPI(0),
PLAIN(1),
SCRAMSHA256(2),
SCRAMSHA512(3);
private final int value;
BrokerAuthenticationMode(final int value) {
this.value = value;
}
public int getValue() { return value; }
}

Просмотреть файл

@ -0,0 +1,19 @@
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for
* license information.
*/
package com.microsoft.azure.functions;
public enum BrokerProtocol {
NOTSET(-1),
PLAINTEXT(0),
SSL(1),
SASLPLAINTEXT(2),
SASLSSL(3);
private int value;
BrokerProtocol(final int value) {
this.value = value;
}
}

Просмотреть файл

@ -0,0 +1,115 @@
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for
* license information.
*/
package com.microsoft.azure.functions.annotation;
import com.microsoft.azure.functions.BrokerAuthenticationMode;
import com.microsoft.azure.functions.BrokerProtocol;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* <p>Annotation for Kafka output bindings</p>
*/
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
public @interface KafkaOutput {
/**
* Gets the Topic.
* @return
*/
String topic();
/**
* Gets or sets the BrokerList.
*/
String brokerList();
/**
* Gets or sets the Maximum transmit message size. Default: 1MB
*/
int maxMessageBytes() default 1000012; // Follow the kafka spec https://kafka.apache.org/documentation/
/**
* Maximum number of messages batched in one MessageSet. default: 10000
*/
int batchSize() default 10000;
/**
* When set to `true`, the producer will ensure that messages are successfully produced exactly once and in the original produce order. default: false
*/
boolean enableIdempotence() default false;
/**
* Local message timeout. This value is only enforced locally and limits the time a produced message waits for successful delivery. A time of 0 is infinite. This is the maximum time used to deliver a message (including retries). Delivery error occurs when either the retry count or the message timeout are exceeded. default: 300000
*/
int messageTimeoutMs() default 300000;
/**
* The ack timeout of the producer request in milliseconds. default: 5000
*/
int requestTimeoutMs() default 5000;
/**
* How many times to retry sending a failing Message. **Note:** default: 2
* Retrying may cause reordering unless EnableIdempotence is set to true.
* @see #enableIdempotence()
*/
int maxRetries() default 2;
/**
* SASL mechanism to use for authentication.
* Default: PLAIN
*/
BrokerAuthenticationMode authenticationMode() default BrokerAuthenticationMode.NOTSET; // TODO double check if it is OK
/**
* SASL username with the PLAIN and SASL-SCRAM-.. mechanisms
* Default: ""
*/
String username() default "";
/**
* SASL password with the PLAIN and SASL-SCRAM-.. mechanisms
* Default is plaintext
*
* security.protocol in librdkafka
*/
String password() default "";
/**
* Gets or sets the security protocol used to communicate with brokers
* default is PLAINTEXT
*/
BrokerProtocol protocol() default BrokerProtocol.NOTSET;
/**
* Path to client's private key (PEM) used for authentication.
* Default ""
* ssl.key.location in librdkafka
*/
String sslKeyLocation() default "";
/**
* Path to CA certificate file for verifying the broker's certificate.
* ssl.ca.location in librdkafka
*/
String sslCaLocation() default "";
/**
* Path to client's certificate.
* ssl.certificate.location in librdkafka
*/
String sslCertificateLocation() default "";
/**
* Password for client's certificate.
* ssl.key.password in librdkafka
*/
String sslKeyPassword() default "";
}

Просмотреть файл

@ -0,0 +1,109 @@
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for
* license information.
*/
package com.microsoft.azure.functions.annotation;
import com.microsoft.azure.functions.BrokerAuthenticationMode;
import com.microsoft.azure.functions.BrokerProtocol;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.ElementType;
/**
* <p>Annotation for KafkaTrigger bindings</p>
*/
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
public @interface KafkaTrigger {
/**
* Gets the Topic.
*/
String topic();
/**
* Gets or sets the BrokerList.
*/
String brokerList();
/**
* Gets or sets the EventHub connection string when using KafkaOutput protocol header feature of Azure EventHubs.
*/
String eventHubConnectionString() default "";
/**
* Cardinality of the trigger input.
* Choose 'One' if the input is a single message or 'Many' if the input is an array of messages.
* If you choose 'Many', please set a dataType.
* Default: 'One'
*/
Cardinality cardinality() default Cardinality.ONE;
/**
* DataType for the Cardinality settings. If you set the cardinality as Cardinality.MANY, Azure Functions Host will deserialize
* the kafka events as an array of this type.
* Allowed values: string, binary, stream
* Default: ""
*/
String dataType() default "";
/**
* Gets or sets the consumer group.
*/
String consumerGroup();
/**
* SASL mechanism to use for authentication.
* Allowed values: Gssapi, Plain, ScramSha256, ScramSha512
* Default: PLAIN
*/
BrokerAuthenticationMode authenticationMode() default BrokerAuthenticationMode.NOTSET;
/**
* SASL username with the PLAIN and SASL-SCRAM-.. mechanisms
* Default: ""
*/
String username() default "";
/**
* SASL password with the PLAIN and SASL-SCRAM-.. mechanisms
* Default: ""
*
* security.protocol in librdkafka
*/
String password() default "";
/**
* Gets or sets the security protocol used to communicate with brokers
* default is PLAINTEXT
*/
BrokerProtocol protocol() default BrokerProtocol.NOTSET;
/**
* Path to client's private key (PEM) used for authentication.
* Default ""
* ssl.key.location in librdkafka
*/
String sslKeyLocation() default "";
/**
* Path to CA certificate file for verifying the broker's certificate.
* ssl.ca.location in librdkafka
*/
String sslCaLocation() default "";
/**
* Path to client's certificate.
* ssl.certificate.location in librdkafka
*/
String sslCertificateLocation() default "";
/**
* Password for client's certificate.
* ssl.key.password in librdkafka
*/
String sslKeyPassword() default "";
}