Kafka Streams – Catching Data in the Act. 3: The Mechanics.

In the previous post we designed the experiment, simulated different operational states, and confirmed that the results were as expected – more or less. Here we go over the implementation and a few relevant code snippets before wrapping up this series of posts. As usual, the package is available for download on GitHub.

We start with the pom.

1. The pom

Mostly straightforward. The pom pulls in the Kafka Streams, Confluent Avro serializer/schema registry, and Elasticsearch client dependencies; we use Elasticsearch (6.1.2) for long term storage.

2. Producers

Producers here are the sensors at the vertices A, B, and C. Each sensor produces to an assigned partition: A => 0, B => 1, and C => 2. The sensors throw off raw measurements at a maximum rate of about 1/sec, as per the following code snippet that runs on a thread in a loop until interrupted. Lines 5-6 update the position of the vertex; the angular velocity is one revolution per minute (2 PI / 60). Line #7 builds the RawVertex object as per the Avro spec. Lines 8-9 push the object to the rawVertex topic and wait for full acknowledgement.
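
The actual snippet (with the line numbers referenced above) lives in the repo; the following is only a rough sketch of that loop, with illustrative names for the Avro fields (x, y, timeInstant, vertexId) and for the enclosing fields (producer, partition, vertexName, and the initial coordinates x0, y0):

```java
// Sketch only – not the repo's snippet that the line numbers above refer to.
void runSensor() throws Exception {
    final double omega = 2.0 * Math.PI / 60.0;      // angular velocity: one revolution per minute
    final long t0 = System.currentTimeMillis();

    while (!Thread.currentThread().isInterrupted()) {
        long now = System.currentTimeMillis();
        double theta = omega * (now - t0) / 1000.0;

        // update the position of the vertex: rotate the initial point (x0, y0) by theta
        double x = x0 * Math.cos(theta) - y0 * Math.sin(theta);
        double y = x0 * Math.sin(theta) + y0 * Math.cos(theta);

        // build the RawVertex object as per the Avro spec (field names are guesses)
        RawVertex raw = RawVertex.newBuilder()
                .setVertexId(vertexName)            // "A", "B" or "C"
                .setX(x)
                .setY(y)
                .setTimeInstant(now)                // the measurement time
                .build();

        // push to the assigned partition of the rawVertex topic and wait for full acknowledgement
        producer.send(new ProducerRecord<>("rawVertex", partition, vertexName, raw)).get();

        Thread.sleep(1000);                         // at most ~1 raw measurement per second
    }
}
```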

The important producer configurations are in Lines #5 and #6, which specify the KafkaAvroSerializers, and Line #7, which gives the URL where the schemas are found/registered.
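
A minimal sketch of the kind of producer configuration being described; the broker and schema registry addresses are placeholders, and the ordering-related settings mirror what Section 4 calls out:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");        // placeholder
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");                   // where schemas are found/registered
props.put(ProducerConfig.ACKS_CONFIG, "all");                                // full acknowledgement
props.put(ProducerConfig.RETRIES_CONFIG, 0);                                 // no retries
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);          // one message in flight at a time

Producer<String, RawVertex> producer = new KafkaProducer<>(props);
```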

The three producers are started off at the same time via a script (see Section 4) that supplies the initial positions of the vertices.

3. Stream processing

The VertexProcessor and TriangleProcessor work in concert to enable stress metrics on the triangle. A Kafka stream processor acts as a consumer and/or a producer as needed, receiving messages from upstream entities and producing messages to downstream entities. Thus the serialization (for production) and deserialization (for consumption) methodologies need to be defined and in place before a stream topology can be instantiated. Further, as the processors may employ a variety of stores (key-value and window stores in our case), the means to build these stores should be in place as well.
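
The properties snippet that the line numbers in the next paragraph refer to is in the repo; the sketch below only gives a flavor of it. The application id and broker address are placeholders, and MeasurementTimeExtractor is an illustrative name for the custom timestamp extractor discussed next:

```java
Properties streamProps = new Properties();
streamProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "triangle-stress");       // placeholder id
streamProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder
streamProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);                   // one per rawVertex partition
streamProps.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                MeasurementTimeExtractor.class);                               // defines the stream time
```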

A few key stream properties are shown above. We employ 3 stream threads (Line #1) to match the 3 rawVertex partition sources we have. Line #4 defines the Stream Time, the key quantity that determines whether the stream moves forward or not. This is the actual measurement time in the case of the rawVertex, and the average of those measurement times in the case of the smoothedVertex. The code snippet below shows the custom class that implements TimestampExtractor. Lines #8 and #12 extract the measurement time as the stream-time.
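
A minimal sketch of such an extractor; the class name and the Avro getters are illustrative, not necessarily those in the repo:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class MeasurementTimeExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        Object value = record.value();
        if (value instanceof RawVertex) {
            return ((RawVertex) value).getTimeInstant();       // the actual measurement time
        }
        if (value instanceof SmoothedVertex) {
            return ((SmoothedVertex) value).getTimeInstant();  // the average measurement time of the batch
        }
        return previousTimestamp;                              // fall back for anything unexpected
    }
}
```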

3.1 Serialization & Deserialization

The messages in our case have Strings/Longs as keys but Avro objects as values. The following snippet of code configures and returns a serializer/deserializer that we need for the values (Lines 6 and 15 below).
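
One way to do this is sketched here with Confluent's SpecificAvroSerde; the repo may instead wire up KafkaAvroSerializer/KafkaAvroDeserializer directly, and the registry URL is a placeholder:

```java
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.avro.specific.SpecificRecord;
import java.util.Collections;

// Returns a configured Avro serde for a specific-record value type such as RawVertex.
static <T extends SpecificRecord> SpecificAvroSerde<T> avroSerde(String registryUrl) {
    SpecificAvroSerde<T> serde = new SpecificAvroSerde<>();
    serde.configure(Collections.singletonMap("schema.registry.url", registryUrl),
                    false);                        // false => configured for record values, not keys
    return serde;
}

// e.g. SpecificAvroSerde<RawVertex> rawVertexSerde = avroSerde("http://localhost:8081");
```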

Querying the schema registry shows the String, RawVertex, and SmoothedVertex schemas we defined via “avsc” files in the data model.

3.2 The Stores

The VertexProcessor employs two local stores for resiliency. One of them (Line #10 below) holds the current batch of raw vertices being smoothed; in case the processor dies, the restarted processor reads the current state from this store. The other (Line #11) holds the previously smoothed vertex object, which is needed for computing the cumulative displacement of the vertex. The window store used by the TriangleProcessor is defined in Line #14, with two parameters – the length of the time-window, and its retention time. The right values for these depend on the overall beat of the apparatus – the production rate, expected delays in transit, a reasonable duration to average the measurements over, etc. – that we discussed at length in the previous post.
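
A sketch of what building these stores looks like, assuming a recent Kafka Streams API (Duration-based signatures) and illustrative store names; rawVertexSerde/smoothedVertexSerde are the value serdes from Section 3.1, and the window length/retention values are placeholders:

```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// Two key-value stores for the VertexProcessor
StoreBuilder<?> rawBatchStore = Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("rawVertexBatch"),           // the batch being smoothed
        Serdes.String(), rawVertexSerde);

StoreBuilder<?> prevSmoothedStore = Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("previousSmoothedVertex"),   // for cumulative displacement
        Serdes.String(), smoothedVertexSerde);

// The window store for the TriangleProcessor: retention time and window length
StoreBuilder<?> windowStore = Stores.windowStoreBuilder(
        Stores.persistentWindowStore("smoothedVertexWindows",
                Duration.ofSeconds(windowRetentionSecs),   // how long a window is retained
                Duration.ofSeconds(windowLengthSecs),      // the length of the time-window
                false),                                    // do not retain duplicates
        Serdes.String(), smoothedVertexSerde);
```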

3.3 The Topology

The process topology described in the earlier posts is materialized by the following code snippet.

In Line #3 the data is read off the “rawVertex” topic with the configured deserializers (String for the key, KafkaAvro for the value, which is a RawVertex object). This flows into the VertexProcessor in Line #4, which smooths the measurements over a short time-interval before forwarding the average to the “smoothedVertex” topic in Line #7. The smoothedVertex topic in turn serves as the source for the TriangleProcessor (Line #8). A description of the topology built and executed by Kafka is obtained with topology.describe(), showing two sub-topologies as expected – excellent!
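
The repo's snippet is what the line numbers above refer to; the sketch below builds the same two-stage topology in compressed form, with illustrative source, processor, store and sink names:

```java
Topology topology = new Topology();

topology.addSource("rawVertexSource",
                Serdes.String().deserializer(), rawVertexSerde.deserializer(), "rawVertex")
        .addProcessor("vertexProcessor", VertexProcessor::new, "rawVertexSource")
        .addStateStore(rawBatchStore, "vertexProcessor")
        .addStateStore(prevSmoothedStore, "vertexProcessor")
        .addSink("smoothedVertexSink", "smoothedVertex",
                Serdes.String().serializer(), smoothedVertexSerde.serializer(), "vertexProcessor")
        .addSource("smoothedVertexSource",
                Serdes.String().deserializer(), smoothedVertexSerde.deserializer(), "smoothedVertex")
        .addProcessor("triangleProcessor", TriangleProcessor::new, "smoothedVertexSource")
        .addStateStore(windowStore, "triangleProcessor");

System.out.println(topology.describe());   // prints the two sub-topologies
```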

When starting a new run we may want to clean up any holdover stores from a previous run; in case of a crash and restart we should not reset the stores, of course. This is Line #3 below. In case of a failure or interruption, a shutdown hook allows all the state stores to be closed cleanly in Line #12.
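
A sketch of that start-up/shutdown logic; cleanLocalState stands in for whatever flag the run script passes:

```java
KafkaStreams kstreams = new KafkaStreams(topology, streamProps);

if (cleanLocalState) {      // true for a brand-new run, false after a crash-and-restart
    kstreams.cleanUp();     // wipe any holdover local state stores
}

kstreams.start();

// close the streams instance, and hence all state stores, cleanly on interruption
Runtime.getRuntime().addShutdownHook(new Thread(kstreams::close));
```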

The Vertex and Triangle processors provide an implementation of the processor close() hook – invoked as part of kstreams.close() – to take care of any other housekeeping operations. For example, the TriangleProcessor (see Section 3.5) employs the following snippet to flush the triangle metrics and smoothed vertex measurements to Elasticsearch before exiting.
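
Roughly, and assuming the Elasticsearch writes are batched through a BulkProcessor (an assumption; the repo may write documents directly):

```java
@Override
public void close() {
    try {
        bulkProcessor.flush();                            // push any pending triangle metrics and
        bulkProcessor.awaitClose(10, TimeUnit.SECONDS);   // smoothed vertex documents to Elasticsearch
        // ... then close the underlying Elasticsearch client ...
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
```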

3.4 The VertexProcessor

The VertexProcessor averages the incoming raw measurements over the smoothing interval and forwards the smoothedVertex to the next stage. The stores are initialized in Lines 3-4 below. In Line #5, the punctuation is scheduled to run periodically at the smoothingInterval (12 seconds in our simulations). The smoothed vertex is computed (Line #14), forwarded downstream (Line #15), and also saved to the local store (Line #17). We get ready for a new smoothing batch in Line #16.
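
A sketch of the VertexProcessor against the classic Processor API (Kafka Streams 2.x signatures); the store names, Avro getters, and the averageCurrentBatch()/clearBatch() helpers are hypothetical:

```java
import java.time.Duration;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueStore;

public class VertexProcessor implements Processor<String, RawVertex> {
    private ProcessorContext context;
    private KeyValueStore<String, RawVertex> rawBatchStore;
    private KeyValueStore<String, SmoothedVertex> prevSmoothedStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        rawBatchStore = (KeyValueStore<String, RawVertex>) context.getStateStore("rawVertexBatch");
        prevSmoothedStore = (KeyValueStore<String, SmoothedVertex>) context.getStateStore("previousSmoothedVertex");

        // schedule the smoothing punctuation at the smoothingInterval (12 s in the simulations)
        context.schedule(Duration.ofSeconds(12), PunctuationType.STREAM_TIME, this::smooth);
    }

    @Override
    public void process(String vertexId, RawVertex raw) {
        // stash the raw measurement until the next punctuation
        rawBatchStore.put(vertexId + "-" + raw.getTimeInstant(), raw);
    }

    private void smooth(long timestamp) {
        SmoothedVertex smoothed = averageCurrentBatch();          // hypothetical helper: compute the smoothed vertex
        context.forward(smoothed.getVertexId(), smoothed);        // forward downstream => smoothedVertex topic
        clearBatch();                                             // hypothetical helper: ready for a new batch
        prevSmoothedStore.put(smoothed.getVertexId(), smoothed);  // save for cumulative displacement
    }

    @Override
    public void close() { }
}
```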

In case of failure and restart we need to load the store from disk and initialize the state (Line #3 below). An iterator over the store in Line #17 provides access to the raw vertex objects that had been pulled off the topic prior to the failure but not yet processed into the smoothedVertex topic.
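
A sketch of that restore step, called from init(); currentBatch is a hypothetical in-memory collection:

```java
// Called from init() when we come back up after a failure.
private void restoreFromStore() {
    try (KeyValueIterator<String, RawVertex> iter = rawBatchStore.all()) {
        while (iter.hasNext()) {
            KeyValue<String, RawVertex> entry = iter.next();
            // raw vertices pulled off the topic before the failure but not yet
            // smoothed into the smoothedVertex topic: put them back into the batch
            currentBatch.add(entry.value);
        }
    }
}
```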

3.5 The TriangleProcessor

The TriangleProcessor gets the forwarded smoothedVertex objects from the VertexProcessors and computes the triangle metrics as soon as a window has at least one each of the A, B, and C measurements. The metrics are saved to an Elasticsearch index for long term storage. Line #3 in the code below keeps track of the ‘travel-time’ that we plotted in the previous post: the time a smoothed vertex takes to reach the TriangleProcessor after it has been forwarded by the VertexProcessor. If we are restarting the stream processor after a crash, Line #9 gets engaged and reads in all the available window store data. Line #13 saves the incoming measurement to the window store.

Because the time-windows have defined boundaries (first window: [0, windowSize), second window: [windowSize, 2*windowSize), etc.) we know which window an incoming measurement falls into. This is Line #15, followed by keeping track of this window (Line #13) for the “saturation-time” that we want to know. How long a time-window needs to be retained depends on the maximum saturation-time that we can expect in a simulation; we talked about this a good bit in the previous post. Every time a window gets a new measurement and it has at least one each of A, B, and C, we compute the triangle metrics – this is Line #26.
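
Putting those pieces together, here is a sketch of the TriangleProcessor's process() method; the getForwardedAt() field and the recordTravelTime()/trackWindow()/hasAllThreeVertices() helpers are hypothetical:

```java
@Override
public void process(String vertexId, SmoothedVertex sv) {
    // 'travel time': how long this smoothed vertex took to get here after being forwarded
    recordTravelTime(System.currentTimeMillis() - sv.getForwardedAt());

    // save the incoming measurement to the window store at its (average) measurement time
    windowStore.put(vertexId, sv, sv.getTimeInstant());

    // fixed window boundaries: [0, windowSize), [windowSize, 2*windowSize), ...
    long windowStart = (sv.getTimeInstant() / windowSizeMs) * windowSizeMs;

    // keep track of when this window first/last received data, for the saturation-time
    trackWindow(windowStart, vertexId);

    // once the window holds at least one each of A, B and C, compute the triangle metrics
    if (hasAllThreeVertices(windowStart)) {
        computeAndSaveTriangleMetrics(windowStart);
    }
}
```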

In the code snippet below, the vertex coordinates in a window are processed to produce a single value for each of A, B, and C at a mean time. Line #3 gets an iterator over all the smoothedVertex objects in a given time-window.
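
A sketch of that averaging step, fetching one vertex's measurements from the window store; the field and builder names are illustrative:

```java
// Collapse one vertex's measurements in a time-window into a single position at a mean time.
private SmoothedVertex averageOverWindow(String vertexId, long windowStart) {
    double sumX = 0, sumY = 0, sumT = 0;
    int n = 0;
    try (WindowStoreIterator<SmoothedVertex> iter =
                 windowStore.fetch(vertexId, windowStart, windowStart + windowSizeMs - 1)) {
        while (iter.hasNext()) {
            SmoothedVertex sv = iter.next().value;
            sumX += sv.getX();
            sumY += sv.getY();
            sumT += sv.getTimeInstant();
            n++;
        }
    }
    return SmoothedVertex.newBuilder()
            .setVertexId(vertexId)
            .setX(sumX / n)
            .setY(sumY / n)
            .setTimeInstant((long) (sumT / n))   // the mean time
            .build();
}
```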

With a triangle at hand we proceed to compute the metrics and save them to Elasticsearch for long term storage. Any alerts based on the computed metrics would be injected here – but we will not delve into that. Line #12 computes the time gap between the earliest & latest arriving measurements in a window, so we can get a distribution of this quantity over time and use it to adjust the window retention time.
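
A sketch of that final step, reusing the BulkProcessor assumed in Section 3.3; the index name, document fields, and the earliestArrival()/latestArrival() helpers are illustrative, and the actual stress metrics are left as a placeholder:

```java
private void computeAndSaveTriangleMetrics(long windowStart) {
    SmoothedVertex a = averageOverWindow("A", windowStart);
    SmoothedVertex b = averageOverWindow("B", windowStart);
    SmoothedVertex c = averageOverWindow("C", windowStart);

    Map<String, Object> doc = new HashMap<>();
    doc.put("windowStart", windowStart);
    doc.put("ax", a.getX());  doc.put("ay", a.getY());
    doc.put("bx", b.getX());  doc.put("by", b.getY());
    doc.put("cx", c.getX());  doc.put("cy", c.getY());
    // ... the stress metrics discussed in the earlier posts would be computed and added here ...

    // time gap between the earliest & latest arriving measurements in this window;
    // its distribution over time helps tune the window retention time
    doc.put("arrivalTimeGap", latestArrival(windowStart) - earliestArrival(windowStart));

    bulkProcessor.add(new IndexRequest("triangles", "metrics").source(doc));
}
```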

4. Running The Simulation

First the rawVertex topic is created with 3 partitions and the smoothedVertex topic with 1 partition.
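
The repo's scripts presumably use the kafka-topics CLI for this; an equivalent sketch with the Java AdminClient (broker address and replication factor are assumptions) would be:

```java
Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder

try (AdminClient admin = AdminClient.create(adminProps)) {
    admin.createTopics(Arrays.asList(
            new NewTopic("rawVertex", 3, (short) 1),        // 3 partitions: one each for A, B, C
            new NewTopic("smoothedVertex", 1, (short) 1)    // a single partition
    )).all().get();                                         // wait for the topics to be created
}
```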

The three producers are started off from a script that provides all the producer configs (Section 2) as arguments. The production parameters (no retries, synchronous send, only one message in flight, full acknowledgement, etc.) are geared towards keeping the message order in the topic identical to the production order – we count on this for smoothing in the VertexProcessor.

Likewise the stream processor is started (or re-started) with a script supplying the various config parameters indicated in Section 3.

5. Conclusion

With that we close this post – and this series of posts on using Kafka for streaming analytics. Hopefully the detail here is all that one needs to replicate the simulations in this series. The complete code is available on GitHub.
