
Real time processing with Kafka Streams based microservices on Application Container Cloud

Abhishek Gupta (Oracle), Mar 6 2017 (edited Mar 21 2017)

Part I of this blog series demonstrated the development and deployment of individual microservices (on Oracle Application Container Cloud) and how they are loosely coupled using an Apache Kafka message hub (set up on Oracle Compute Cloud). This second part builds on the previous one and, with the help of a sample application, explores microservice based stream processing and dives into the following areas

  • Kafka Streams: A stream processing library
  • Scalability: enable your application to handle increased demands
  • Handling state: this is a hard problem to solve when the application needs to be horizontally scalable

Technical Components

Open source technologies

The following open source components were used to build the sample application

| Component | Description |
| --- | --- |
| Apache Kafka | A scalable, distributed pub-sub message hub |
| Kafka Streams | A library for building stream processing applications on top of Apache Kafka |
| Jersey | Used to implement REST and SSE services; uses Grizzly as a (pluggable) runtime/container |
| Maven | Used as the standard Java build tool (along with its assembly plugin) |

Oracle Cloud

The following Oracle Cloud services have been leveraged

| Oracle Cloud Service | Description |
| --- | --- |
| Application Container Cloud | Serves as a scalable platform for running our stream processing microservices |
| Compute Cloud | Hosts the Kafka cluster (broker) |

Note: In addition to compute based (IaaS) Kafka hosting, Oracle Cloud now offers Event Hub Cloud. This is a compelling offering which provides Apache Kafka as a fully managed service along with other value added capabilities.

Hello Kafka Streams!

In simple words, Kafka Streams is a library which you can include in your Java based applications to build stream processing applications on top of Apache Kafka. Other distributed computing platforms such as Apache Spark and Apache Storm are widely used in the big data stream processing world, but Kafka Streams brings some unique propositions to this area.

Kafka Streams: what & why

| What | Why |
| --- | --- |
| Built on top of Kafka – leverages its scalable and fault tolerant capabilities | If you use Kafka in your ecosystem, it makes perfect sense to leverage Kafka Streams to churn streaming data to/from the Kafka topics |
| Microservices friendly | It's a lightweight library which you use within your Java application. This means that you can use it to build microservices style stream processing applications |
| Flexible deployment & elastic in nature | You're not restricted to a specific deployment model (e.g. cluster-based). The application can be packaged and deployed in a flexible manner and scaled up and down easily |
| For fast data | Harness the power of Kafka Streams to crunch high volume data in real time systems – it does not need to be at big data scale |
| Support for stateful processing | Helps manage local application state in a fault tolerant & scalable manner |

Sample application: what’s new

In part I, the setup was as follows

  • A Kafka broker serving as the messaging hub
  • Producer application (on Application Container Cloud) pushing CPU usage metrics to Kafka
  • Consumer application (on Application Container Cloud) consuming those metrics from Kafka and exposing them as a real time feed (using Server Sent Events)

Some parts of the sample have been modified to demonstrate some of the key concepts. Here is the gist

| Component | Changes |
| --- | --- |
| Consumer API | The new consumer application leverages the Kafka Streams API on Application Container Cloud, as compared to the traditional (polling based) Kafka Consumer client API used in part I |
| Consumer topology | We will deploy multiple instances of the consumer application to scale our processing logic |
| Nature of metrics feed | The cumulative moving average of the CPU metrics per machine is calculated, as opposed to the exact metric provided by the SSE feed in part I |
| Accessing the CPU metrics feed | The consumer application makes the CPU usage metrics available in the form of a REST API, as compared to the SSE based implementation in part I |

High level architecture

The basic architecture still remains the same i.e. microservices decoupled using a messaging layer

[Image: high level architecture - producer and consumer microservices decoupled via Kafka]

As mentioned above, the consumer application has undergone changes and is now based on the Kafka Streams API. We could have continued to use the traditional poll based Kafka Consumer client API as in part I, but the Kafka Streams API was chosen for a few reasons. Let's go through them in detail and see how they fit in the context of the overall solution. At this point, ask yourself the following questions

  • How would you scale your consumer application?
  • How would you handle intermediate state (required for moving average calculation) spread across individual instances of your scaled out application?

Scalability

With Application Container Cloud you can spawn multiple instances of your stream processing application with ease (for more details, refer to the documentation)

But how does it help?

The sample application models CPU metrics being continuously sent by the producer application to a Kafka broker – for demonstration purposes, the number of machines (whose CPU metrics are being sent) has been limited to ten. But how would you handle data at a larger scale?

  • When the number of machines increases to the scale of thousands?
  • Perhaps you want to factor in additional attributes (in addition to just the CPU usage)?
  • Maybe you want to execute all this at data-center scale?

The answer lies in distributing your computation across several processes and this is where horizontal scalability plays a key role.

When the CPU metrics are sent to a topic in Kafka, they are distributed to different partitions (using a default consistent hashing algorithm) – this is similar to sharding. This helps from multiple perspectives

  • When Kafka itself is scaled out (broker nodes are added) – individual partitions are replicated over these nodes for fault tolerance and high performance
  • From a consumer standpoint - multiple consumers (in the same group) automatically distribute the load among themselves

In the case of our example, each stream processing application instance is nothing but a (specialized) form of Kafka consumer and takes up a non-overlapping set of partitions in Kafka for processing. Consider a setup where 2 instances process data for 10 machines spread over 4 partitions in the Kafka broker. Here is a pictorial representation

[Image: two consumer instances sharing 10 machines across 4 Kafka partitions]
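For illustration, a topic with 4 partitions such as the one in the diagram could be created with the standard Kafka CLI. This is only a sketch: the topic name cpu-metrics is an assumption (use whatever topic your producer application is configured with) and you need to substitute your own Zookeeper host

bin/kafka-topics.sh --create --zookeeper <zookeeper-host>:2181 --topic cpu-metrics --partitions 4 --replication-factor 1

With 4 partitions and 2 consumer instances, each instance is assigned 2 partitions; when a third instance joins, Kafka rebalances and redistributes the partitions among the three instances.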

Managing application state (at scale)

The processing logic in the sample application is not stateless i.e. it depends on previous state to calculate its current state. In the context of this application, state is

  • the cumulative moving average of a continuous stream of CPU metrics,
  • being calculated in parallel across a distributed set of instances, and
  • constantly changing i.e. the cumulative moving average of the machines handled by each application instance is getting updated with the latest results

If you confine the processing logic to a single node, the problem of localized state co-ordination would not exist i.e. local state = global state. But this luxury is not available in a distributed processing system. Here is how our application handles it (thanks to Kafka Streams)

  • The local state store (a KV store) containing the machine to (cumulative moving average) CPU usage metric mapping is sent to a dedicated topic in Kafka e.g. the in-memory-avg-store in our application (named cpu-streamz) will have a corresponding topic cpu-streamz-in-memory-avg-store-changelog in Kafka
  • This topic is called a changelog and it is compacted i.e. only the latest value for each key is retained by Kafka. This achieves the goal (distributed state management) in the cheapest possible manner
  • During scale up – Kafka assigns some partitions to the new instance (see the example above) and the state for those partitions (which was previously stored in another instance) is replayed from the Kafka changelog topic to build the state store for this new instance
  • When an instance crashes or is stopped – the partitions being handled by that instance are handed off to another instance and the state of those partitions (stored in the Kafka changelog topic) is written to the local state store of the existing node to which the work was allotted

All in all, this ensures scalable and fault tolerant state management
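You can actually see this changelog topic on the Kafka broker. Assuming the application id cpu-streamz and the state store in-memory-avg-store mentioned above (and substituting your own Zookeeper host), the following command should describe the compacted changelog topic

bin/kafka-topics.sh --describe --zookeeper <zookeeper-host>:2181 --topic cpu-streamz-in-memory-avg-store-changelog

The Configs column in the output should show cleanup.policy=compact, which is what makes it a changelog rather than a regular, time-retained topic.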

Exposing application state

As mentioned above, the cumulative moving averages of the CPU metrics of each machine are calculated across multiple nodes in parallel. In order to find out the global state of the system i.e. the current average for all (or specific) machines, the local state stores need to be queried. The application provides a REST API for this

[Image: querying application state via the REST API exposed over the single Application Container Cloud URL]

More details in the Testing section on how to see this in action

It's important to note the following points with regard to the implementation of the REST API, which in turn gives us what we want - real time insight into the moving averages of the CPU usage

  • Topology agnostic: Use a single access URL provided by Application Container Cloud (as highlighted in the diagram above). As a client, you do not have to be aware of individual application instances
  • Robust & flexible: Instances can be added or removed on the fly but the overall business logic (in this case it is calculation of the cumulative moving average of a stream of CPU metrics) will remain fault tolerant and adjust to the elastic topology changes

This is made possible by a combination of the following

  • Automatic load balancing: Application Container Cloud load balances requests among multiple instances of your application
  • Clustered setup: from an internal implementation perspective, your application instances can detect each other. For this to work, the isClustered attribute in the manifest.json is set to true, and custom logic is implemented within the solution so that instance specific information can be discovered and used appropriately. However, this is an internal implementation detail and the user is not affected by it

Please look at the Code snippets section for some more details

  • Interactive queries: this capability in Kafka Streams enables external clients to introspect the state store of a stream processing application instance via a host-port configuration enabled within the app configuration

An in-depth discussion of Kafka Streams is not possible in a single blog. The above sections are meant to provide just enough background which is (hopefully) sufficient from the point of view of this blog post. Readers are encouraged to spend some time going through the official documentation and come back to this blog to continue hacking on the sample

Setup

You can refer to part I of the blog for the Apache Kafka related setup. The only additional step which needs to be executed is exposing the port on which your Zookeeper process is listening (2181 by default), as this is required by the Kafka Streams library configuration. While executing the steps from the Open Kafka listener port section, ensure that you include the Oracle Compute Cloud configuration for port 2181 (in addition to the Kafka broker port 9092)
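To show where these values end up, here is a rough sketch of the Kafka Streams configuration inside the consumer application. The property names are from the Kafka Streams 0.10.1 API; the port variable and the use of the KAFKA_BROKER and ZOOKEEPER environment variables are assumptions based on the deployment descriptor shown later

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cpu-streamz");                    //also acts as the consumer group id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, System.getenv("KAFKA_BROKER")); //e.g. host:9092
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, System.getenv("ZOOKEEPER"));    //e.g. host:2181 - why port 2181 must be reachable
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, Utils.getHostIPForDiscovery() + ":" + port); //host:port used by interactive queries
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());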

Code

Maven dependencies

As mentioned earlier, from an application development standpoint, Kafka Streams is just a library. This is evident in the pom.xml

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>0.10.1.1</version>
</dependency>

The project also uses the appropriate Jersey libraries along with the Maven shade and assembly plugins to package the application
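For reference, a minimal sketch of the shade plugin configuration that produces a self-contained (fat) JAR might look like the following; the mainClass package (com.example) is an assumption, only the class name comes from the application described below

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.4.3</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <!-- sets Main-Class in MANIFEST.MF so that "java -jar ..." works -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>com.example.KafkaStreamsAppBootstrap</mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>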

Overview

The producer microservice remains the same and you can refer to part I for the details. Let's look at the revamped consumer stream processing microservice

| Class | Details |
| --- | --- |
| KafkaStreamsAppBootstrap | Entry point for the application. Kicks off the Grizzly container and the Kafka Streams processing pipeline |
| CPUMetricStreamHandler | Implements the processing pipeline logic and handles the Kafka Streams configuration and topology creation |
| MetricsResource | Exposes multiple REST endpoints for fetching CPU moving average metrics |
| Metric, Metrics | POJOs (JAXB decorated) to represent metric data. They are exchanged as JSON/XML payloads |
| GlobalAppState, Utils | Common utility classes |

Now that you have a fair idea of what's going on within the application and an overview of the classes involved, it makes sense to peek at some of the relevant sections of the code

State store

public static class CPUCumulativeAverageProcessor implements Processor<String, String> {
    ...................
    @Override
    public void init(ProcessorContext pc) {
        this.pc = pc;
        this.pc.schedule(12000); //invoke punctuate every 12 seconds
        this.machineToAvgCPUUsageStore = (KeyValueStore<String, Double>) pc.getStateStore(AVG_STORE_NAME);
        this.machineToNumOfRecordsReadStore = (KeyValueStore<String, Integer>) pc.getStateStore(NUM_RECORDS_STORE_NAME);
    }
    ...............
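The wiring of this processor and its two state stores is done in CPUMetricStreamHandler. The exact code isn't reproduced in this post, but with the Kafka 0.10.1 Processor API it would look roughly like the sketch below; the source topic name cpu-metrics and the source/processor names are assumptions

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.Stores;

TopologyBuilder builder = new TopologyBuilder();

builder.addSource("metrics-source", "cpu-metrics")  //source topic (assumed name)
       .addProcessor("cma-processor", CPUCumulativeAverageProcessor::new, "metrics-source")
       .addStateStore(Stores.create(AVG_STORE_NAME) //in-memory KV store, backed by a compacted changelog topic
                            .withStringKeys()
                            .withDoubleValues()
                            .inMemory()
                            .build(), "cma-processor")
       .addStateStore(Stores.create(NUM_RECORDS_STORE_NAME)
                            .withStringKeys()
                            .withIntegerValues()
                            .inMemory()
                            .build(), "cma-processor");

KafkaStreams streams = new KafkaStreams(builder, props); //props as configured earlier
streams.start();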

Cumulative Moving Average (CMA) calculation

..........
@Override
public void process(String machineID, String currentCPUUsage) {
    //turn each String value (cpu usage) to Double
    Double currentCPUUsageD = Double.parseDouble(currentCPUUsage);
    Integer recordsReadSoFar = machineToNumOfRecordsReadStore.get(machineID);
    Double latestCumulativeAvg = null;

    if (recordsReadSoFar == null) {
        PROC_LOGGER.log(Level.INFO, "First record for machine {0}", machineID);
        machineToNumOfRecordsReadStore.put(machineID, 1);
        latestCumulativeAvg = currentCPUUsageD;
    } else {
        Double cumulativeAvgSoFar = machineToAvgCPUUsageStore.get(machineID);
        PROC_LOGGER.log(Level.INFO, "CMA so far {0}", cumulativeAvgSoFar);
        //refer https://en.wikipedia.org/wiki/Moving_average#Cumulative_moving_average for details
        latestCumulativeAvg = (currentCPUUsageD + (recordsReadSoFar * cumulativeAvgSoFar)) / (recordsReadSoFar + 1);
        recordsReadSoFar = recordsReadSoFar + 1;
        machineToNumOfRecordsReadStore.put(machineID, recordsReadSoFar);
    }

    machineToAvgCPUUsageStore.put(machineID, latestCumulativeAvg); //store latest CMA in local state store
..........
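To see the formula in action: if a machine reports CPU usage values 10, 20 and 30 in that order, the store holds 10 after the first record, (20 + 1 x 10) / 2 = 15 after the second, and (30 + 2 x 15) / 3 = 20 after the third. In other words, it always holds the average of everything seen so far, without keeping the individual readings around.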

Metrics POJO

@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
public class Metrics {

    private final List<Metric> metrics;

    public Metrics() {
        metrics = new ArrayList<>();
    }

    public Metrics add(String source, String machine, String cpu) {
        metrics.add(new Metric(source, machine, cpu));
        return this;
    }

    public Metrics add(Metrics anotherMetrics) {
        anotherMetrics.metrics.forEach((metric) -> {
            metrics.add(metric);
        });
        return this;
    }

    @Override
    public String toString() {
        return "Metrics{" + "metrics=" + metrics + '}';
    }

    public static Metrics EMPTY() {
        return new Metrics();
    }
}
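The Metric class itself isn't listed in this post. Based on the add(String, String, String) method above and the payload fields described in the Testing section, a minimal sketch of it would be along these lines (field names assumed)

@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
public class Metric {

    private String source;   //application instance which performed the calculation
    private String machine;  //machine ID
    private String cpu;      //cumulative moving average of the CPU usage

    public Metric() {
        //no-arg constructor required by JAXB
    }

    public Metric(String source, String machine, String cpu) {
        this.source = source;
        this.machine = machine;
        this.cpu = cpu;
    }
}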

Exposing REST API for state

@GET
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
public Response all_metrics() throws Exception {
    Response response = null;
    try {
        KafkaStreams ks = GlobalAppState.getInstance().getKafkaStreams();
        HostInfo thisInstance = GlobalAppState.getInstance().getHostPortInfo();
        Metrics metrics = getLocalMetrics();

        ks.allMetadataForStore(storeName)
                .stream()
                .filter(sm -> !(sm.host().equals(thisInstance.host()) && sm.port() == thisInstance.port())) //only query remote node stores
                .forEach(new Consumer<StreamsMetadata>() {
                    @Override
                    public void accept(StreamsMetadata t) {
                        String url = "http://" + t.host() + ":" + t.port() + "/metrics/remote";
                        //LOGGER.log(Level.INFO, "Fetching remote store at {0}", url);
                        Metrics remoteMetrics = Utils.getRemoteStoreState(url, 2, TimeUnit.SECONDS);
                        metrics.add(remoteMetrics);
                        LOGGER.log(Level.INFO, "Metric from remote store at {0} == {1}", new Object[]{url, remoteMetrics});
                    }
                });

        response = Response.ok(metrics).build();
    } catch (Exception e) {
        LOGGER.log(Level.SEVERE, "Error - {0}", e.getMessage());
    }
    return response;
}
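Utils.getRemoteStoreState (used above to pull state from the other instances) isn't shown in this post. Since the project already uses Jersey, a minimal sketch of it with the JAX-RS client API could look like this; the method shape is inferred from the call site and the timeouts are applied via Jersey's ClientProperties

import java.util.concurrent.TimeUnit;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.core.MediaType;
import org.glassfish.jersey.client.ClientProperties;

public static Metrics getRemoteStoreState(String url, long timeout, TimeUnit unit) {
    Client client = ClientBuilder.newClient();
    try {
        int timeoutMillis = (int) unit.toMillis(timeout);
        client.property(ClientProperties.CONNECT_TIMEOUT, timeoutMillis);
        client.property(ClientProperties.READ_TIMEOUT, timeoutMillis);
        //GET /metrics/remote on the other instance and unmarshal the JSON payload into Metrics
        return client.target(url)
                     .request(MediaType.APPLICATION_JSON)
                     .get(Metrics.class);
    } finally {
        client.close();
    }
}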

Host discovery

public static String getHostIPForDiscovery() {
    String host = null;
    try {
        String hostname = Optional.ofNullable(System.getenv("APP_NAME")).orElse("streams");
        InetAddress inetAddress = Address.getByName(hostname);
        host = inetAddress.getHostAddress();
    } catch (UnknownHostException ex) {
        host = "localhost";
    }
    return host;
}

Deployment to Application Container Cloud

Now that you have a fair idea of the application, it’s time to look at the build, packaging & deployment

Update deployment descriptors

The metadata files for the producer application are the same. Please refer to part I for details on how to update them. The steps below are relevant to the (new) stream processing consumer microservice.

manifest.json: You can use this file in its original state

{
    "runtime": {
        "majorVersion": "8"
    },
    "command": "java -jar acc-kafka-streams-1.0.jar",
    "isClustered": "true"
}

deployment.json

It contains the environment variables required by the application at runtime. The values are left as placeholders for you to fill in prior to deployment.

{
    "instances": "2",
    "environment": {
        "APP_NAME": "kstreams",
        "KAFKA_BROKER": "<as-configured-in-kafka-server-properties>",
        "ZOOKEEPER": "<zookeeper-host:port>"
    }
}

Here is an example

{
    "instances": "2",
    "environment": {
        "APP_NAME": "kstreams",
        "KAFKA_BROKER": "oc-140-44-88-200.compute.oraclecloud.com:9092",
        "ZOOKEEPER": "10.190.210.199:2181"
    }
}

You need to be careful about the following

  • The value of the KAFKA_BROKER attribute should be the same as the one you configured in the advertised.listeners attribute of the Kafka server.properties file (the Oracle Compute Cloud instance public DNS and port)
  • The APP_NAME attribute should be the same as the one you use while deploying your application using the Application Container Cloud REST API

Please refer to the documentation for more details on the metadata files

Build

Initiate the build process to produce the deployable artifact (a ZIP file)

//Producer application
cd <code_dir>/producer //maven project location
mvn clean package

//Consumer application
cd <code_dir>/consumer //maven project location
mvn clean package

The output of the build process is a ZIP file for each microservice: accs-kafka-producer-1.0-dist.zip for the producer and acc-kafka-streams-1.0-dist.zip for the consumer

Upload & deploy

You need to upload the ZIP file to Oracle Storage Cloud and then reference it in the subsequent steps. Here are the required cURL commands

Create a container in Oracle Storage Cloud (if it doesn't already exist)

curl -i -X PUT -u <USER_ID>:<USER_PASSWORD> "<STORAGE_CLOUD_CONTAINER_URL>" //template

e.g. curl -i -X PUT -u jdoe:foobar "https://domain007.storage.oraclecloud.com/v1/Storage-domain007/accs-kstreams-consumer/"

Upload your zip file into the container (the zip file is nothing but a Storage Cloud object)

curl -X PUT -u <USER_ID>:<USER_PASSWORD> -T <zip_file> "<storage_cloud_object_URL>" //template

e.g. curl -X PUT -u jdoe:foobar -T acc-kafka-streams-1.0-dist.zip "https://domain007.storage.oraclecloud.com/v1/Storage-domain007/accs-kstreams-consumer/accs-kafka-consumer.zip"

Repeat the same for the producer microservice

You can now deploy your application to Application Container Cloud using its REST API. The Oracle Storage cloud path (used above) will be referenced while using the Application Container Cloud REST API (used for deployment). Here is a sample cURL command which makes use of the REST API

curl -X POST -u joe@example.com:password \
  -H "X-ID-TENANT-NAME:domain007" \
  -H "Content-Type: multipart/form-data" -F "name=kstreams" \
  -F "runtime=java" -F "subscription=Monthly" \
  -F "deployment=@deployment.json" \
  -F "archiveURL=accs-kstreams-consumer/accs-kafka-consumer.zip" \
  -F "notes=notes for deployment" \
  https://apaas.oraclecloud.com/paas/service/apaas/api/v1.1/apps/domain007

Note

  • the name attribute used in the curl command should be the same as the APP_NAME attribute used in deployment.json
  • Repeat the same for the producer microservice

Post deployment

(the consumer application has been highlighted below)

The Applications console

[Image: Applications console]

The Overview sub-section

[Image: Overview sub-section]

The Deployments sub-section

[Image: Deployments sub-section]

Testing

Assuming your Kafka broker is up and running and you have deployed the application successfully, execute the steps below to test drive your application

Start the producer

Trigger your producer application by issuing an HTTP GET to https://my-producer-app-url/producer e.g. https://accs-kafka-producer-domain007.apaas.us.oraclecloud.com/producer. This will start producing (random) CPU metrics for a bunch of (10) machines

[Image: starting the producer]

You can stop the producer by issuing an HTTP DELETE on the same URL

[Image: stopping the producer]

Check the statistics

Cumulative moving average of all machines

Allow the producer to run for 30-40 seconds and then check the current statistics. Issue an HTTP GET request to your consumer application e.g. https://acc-kafka-streams-domain007.apaas.us.oraclecloud.com/metrics. You'll see a response payload similar to what's depicted below

[Image: sample response payload from the /metrics endpoint]

The information in the payload is as follows (an illustrative sample is shown after the list)

  • cpu: the cumulative average of the CPU usage of a machine
  • machine: the machine ID
  • source: this has been purposefully added as diagnostic information to see which node (instance in Application Container Cloud) is handling the calculation for a specific machine (this is subject to change as your application scales up/down)
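For reference, the JSON returned by the /metrics endpoint looks roughly like the following. The values are illustrative only, and the exact envelope depends on how the Metrics POJO is serialized by your JSON provider

{
    "metrics": [
        { "source": "instance-1", "machine": "machine-1", "cpu": "6.18" },
        { "source": "instance-1", "machine": "machine-4", "cpu": "11.02" },
        { "source": "instance-2", "machine": "machine-7", "cpu": "4.52" }
    ]
}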

Cumulative moving average of a specific machine

[Image: cumulative moving average for a specific machine]

Scale your application

Increase the number of instances of your application (from 2 to 3)

[Image: scaling the application from two to three instances]

Check the stats again and you'll notice that the computation task is now being shared among three nodes.

That's all for this blog series!

The views expressed in this post are my own and do not necessarily reflect the views of Oracle.
