KAFKA-3598: Improve JavaDoc of public API

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Michael G. Noll, Guozhang Wang

Closes #1250 from mjsax/JavaDoc-publicAPI
Matthias J. Sax authored 2016-04-29 08:49:16 -07:00; committed by Guozhang Wang
Parent: 68433dcfdc
Commit: 4ab4e4af81
43 changed files with 383 additions and 163 deletions

@@ -168,10 +168,10 @@ public class PageViewTypedDemo {
public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) {
WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
wViewByRegion.windowStart = key.window().start();
wViewByRegion.region = key.value();
wViewByRegion.region = key.key();
RegionCount rCount = new RegionCount();
rCount.region = key.value();
rCount.region = key.key();
rCount.count = value;
return new KeyValue<>(wViewByRegion, rCount);

@@ -107,7 +107,7 @@ public class PageViewUntypedDemo {
public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) {
ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
keyNode.put("window-start", key.window().start())
.put("region", key.value());
.put("region", key.key());
ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
valueNode.put("count", value);

@@ -97,6 +97,12 @@ public class KafkaStreams {
// usage only and should not be exposed to users at all.
private final UUID processId;
/**
* Construct the stream instance.
*
* @param builder the processor topology builder specifying the computational logic
* @param props properties for the {@link StreamsConfig}
*/
public KafkaStreams(TopologyBuilder builder, Properties props) {
this(builder, new StreamsConfig(props));
}
@@ -104,8 +110,8 @@ public class KafkaStreams {
/**
* Construct the stream instance.
*
* @param builder The processor topology builder specifying the computational logic
* @param config The stream configs
* @param builder the processor topology builder specifying the computational logic
* @param config the stream configs
*/
public KafkaStreams(TopologyBuilder builder, StreamsConfig config) {
// create the metrics
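
For reference, a minimal sketch of how these constructors are typically wired up; the application ID, broker address, and topic name below are hypothetical:

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class StartStreams {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "javadoc-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        KStreamBuilder builder = new KStreamBuilder();
        builder.stream("input-topic"); // minimal topology: a single source node
        // The Properties variant above simply wraps them in a StreamsConfig.
        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }
}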

@@ -29,14 +29,31 @@ import java.util.Objects;
*/
public class KeyValue<K, V> {
/** The key of the key-value pair. */
public final K key;
/** The value of the key-value pair. */
public final V value;
/**
* Create a new key-value pair.
*
* @param key the key
* @param value the value
*/
public KeyValue(K key, V value) {
this.key = key;
this.value = value;
}
/**
* Create a new key-value pair.
*
* @param key the key
* @param value the value
* @param <K> the type of the key
* @param <V> the type of the value
* @return a new key value pair
*/
public static <K, V> KeyValue<K, V> pair(K key, V value) {
return new KeyValue<>(key, value);
}
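
A minimal sketch of the pair() factory in use (class and key/value names are hypothetical):

import org.apache.kafka.streams.KeyValue;

public class KeyValueExample {
    public static void main(String[] args) {
        // key and value are public final fields, so they can be read directly.
        KeyValue<String, Long> pageCount = KeyValue.pair("page-1", 42L);
        System.out.println(pageCount.key + " -> " + pageCount.value);
    }
}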

@@ -18,7 +18,7 @@
package org.apache.kafka.streams.kstream;
/**
* The Aggregator interface for aggregating values of the given key.
* The {@link Aggregator} interface for aggregating values of the given key.
*
* @param <K> key type
* @param <V> original value type
@@ -26,5 +26,13 @@ package org.apache.kafka.streams.kstream;
*/
public interface Aggregator<K, V, T> {
/**
* Compute a new aggregate from the key and value of a record and the current aggregate of the same key.
*
* @param aggKey the key of the record
* @param value the value of the record
* @param aggregate the current aggregate value
* @return the new aggregate value
*/
T apply(K aggKey, V value, T aggregate);
}

@@ -18,9 +18,8 @@
package org.apache.kafka.streams.kstream;
/**
* The ForeachAction interface for performing an action on a key-value pair.
* The {@link ForeachAction} interface for performing an action on a key-value pair.
* Note that this action is stateless. If stateful processing is required, consider
* using {@link KStream#transform(TransformerSupplier, String...)} or
* {@link KStream#process(ProcessorSupplier, String...)} instead.
@@ -29,6 +28,13 @@ package org.apache.kafka.streams.kstream;
* @param <V> original value type
*/
public interface ForeachAction<K, V> {
/**
* Perform an action for each record of a stream.
*
* @param key the key of the record
* @param value the value of the record
*/
void apply(K key, V value);
}
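
A minimal sketch of a stateless action (class name hypothetical):

import org.apache.kafka.streams.kstream.ForeachAction;

public class PrintAction implements ForeachAction<String, Long> {
    // Stateless per-record side effect: print the pair.
    @Override
    public void apply(String key, Long value) {
        System.out.println(key + " -> " + value);
    }
}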

@@ -18,11 +18,16 @@
package org.apache.kafka.streams.kstream;
/**
* The Initializer interface for creating an initial value in aggregations.
* The {@link Initializer} interface for creating an initial value in aggregations.
*
* @param <T> aggregate value type
*/
public interface Initializer<T> {
/**
* Return the initial value for an aggregation.
*
* @return the initial value for an aggregation
*/
T apply();
}
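
A minimal sketch of an Initializer paired with the Aggregator interface above, summing Long values per key (class name hypothetical):

import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;

public class SumAggregation {
    // Every key starts its aggregate at zero...
    static final Initializer<Long> INIT = new Initializer<Long>() {
        @Override
        public Long apply() {
            return 0L;
        }
    };

    // ...and each record's value is added to the running aggregate.
    static final Aggregator<String, Long, Long> SUM = new Aggregator<String, Long, Long>() {
        @Override
        public Long apply(String aggKey, Long value, Long aggregate) {
            return aggregate + value;
        }
    };
}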

@@ -26,7 +26,9 @@ import java.util.Map;
*/
public class JoinWindows extends Windows<TimeWindow> {
/** Maximum time difference for tuples that are before the join tuple. */
public final long before;
/** Maximum time difference for tuples that are after the join tuple. */
public final long after;
private JoinWindows(String name, long before, long after) {
@@ -41,40 +43,41 @@ public class JoinWindows extends Windows<TimeWindow> {
}
/**
* Specifies that records of the same key are joinable if their timestamp stamps are within
* timeDifference.
* Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference}.
*
* @param timeDifference join window interval in milliseconds
* @param timeDifference join window interval
*/
public JoinWindows within(long timeDifference) {
return new JoinWindows(this.name, timeDifference, timeDifference);
}
/**
* Specifies that records of the same key are joinable if their timestamp stamps are within
* Specifies that records of the same key are joinable if their timestamps are within
* the join window interval, and if the timestamp of a record from the secondary stream is
* earlier than or equal to the timestamp of a record from the first stream.
*
* @param timeDifference join window interval in milliseconds
* @param timeDifference join window interval
*/
public JoinWindows before(long timeDifference) {
return new JoinWindows(this.name, timeDifference, this.after);
}
/**
* Specifies that records of the same key are joinable if their timestamp stamps are within
* Specifies that records of the same key are joinable if their timestamps are within
* the join window interval, and if the timestamp of a record from the secondary stream
* is later than or equal to the timestamp of a record from the first stream.
*
* @param timeDifference join window interval in milliseconds
* @param timeDifference join window interval
*/
public JoinWindows after(long timeDifference) {
return new JoinWindows(this.name, this.before, timeDifference);
}
/**
* Not supported by {@link JoinWindows}. Throws {@link UnsupportedOperationException}.
*/
@Override
public Map<Long, TimeWindow> windowsFor(long timestamp) {
// this function should never be called
throw new UnsupportedOperationException("windowsFor() is not supported in JoinWindows");
}
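
A sketch of the three specification methods, assuming the static of(String) factory from the part of the class not shown in this hunk:

import org.apache.kafka.streams.kstream.JoinWindows;

public class JoinWindowsExample {
    public static void main(String[] args) {
        // Symmetric: joinable if the two timestamps differ by at most 5 seconds.
        JoinWindows symmetric = JoinWindows.of("my-join").within(5000L);
        // Asymmetric: secondary-stream records may be up to 5s earlier,
        // but at most 1s later, than the primary record.
        JoinWindows asymmetric = JoinWindows.of("my-join").before(5000L).after(1000L);
    }
}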

@@ -24,7 +24,7 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
/**
* KStream is an abstraction of a <i>record stream</i> of key-value pairs.
* {@link KStream} is an abstraction of a <i>record stream</i> of key-value pairs.
*
* @param <K> Type of keys
* @param <V> Type of values
@@ -510,7 +510,7 @@ public interface KStream<K, V> {
String name);
/**
* Count number of messages of this stream by key on a window basis into a new instance of windowed {@link KTable}.
* Count number of records of this stream by key on a window basis into a new instance of windowed {@link KTable}.
*
* @param windows the specification of the aggregation {@link Windows}
* @param keySerde key serdes for materializing the counting table,
@@ -519,7 +519,7 @@ public interface KStream<K, V> {
<W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows, Serde<K> keySerde);
/**
* Count number of messages of this stream by key on a window basis into a new instance of windowed {@link KTable}
* Count number of records of this stream by key on a window basis into a new instance of windowed {@link KTable}
* with default serializers and deserializers.
*
* @param windows the specification of the aggregation {@link Windows}
@@ -527,7 +527,7 @@ public interface KStream<K, V> {
<W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows);
/**
* Count number of messages of this stream by key into a new instance of ever-updating {@link KTable}.
* Count number of records of this stream by key into a new instance of ever-updating {@link KTable}.
*
* @param keySerde key serdes for materializing the counting table,
* if not specified the default serdes defined in the configs will be used
@@ -536,7 +536,7 @@ public interface KStream<K, V> {
KTable<K, Long> countByKey(Serde<K> keySerde, String name);
/**
* Count number of messages of this stream by key into a new instance of ever-updating {@link KTable}
* Count number of records of this stream by key into a new instance of ever-updating {@link KTable}
* with default serializers and deserializers.
*
* @param name the name of the resulted {@link KTable}

@@ -28,19 +28,22 @@ import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
/**
* KStreamBuilder is a subclass of {@link TopologyBuilder} that provides the Kafka Streams DSL
* {@link KStreamBuilder} is a subclass of {@link TopologyBuilder} that provides the Kafka Streams DSL
* for users to specify computational logic and translates the given logic to a {@link org.apache.kafka.streams.processor.internals.ProcessorTopology}.
*/
public class KStreamBuilder extends TopologyBuilder {
private final AtomicInteger index = new AtomicInteger(0);
/**
* Create a new {@link KStreamBuilder} instance.
*/
public KStreamBuilder() {
super();
}
/**
* Creates a {@link KStream} instance from the specified topics.
* Create a {@link KStream} instance from the specified topics.
* The default deserializers specified in the config are used.
*
* @param topics the topic names; must contain at least one topic name
@@ -50,7 +53,7 @@ public class KStreamBuilder extends TopologyBuilder {
}
/**
* Creates a {@link KStream} instance for the specified topics.
* Create a {@link KStream} instance for the specified topics.
*
* @param keySerde key serde used to read this source {@link KStream},
* if not specified the default serde defined in the configs will be used
@@ -67,7 +70,7 @@ public class KStreamBuilder extends TopologyBuilder {
}
/**
* Creates a {@link KTable} instance for the specified topic.
* Create a {@link KTable} instance for the specified topic.
* The default deserializers specified in the config are used.
*
* @param topic the topic name; cannot be null
@@ -77,7 +80,7 @@ public class KStreamBuilder extends TopologyBuilder {
}
/**
* Creates a {@link KTable} instance for the specified topic.
* Create a {@link KTable} instance for the specified topic.
*
* @param keySerde key serde used to send key-value pairs,
* if not specified the default key serde defined in the configuration will be used
@@ -98,7 +101,7 @@ public class KStreamBuilder extends TopologyBuilder {
}
/**
* Creates a new instance of {@link KStream} by merging the given streams
* Create a new instance of {@link KStream} by merging the given streams.
*
* @param streams the instances of {@link KStream} to be merged
*/

@@ -23,7 +23,7 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.StreamPartitioner;
/**
* KTable is an abstraction of a <i>changelog stream</i> from a primary-keyed table.
* {@link KTable} is an abstraction of a <i>changelog stream</i> from a primary-keyed table.
*
* @param <K> Type of primary keys
* @param <V> Type of value changes
@@ -39,7 +39,7 @@ public interface KTable<K, V> {
KTable<K, V> filter(Predicate<K, V> predicate);
/**
* Create a new instance of {@link KTable} that consists all elements of this stream which do not satisfy a predicate
* Create a new instance of {@link KTable} that consists of all elements of this stream which do not satisfy the predicate.
*
* @param predicate the instance of {@link Predicate}
*/
@@ -55,7 +55,7 @@ public interface KTable<K, V> {
/**
* Print the elements of this stream to System.out
* Print the elements of this stream to {@code System.out}
*
* Implementors will need to override toString for keys and values that are not of
* type String, Integer etc to get meaningful information.
@@ -63,7 +63,7 @@ public interface KTable<K, V> {
void print();
/**
* Print the elements of this stream to System.out
* Print the elements of this stream to {@code System.out}
* @param keySerde key serde used to send key-value pairs,
* if not specified the default serde defined in the configs will be used
* @param valSerde value serde used to send key-value pairs,
@@ -75,15 +75,16 @@ public interface KTable<K, V> {
void print(Serde<K> keySerde, Serde<V> valSerde);
/**
* Write the elements of this stream to a file at the given path.
* Write the elements of this stream to a file at the given path using default serializers and deserializers.
* @param filePath name of file to write to
*
* Implementors will need to override toString for keys and values that are not of
* type String, Integer etc to get meaningful information.
* Implementors will need to override {@code toString} for keys and values that are not of
* type {@link String}, {@link Integer} etc. to get meaningful information.
*/
void writeAsText(String filePath);
/**
* Write the elements of this stream to a file at the given path.
*
* @param filePath name of file to write to
* @param keySerde key serde used to send key-value pairs,
@@ -91,8 +92,8 @@ public interface KTable<K, V> {
* @param valSerde value serde used to send key-value pairs,
* if not specified the default serde defined in the configs will be used
*
* Implementors will need to override toString for keys and values that are not of
* type String, Integer etc to get meaningful information.
* Implementors will need to override {@code toString} for keys and values that are not of
* type {@link String}, {@link Integer} etc. to get meaningful information.
*/
void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde);

@@ -18,7 +18,7 @@
package org.apache.kafka.streams.kstream;
/**
* The KeyValueMapper interface for mapping a key-value pair to a new value (could be another key-value pair).
* The {@link KeyValueMapper} interface for mapping a key-value pair to a new value (could be another key-value pair).
*
* @param <K> original key type
* @param <V> original value type
@@ -26,5 +26,12 @@ package org.apache.kafka.streams.kstream;
*/
public interface KeyValueMapper<K, V, R> {
/**
* Map a record with the given key and value to a new value.
*
* @param key the key of the record
* @param value the value of the record
* @return the new value
*/
R apply(K key, V value);
}
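
A minimal sketch mapping each record to the length of its value (class name hypothetical):

import org.apache.kafka.streams.kstream.KeyValueMapper;

public class ValueLengthMapper implements KeyValueMapper<String, String, Integer> {
    @Override
    public Integer apply(String key, String value) {
        // The key is available but unused here; null values map to length 0.
        return value == null ? 0 : value.length();
    }
}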

@@ -18,12 +18,19 @@
package org.apache.kafka.streams.kstream;
/**
* The Predicate interface represents a predicate (boolean-valued function) of a key-value pair.
* The {@link Predicate} interface represents a predicate (boolean-valued function) of a key-value pair.
*
* @param <K> key type
* @param <V> value type
*/
public interface Predicate<K, V> {
/**
* Test if the record with the given key and value satisfies the predicate.
*
* @param key the key of the record
* @param value the value of the record
* @return {@code true} if the key-value pair satisfies the predicate&mdash;{@code false} otherwise
*/
boolean test(K key, V value);
}
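
A minimal sketch of a predicate that could back a filter operation (class name and threshold hypothetical):

import org.apache.kafka.streams.kstream.Predicate;

public class LargeCountPredicate implements Predicate<String, Long> {
    @Override
    public boolean test(String key, Long value) {
        // Keep only records whose value exceeds the threshold; drop null values.
        return value != null && value > 10L;
    }
}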

@@ -18,11 +18,18 @@
package org.apache.kafka.streams.kstream;
/**
* The Reducer interface for combining two values of the same type into a new value.
* The {@link Reducer} interface for combining two values of the same type into a new value.
*
* @param <V> value type
*/
public interface Reducer<V> {
/**
* Aggregate the two given values into a single one.
*
* @param value1 the first value for the aggregation
* @param value2 the second value for the aggregation
* @return the aggregated value
*/
V apply(V value1, V value2);
}
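
A minimal sketch of a reducer that keeps the larger of two values (class name hypothetical):

import org.apache.kafka.streams.kstream.Reducer;

public class MaxReducer implements Reducer<Long> {
    @Override
    public Long apply(Long value1, Long value2) {
        // Max is associative, so repeated pairwise combination is well defined.
        return Math.max(value1, value2);
    }
}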

@@ -24,6 +24,14 @@ import java.util.Map;
/**
* The time-based window specifications used for aggregations.
* <p>
* The semantics of a time-based window are: every T1 (advance) time units, compute the aggregate total for the last T2 (size) time units.
* <ul>
* <li> If {@code advance < size} a hopping window is defined: <br />
* it discretizes a stream into overlapping windows, which implies that a record may be contained in one or more "adjacent" windows.</li>
* <li> If {@code advance == size} a tumbling window is defined:<br />
* it discretizes a stream into non-overlapping windows, which implies that a record is contained in one and only one tumbling window.</li>
* </ul>
*/
public class TimeWindows extends Windows<TimeWindow> {
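
A sketch of both window types, assuming the of(name, size) and advanceBy(advance) factories from the part of the class not shown in this hunk:

import org.apache.kafka.streams.kstream.TimeWindows;

public class TimeWindowsExample {
    public static void main(String[] args) {
        // Tumbling: the advance interval defaults to the window size.
        TimeWindows tumbling = TimeWindows.of("tumbling-counts", 60000L);
        // Hopping: a 60-second window that advances every 10 seconds (advance < size).
        TimeWindows hopping = TimeWindows.of("hopping-counts", 60000L).advanceBy(10000L);
    }
}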

@@ -20,7 +20,7 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.processor.ProcessorContext;
/**
* A stateful Transformer interface for transform a key-value pair into a new value.
* A stateful {@link Transformer} interface to transform a key-value pair into a new value.
*
* @param <K> key type
* @param <V> value type
@@ -40,10 +40,10 @@ public interface Transformer<K, V, R> {
void init(ProcessorContext context);
/**
* Transform the message with the given key and value.
* Transform the record with the given key and value.
*
* @param key the key for the message
* @param value the value for the message
* @param key the key for the record
* @param value the value for the record
* @return new value; if null no key-value pair will be forwarded downstream
*/
R transform(K key, V value);

@@ -18,9 +18,14 @@
package org.apache.kafka.streams.kstream;
/**
* A transformer supplier which can create one or more {@link Transformer} instances.
* A {@link TransformerSupplier} interface which can create one or more {@link Transformer} instances.
*/
public interface TransformerSupplier<K, V, R> {
/**
* Return a new {@link Transformer} instance.
*
* @return a new {@link Transformer} instance
*/
Transformer<K, V, R> get();
}

@@ -29,6 +29,7 @@ public class UnlimitedWindows extends Windows<UnlimitedWindow> {
private static final long DEFAULT_START_TIMESTAMP = 0L;
/** The start timestamp of the window. */
public final long start;
private UnlimitedWindows(String name, long start) {
@@ -41,12 +42,18 @@ public class UnlimitedWindows extends Windows<UnlimitedWindow> {
}
/**
* Returns an unlimited window definition
* Return an unlimited window starting at timestamp zero.
*/
public static UnlimitedWindows of(String name) {
return new UnlimitedWindows(name, DEFAULT_START_TIMESTAMP);
}
/**
* Return a new unlimited window for the specified start timestamp.
*
* @param start the window start time
* @return a new unlimited window that starts at {@code start}
*/
public UnlimitedWindows startOn(long start) {
return new UnlimitedWindows(this.name, start);
}
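
A minimal sketch of the two entry points documented above (window name and timestamp hypothetical):

import org.apache.kafka.streams.kstream.UnlimitedWindows;

public class UnlimitedWindowsExample {
    public static void main(String[] args) {
        // Starts at timestamp zero by default...
        UnlimitedWindows fromZero = UnlimitedWindows.of("all-time");
        // ...or at an explicit start timestamp.
        UnlimitedWindows fromStart = fromZero.startOn(1461888000000L);
    }
}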

@@ -18,7 +18,7 @@
package org.apache.kafka.streams.kstream;
/**
* The ValueJoiner interface for joining two values and return a the joined new value.
* The {@link ValueJoiner} interface for joining two values into a new value.
*
* @param <V1> first value type
* @param <V2> second value type
@@ -26,5 +26,12 @@ package org.apache.kafka.streams.kstream;
*/
public interface ValueJoiner<V1, V2, R> {
/**
* Return a joined value consisting of {@code value1} and {@code value2}.
*
* @param value1 the first value for joining
* @param value2 the second value for joining
* @return the joined value
*/
R apply(V1 value1, V2 value2);
}

@@ -18,12 +18,18 @@
package org.apache.kafka.streams.kstream;
/**
* The KeyValueMapper interface for mapping an original value to a new value (could be another key-value pair).
* The {@link ValueMapper} interface for mapping an original value to a new value (could be another key-value pair).
*
* @param <V1> original value type
* @param <V2> mapped value type
*/
public interface ValueMapper<V1, V2> {
/**
* Map the given value to a new value.
*
* @param value the value to be mapped
* @return the new value
*/
V2 apply(V1 value);
}

@@ -20,7 +20,7 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.processor.ProcessorContext;
/**
* A stateful Value Transformer interface for transform a value into a new value.
* A stateful {@link ValueTransformer} interface to transform a value into a new value.
*
* @param <V> value type
* @param <R> return type
@@ -31,7 +31,7 @@ public interface ValueTransformer<V, R> {
* Initialize this transformer with the given context. The framework ensures this is called once per processor when the topology
* that contains it is initialized.
* <p>
* If this tranformer is to be {@link #punctuate(long) called periodically} by the framework, then this method should
* If this transformer is to be {@link #punctuate(long) called periodically} by the framework, then this method should
* {@link ProcessorContext#schedule(long) schedule itself} with the provided context.
*
* @param context the context; may not be null
@@ -39,9 +39,9 @@ public interface ValueTransformer<V, R> {
void init(ProcessorContext context);
/**
* Transform the message with the given key and value.
* Transform the record with the given key and value.
*
* @param value the value for the message
* @param value the value for the record
* @return new value
*/
R transform(V value);

@@ -18,9 +18,14 @@
package org.apache.kafka.streams.kstream;
/**
* A value transformer supplier which can create one or more {@link ValueTransformer} instances.
* A {@link ValueTransformerSupplier} interface which can create one or more {@link ValueTransformer} instances.
*/
public interface ValueTransformerSupplier<V, R> {
/**
* Return a new {@link ValueTransformer} instance.
*
* @return a new {@link ValueTransformer} instance.
*/
ValueTransformer<V, R> get();
}

@@ -25,25 +25,37 @@ public abstract class Window {
private long start;
private long end;
/**
* Create a new window for the given start time (inclusive) and end time (exclusive).
*
* @param start the start timestamp of the window (inclusive)
* @param end the end timestamp of the window (exclusive)
*/
public Window(long start, long end) {
this.start = start;
this.end = end;
}
/**
* Returns the start timestamp of this window, inclusive
* Return the start timestamp of this window, inclusive
*/
public long start() {
return start;
}
/**
* Returns the end timestamp of this window, exclusive
* Return the end timestamp of this window, exclusive
*/
public long end() {
return end;
}
/**
* Check if the given window overlaps with this window.
*
* @param other another window
* @return {@code true} if {@code other} overlaps with this window&mdash;{@code false} otherwise
*/
public boolean overlap(Window other) {
return this.start() < other.end() && other.start() < this.end();
}

@@ -22,30 +22,40 @@ package org.apache.kafka.streams.kstream;
* i.e. {@link KStream#aggregateByKey(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde,
* org.apache.kafka.common.serialization.Serde)}
*
* @param <T> Type of the key
* @param <K> Type of the key
*/
public class Windowed<T> {
public class Windowed<K> {
private T value;
private K key;
private Window window;
public Windowed(T value, Window window) {
this.value = value;
public Windowed(K key, Window window) {
this.key = key;
this.window = window;
}
public T value() {
return value;
/**
* Return the key of the window.
*
* @return the key of the window
*/
public K key() {
return key;
}
/**
* Return the window containing the values associated with this key.
*
* @return the window containing the values
*/
public Window window() {
return window;
}
@Override
public String toString() {
return "[" + value + "@" + window.start() + "]";
return "[" + key + "@" + window.start() + "]";
}
@Override
@@ -58,12 +68,12 @@ public class Windowed<T> {
Windowed<?> that = (Windowed) obj;
return this.window.equals(that.window) && this.value.equals(that.value);
return this.window.equals(that.window) && this.key.equals(that.key);
}
@Override
public int hashCode() {
long n = ((long) window.hashCode() << 32) | value.hashCode();
long n = ((long) window.hashCode() << 32) | key.hashCode();
return (int) (n % 0xFFFFFFFFL);
}
}
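
A sketch of the renamed accessor in use, flattening a windowed key back into a plain key as the demos in this commit do (class name hypothetical):

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Windowed;

public class UnwindowMapper implements KeyValueMapper<Windowed<String>, Long, KeyValue<String, Long>> {
    @Override
    public KeyValue<String, Long> apply(Windowed<String> key, Long value) {
        // key.key() is the original record key; window().start() identifies the window.
        return KeyValue.pair(key.key() + "@" + key.window().start(), value);
    }
}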

@@ -30,16 +30,12 @@ public abstract class Windows<W extends Window> {
private static final int DEFAULT_NUM_SEGMENTS = 3;
private static final long DEFAULT_EMIT_DURATION = 1000L;
private static final long DEFAULT_MAINTAIN_DURATION = 24 * 60 * 60 * 1000L; // one day
private static final AtomicInteger NAME_INDEX = new AtomicInteger(0);
protected String name;
private long emitDurationMs;
private long maintainDurationMs;
public int segments;
@@ -50,7 +46,6 @@
}
this.name = name;
this.segments = DEFAULT_NUM_SEGMENTS;
this.emitDurationMs = DEFAULT_EMIT_DURATION;
this.maintainDurationMs = DEFAULT_MAINTAIN_DURATION;
}
@@ -58,17 +53,10 @@
return name;
}
/**
* Set the window emit duration in milliseconds of system time.
*/
public Windows emit(long durationMs) {
this.emitDurationMs = durationMs;
return this;
}
/**
* Set the window maintain duration in milliseconds of system time.
*
* @return itself
*/
public Windows until(long durationMs) {
this.maintainDurationMs = durationMs;
@@ -79,6 +67,8 @@
/**
* Specify the number of segments to be used for rolling the window store,
* this function is not exposed to users but can be called by developers that extend this JoinWindows spec.
*
* @return itself
*/
protected Windows segments(int segments) {
this.segments = segments;
@@ -86,18 +76,21 @@
return this;
}
public long emitEveryMs() {
return this.emitDurationMs;
}
/**
* Return the window maintain duration in milliseconds of system time.
*
* @return the window maintain duration in milliseconds of system time
*/
public long maintainMs() {
return this.maintainDurationMs;
}
protected String newName(String prefix) {
return prefix + String.format("%010d", NAME_INDEX.getAndIncrement());
}
/**
* Creates all windows that contain the provided timestamp.
*
* @param timestamp the timestamp for which windows should be created
* @return a map of {@code windowStartTimestamp -> Window} entries
*/
public abstract Map<Long, W> windowsFor(long timestamp);
}

@@ -163,7 +163,7 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
@SuppressWarnings("unchecked")
@Override
public T get(Windowed<K> windowedKey) {
K key = windowedKey.value();
K key = windowedKey.key();
W window = (W) windowedKey.window();
// this iterator should contain at most one element

@@ -157,7 +157,7 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
@SuppressWarnings("unchecked")
@Override
public V get(Windowed<K> windowedKey) {
K key = windowedKey.value();
K key = windowedKey.key();
W window = (W) windowedKey.window();
// this iterator should only contain one element

@@ -40,7 +40,7 @@ public class WindowedSerializer<T> implements Serializer<Windowed<T>> {
@Override
public byte[] serialize(String topic, Windowed<T> data) {
byte[] serializedKey = inner.serialize(topic, data.value());
byte[] serializedKey = inner.serialize(topic, data.key());
ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE);
buf.put(serializedKey);
@@ -55,7 +55,7 @@ public class WindowedSerializer<T> implements Serializer<Windowed<T>> {
}
public byte[] serializeBaseKey(String topic, Windowed<T> data) {
return inner.serialize(topic, data.value());
return inner.serialize(topic, data.key());
}
}

@@ -29,12 +29,12 @@ public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Window
}
/**
* WindowedStreamPartitioner determines the partition number for a message with the given windowed key and value
* WindowedStreamPartitioner determines the partition number for a record with the given windowed key and value
* and the current number of partitions. The partition number is determined by the original key of the windowed key
* using the same logic as DefaultPartitioner so that the topic is partitioned by the original key.
*
* @param windowedKey the key of the message
* @param value the value of the message
* @param windowedKey the key of the record
* @param value the value of the record
* @param numPartitions the total number of partitions
* @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
*/

@@ -27,7 +27,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
* via this timestamp extractor.
*
If <i>CreateTime</i> is used to define the built-in timestamps, using this extractor effectively provides
* <i>event-time</i> semantics.
* <i>event-time</i> semantics. If <i>LogAppendTime</i> is used to define the built-in timestamps, using
* this extractor effectively provides <i>ingestion-time</i> semantics.
*
* If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}.
*/

@@ -37,10 +37,10 @@ public interface Processor<K, V> {
void init(ProcessorContext context);
/**
* Process the message with the given key and value.
* Process the record with the given key and value.
*
* @param key the key for the message
* @param value the value for the message
* @param key the key for the record
* @param value the value for the record
*/
void process(K key, V value);
@@ -53,7 +53,8 @@
void punctuate(long timestamp);
/**
* Close this processor and clean up any resources.
* Close this processor and clean up any resources. Be aware that {@link #close()} is called after an internal cleanup.
* Thus, it is not possible to write anything to Kafka as underlying clients are already closed.
*/
void close();
}

@@ -29,5 +29,10 @@ package org.apache.kafka.streams.processor;
*/
public interface ProcessorSupplier<K, V> {
/**
* Return a new {@link Processor} instance.
*
* @return a new {@link Processor} instance
*/
Processor<K, V> get();
}
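
A sketch of a supplier handing out a fresh stateful processor per task; class name and forwarding logic are hypothetical:

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;

public class UppercaseSupplier implements ProcessorSupplier<String, String> {
    @Override
    public Processor<String, String> get() {
        // A new instance per call: processors may hold per-task state.
        return new Processor<String, String>() {
            private ProcessorContext context;

            @Override
            public void init(ProcessorContext context) {
                this.context = context;
            }

            @Override
            public void process(String key, String value) {
                context.forward(key, value == null ? null : value.toUpperCase());
            }

            @Override
            public void punctuate(long timestamp) { }

            @Override
            public void close() { }
        };
    }
}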

@@ -22,7 +22,17 @@ package org.apache.kafka.streams.processor;
*/
public interface StateStoreSupplier {
/**
* Return the name of this state store supplier.
*
* @return the name of this state store supplier
*/
String name();
/**
* Return a new {@link StateStore} instance.
*
* @return a new {@link StateStore} instance
*/
StateStore get();
}

@@ -17,21 +17,21 @@
package org.apache.kafka.streams.processor;
/**
* Determine how messages are distributed among the partitions in a Kafka topic. If not specified, the underlying producer's
* Determine how records are distributed among the partitions in a Kafka topic. If not specified, the underlying producer's
* {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used to determine the partition.
* <p>
* Kafka topics are divided into one or more <i>partitions</i>. Since each partition must fit on the servers that host it,
* using multiple partitions allows the topic to scale beyond a size that will fit on a single machine. Partitions also enable you
* to use multiple instances of your topology to process in parallel all of the messages on the topology's source topics.
* to use multiple instances of your topology to process in parallel all of the records on the topology's source topics.
* <p>
* When a topology is instantiated, each of its sources are assigned a subset of that topic's partitions. That means that only
* those processors in that topology instance will consume the messages from those partitions. In many cases, Kafka Streams will
* those processors in that topology instance will consume the records from those partitions. In many cases, Kafka Streams will
* automatically manage these instances, and adjust when new topology instances are added or removed.
* <p>
* Some topologies, though, need more control over which messages appear in each partition. For example, some topologies that have
* stateful processors may want all messages within a range of keys to always be delivered to and handled by the same topology instance.
* An upstream topology producing messages to that topic can use a custom <i>stream partitioner</i> to precisely and consistently
* determine to which partition each message should be written.
* Some topologies, though, need more control over which records appear in each partition. For example, some topologies that have
* stateful processors may want all records within a range of keys to always be delivered to and handled by the same topology instance.
* An upstream topology producing records to that topic can use a custom <i>stream partitioner</i> to precisely and consistently
* determine to which partition each record should be written.
* <p>
* To do this, create a <code>StreamPartitioner</code> implementation, and when you build your topology specify that custom partitioner
* when {@link TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer, StreamPartitioner, String...) adding a sink}
@@ -48,10 +48,10 @@ package org.apache.kafka.streams.processor;
public interface StreamPartitioner<K, V> {
/**
* Determine the partition number for a message with the given key and value and the current number of partitions.
* Determine the partition number for a record with the given key and value and the current number of partitions.
*
* @param key the key of the message
* @param value the value of the message
* @param key the key of the record
* @param value the value of the record
* @param numPartitions the total number of partitions
* @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
*/

@@ -25,11 +25,13 @@ import java.io.IOException;
import java.nio.ByteBuffer;
/**
* The task id representation composed as topic group id plus the assigned partition id.
* The task ID representation, composed of the topic group ID plus the assigned partition ID.
*/
public class TaskId implements Comparable<TaskId> {
/** The ID of the topic group. */
public final int topicGroupId;
/** The ID of the partition. */
public final int partition;
public TaskId(int topicGroupId, int partition) {
@@ -42,7 +44,7 @@ public class TaskId implements Comparable<TaskId> {
}
/**
* @throws TaskIdFormatException if the string is not a valid TaskId
* @throws TaskIdFormatException if the string is not a valid {@link TaskId}
*/
public static TaskId parse(String string) {
int index = string.indexOf('_');

@@ -26,10 +26,12 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
public interface TimestampExtractor {
/**
* Extracts a timestamp from a message
* Extracts a timestamp from a record.
* <p>
* Typically, the timestamp represents the milliseconds since midnight, January 1, 1970 UTC.
*
* @param record ConsumerRecord
* @return timestamp
* @param record a data record
* @return the timestamp of the record
*/
long extract(ConsumerRecord<Object, Object> record);
}
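
A sketch of a custom extractor that prefers the embedded record timestamp but falls back to wall-clock time when it is missing; the class name and fallback policy are hypothetical:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class SafeTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        long timestamp = record.timestamp();
        // Records written by pre-0.10 producers carry no timestamp (negative value).
        return timestamp >= 0 ? timestamp : System.currentTimeMillis();
    }
}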

@@ -43,11 +43,11 @@ import java.util.Set;
/**
* A component that is used to build a {@link ProcessorTopology}. A topology contains an acyclic graph of sources, processors,
* and sinks. A {@link SourceNode source} is a node in the graph that consumes one or more Kafka topics and forwards them to
* its child nodes. A {@link Processor processor} is a node in the graph that receives input messages from upstream nodes,
* processes that message, and optionally forwarding new messages to one or all of its children. Finally, a {@link SinkNode sink}
* is a node in the graph that receives messages from upstream nodes and writes them to a Kafka topic. This builder allows you
* its child nodes. A {@link Processor processor} is a node in the graph that receives input records from upstream nodes,
* processes those records, and optionally forwards new records to one or all of its children. Finally, a {@link SinkNode sink}
* is a node in the graph that receives records from upstream nodes and writes them to a Kafka topic. This builder allows you
* to construct an acyclic graph of these nodes, and the builder is then passed into a new {@link org.apache.kafka.streams.KafkaStreams}
* instance that will then {@link org.apache.kafka.streams.KafkaStreams#start() begin consuming, processing, and producing messages}.
* instance that will then {@link org.apache.kafka.streams.KafkaStreams#start() begin consuming, processing, and producing records}.
*/
public class TopologyBuilder {
@@ -193,7 +193,7 @@
public TopologyBuilder() {}
/**
* Add a new source that consumes the named topics and forwards the messages to child processor and/or sink nodes.
* Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
* The source will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} and
* {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
* {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
@@ -208,15 +208,15 @@
}
/**
* Add a new source that consumes the named topics and forwards the messages to child processor and/or sink nodes.
* Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
* The source will use the specified key and value deserializers.
*
* @param name the unique name of the source used to reference this node when
* {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
* @param keyDeserializer the {@link Deserializer key deserializer} used when consuming messages; may be null if the source
* @param keyDeserializer the {@link Deserializer key deserializer} used when consuming records; may be null if the source
* should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} specified in the
* {@link org.apache.kafka.streams.StreamsConfig stream configuration}
* @param valDeserializer the {@link Deserializer value deserializer} used when consuming messages; may be null if the source
* @param valDeserializer the {@link Deserializer value deserializer} used when consuming records; may be null if the source
* should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
* {@link org.apache.kafka.streams.StreamsConfig stream configuration}
* @param topics the name of one or more Kafka topics that this source is to consume
@@ -242,14 +242,14 @@
}
/**
* Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
* Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
* The sink will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} and
* {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
* {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
*
* @param name the unique name of the sink
* @param topic the name of the Kafka topic to which this sink should write its messages
* @param parentNames the name of one or more source or processor nodes whose output message this sink should consume
* @param topic the name of the Kafka topic to which this sink should write its records
* @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
* and write to its topic
* @return this builder instance so methods can be chained together; never null
* @see #addSink(String, String, StreamPartitioner, String...)
@@ -261,22 +261,22 @@
}
/**
* Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic, using
* Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic, using
* the supplied partitioner.
* The sink will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} and
* {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
* {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
* <p>
* The sink will also use the specified {@link StreamPartitioner} to determine how messages are distributed among
* The sink will also use the specified {@link StreamPartitioner} to determine how records are distributed among
* the named Kafka topic's partitions. Such control is often useful with topologies that use
* {@link #addStateStore(StateStoreSupplier, String...) state stores}
* in its processors. In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute
* messages among partitions using Kafka's default partitioning logic.
* in its processors. In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute
* records among partitions using Kafka's default partitioning logic.
*
* @param name the unique name of the sink
* @param topic the name of the Kafka topic to which this sink should write its messages
* @param partitioner the function that should be used to determine the partition for each message processed by the sink
* @param parentNames the name of one or more source or processor nodes whose output message this sink should consume
* @param topic the name of the Kafka topic to which this sink should write its records
* @param partitioner the function that should be used to determine the partition for each record processed by the sink
* @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
* and write to its topic
* @return this builder instance so methods can be chained together; never null
* @see #addSink(String, String, String...)
@@ -288,18 +288,18 @@
}
/**
* Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
* Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
* The sink will use the specified key and value serializers.
*
* @param name the unique name of the sink
* @param topic the name of the Kafka topic to which this sink should write its messages
* @param keySerializer the {@link Serializer key serializer} used when consuming messages; may be null if the sink
* @param topic the name of the Kafka topic to which this sink should write its records
* @param keySerializer the {@link Serializer key serializer} used when consuming records; may be null if the sink
* should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} specified in the
* {@link org.apache.kafka.streams.StreamsConfig stream configuration}
* @param valSerializer the {@link Serializer value serializer} used when consuming messages; may be null if the sink
* @param valSerializer the {@link Serializer value serializer} used when consuming records; may be null if the sink
* should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
* {@link org.apache.kafka.streams.StreamsConfig stream configuration}
* @param parentNames the name of one or more source or processor nodes whose output message this sink should consume
* @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
* and write to its topic
* @return this builder instance so methods can be chained together; never null
* @see #addSink(String, String, String...)
@@ -311,19 +311,19 @@
}
/**
* Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
* Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
* The sink will use the specified key and value serializers, and the supplied partitioner.
*
* @param name the unique name of the sink
* @param topic the name of the Kafka topic to which this sink should write its messages
* @param keySerializer the {@link Serializer key serializer} used when consuming messages; may be null if the sink
* @param topic the name of the Kafka topic to which this sink should write its records
* @param keySerializer the {@link Serializer key serializer} used when consuming records; may be null if the sink
* should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} specified in the
* {@link org.apache.kafka.streams.StreamsConfig stream configuration}
* @param valSerializer the {@link Serializer value serializer} used when consuming messages; may be null if the sink
* @param valSerializer the {@link Serializer value serializer} used when consuming records; may be null if the sink
* should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
* {@link org.apache.kafka.streams.StreamsConfig stream configuration}
* @param partitioner the function that should be used to determine the partition for each message processed by the sink
* @param parentNames the name of one or more source or processor nodes whose output message this sink should consume
* @param partitioner the function that should be used to determine the partition for each record processed by the sink
* @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
* and write to its topic
* @return this builder instance so methods can be chained together; never null
* @see #addSink(String, String, String...)
@@ -354,11 +354,11 @@
}
/**
* Add a new processor node that receives and processes messages output by one or more parent source or processor node.
* Any new messages output by this processor will be forwarded to its child processor or sink nodes.
* Add a new processor node that receives and processes records output by one or more parent source or processor node.
* Any new record output by this processor will be forwarded to its child processor or sink nodes.
* @param name the unique name of the processor node
* @param supplier the supplier used to obtain this node's {@link Processor} instance
* @param parentNames the name of one or more source or processor nodes whose output messages this processor should receive
* @param parentNames the name of one or more source or processor nodes whose output records this processor should receive
* and process
* @return this builder instance so methods can be chained together; never null
* @throws TopologyBuilderException if parent processor is not added yet, or if this processor's name is equal to the parent's name

@@ -25,9 +25,16 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
* Using this extractor effectively provides <i>processing-time</i> semantics.
*
* If you need <i>event-time</i> semantics, use {@link ConsumerRecordTimestampExtractor} with
* built-in <i>CreateTime</i> timestamp (see KIP-32: Add timestamps to Kafka message for details).
* built-in <i>CreateTime</i> or <i>LogAppendTime</i> timestamp (see KIP-32: Add timestamps to Kafka message for details).
*/
public class WallclockTimestampExtractor implements TimestampExtractor {
/**
* Return the current wall clock time as timestamp.
*
* @param record a data record
* @return the current wall clock time, expressed in milliseconds since midnight, January 1, 1970 UTC
*/
@Override
public long extract(ConsumerRecord<Object, Object> record) {
return System.currentTimeMillis();

@@ -286,7 +286,7 @@ public class StreamThread extends Thread {
removeStandbyTasks();
// We need to first close the underlying clients before closing the state
// manager, for example we need to make sure producer's message sends
// manager, for example we need to make sure producer's record sends
// have all been acked before the state manager records
// changelog sent offsets
try {

@@ -24,13 +24,23 @@ import org.apache.kafka.common.serialization.Serializer;
/**
* Factory for creating serializers / deserializers for state stores in Kafka Streams.
*
* @param <K> key type of serdes
* @param <V> value type of serdes
* @param <K> key type of serde
* @param <V> value type of serde
*/
public final class StateSerdes<K, V> {
public static <K, V> StateSerdes<K, V> withBuiltinTypes(String topic, Class<K> keyClass, Class<V> valueClass) {
return new StateSerdes<>(topic, Serdes.serdeFrom(keyClass), Serdes.serdeFrom(valueClass));
/**
* Create a new instance of {@link StateSerdes} for the given state name and key-/value-type classes.
*
* @param stateName the name of the state
* @param keyClass the class of the key type
* @param valueClass the class of the value type
* @param <K> the key type
* @param <V> the value type
* @return a new instance of {@link StateSerdes}
*/
public static <K, V> StateSerdes<K, V> withBuiltinTypes(String stateName, Class<K> keyClass, Class<V> valueClass) {
return new StateSerdes<>(stateName, Serdes.serdeFrom(keyClass), Serdes.serdeFrom(valueClass));
}
private final String stateName;
@@ -63,46 +73,105 @@ public final class StateSerdes<K, V> {
this.valueSerde = valueSerde;
}
/**
* Return the key serde.
*
* @return the key serde
*/
public Serde<K> keySerde() {
return keySerde;
}
/**
* Return the value serde.
*
* @return the value serde
*/
public Serde<V> valueSerde() {
return valueSerde;
}
/**
* Return the key deserializer.
*
* @return the key deserializer
*/
public Deserializer<K> keyDeserializer() {
return keySerde.deserializer();
}
/**
* Return the key serializer.
*
* @return the key serializer
*/
public Serializer<K> keySerializer() {
return keySerde.serializer();
}
/**
* Return the value deserializer.
*
* @return the value deserializer
*/
public Deserializer<V> valueDeserializer() {
return valueSerde.deserializer();
}
/**
* Return the value serializer.
*
* @return the value serializer
*/
public Serializer<V> valueSerializer() {
return valueSerde.serializer();
}
public String topic() {
/**
* Return the name of the state.
*
* @return the name of the state
*/
public String stateName() {
return stateName;
}
/**
* Deserialize the key from raw bytes.
*
* @param rawKey the key as raw bytes
* @return the key as typed object
*/
public K keyFrom(byte[] rawKey) {
return keySerde.deserializer().deserialize(stateName, rawKey);
}
/**
* Deserialize the value from raw bytes.
*
* @param rawValue the value as raw bytes
* @return the value as typed object
*/
public V valueFrom(byte[] rawValue) {
return valueSerde.deserializer().deserialize(stateName, rawValue);
}
/**
* Serialize the given key.
*
* @param key the key to be serialized
* @return the serialized key
*/
public byte[] rawKey(K key) {
return keySerde.serializer().serialize(stateName, key);
}
/**
* Serialize the given value.
*
* @param value the value to be serialized
* @return the serialized value
*/
public byte[] rawValue(V value) {
return valueSerde.serializer().serialize(stateName, value);
}
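
A round trip through the factory and the (de)serialization helpers above; the state name is hypothetical, and the class is assumed to live in the org.apache.kafka.streams.state package:

import org.apache.kafka.streams.state.StateSerdes;

public class StateSerdesExample {
    public static void main(String[] args) {
        StateSerdes<String, Long> serdes =
            StateSerdes.withBuiltinTypes("word-counts", String.class, Long.class);
        // Serialize a key to raw bytes, then deserialize it back.
        byte[] rawKey = serdes.rawKey("kafka");
        System.out.println(serdes.keyFrom(rawKey)); // prints: kafka
    }
}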

@@ -23,7 +23,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.streams.processor.StateStore;
/**
* A windowed store interface extending {@link StateStore}
* A windowed store interface extending {@link StateStore}.
*
* @param <K> Type of keys
* @param <V> Type of values

@@ -213,7 +213,7 @@ public class SmokeTestClient extends SmokeTestUtil {
new KeyValueMapper<Windowed<String>, Long, KeyValue<String, Long>>() {
@Override
public KeyValue<String, Long> apply(Windowed<String> key, Long value) {
return new KeyValue<>(key.value() + "@" + key.window().start(), value);
return new KeyValue<>(key.key() + "@" + key.window().start(), value);
}
}
).to(stringSerde, longSerde, "wcnt");

@@ -77,7 +77,7 @@ public class SmokeTestUtil {
public static final class Unwindow<K, V> implements KeyValueMapper<Windowed<K>, V, KeyValue<K, V>> {
public KeyValue<K, V> apply(Windowed<K> winKey, V value) {
return new KeyValue<K, V>(winKey.value(), value);
return new KeyValue<K, V>(winKey.key(), value);
}
}