In this post, we will see how Pega uses Kafka internally to process streams of data. This is just an overview of the stream service.
Tip: If this is your first time reading about Kafka on my blog, I recommend you go through the Kafka series posts in order.
Kafka is a distributed system consisting of servers and clients that communicate via a high-performance TCP network protocol. It can be deployed on bare-metal hardware, virtual machines, and containers in on-premise as well as cloud environments.
This tutorial is implemented using Pega Personal Edition 8.4.
What is the stream service?
- The stream service was introduced in Pega 7.4.
- The stream service is built on Apache Kafka, which helps to publish and subscribe to streams of data, store streams of data, and process that data in real time.
As we saw in the previous introduction post, to use Kafka you need the Kafka server and the Zookeeper server up and running.
How do you start Kafka and Zookeeper in Pega?
Starting the stream service starts the internal Kafka and Zookeeper.
So, how do you start the stream service?
First, let’s talk about node classification. You can classify the nodes or servers based on their usage.
Node type = Stream: this node type is responsible for Kafka-based stream processing.
For more information on node classification, I recommend you go through the post below.
- The stream service starts automatically whenever a server (node) of type Stream is started.
Note: In earlier Pega 8 versions, if the existing nodes did not leave or failed to join the Kafka cluster, a server restart will not start the stream service automatically.
We know that in Pega Personal Edition, the node type is configured as ‘WebUser, BackgroundProcessing, Search, Stream’.
So by default, restarting (or starting) the server should automatically start the stream service.
Restart the server. You will see a lot of log entries related to the Kafka server start-up.
The single Pega Personal Edition server joined the Kafka cluster as a broker (and is also the leader). The replication factor is also 1, because Personal Edition always comes with one server 🙂
The Kafka configurations and data are available in the folders below.
To check the server properties, switch to the Kafka server folder inside the Personal Edition tomcat folder.
There you will see all the default configurations.
Important note: All these server and Zookeeper properties files are auto-generated when the stream service starts.
Open the server.properties file.
You will see all the server configuration details here.
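If you prefer to check the generated values from code rather than by eye, the following is a minimal Java sketch that loads a properties file with java.util.Properties and reports a few keys. The class name ServerPropsInspector is hypothetical, and while listeners, log.dirs and zookeeper.connect are standard Kafka broker settings, whether Pega's auto-generated file sets each of them is an assumption; missing keys are reported as <not set>.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper that reads a Kafka server.properties file and reports a few broker keys.
class ServerPropsInspector {
    // Standard Kafka broker keys; whether Pega's generated file sets each one is an assumption.
    static final String[] KEYS = {"listeners", "log.dirs", "zookeeper.connect"};

    static Map<String, String> inspect(Reader source) {
        Properties props = new Properties();
        try {
            props.load(source);  // parses key=value lines and skips # comments
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> found = new LinkedHashMap<>();
        for (String key : KEYS) {
            found.put(key, props.getProperty(key, "<not set>"));
        }
        return found;
    }

    public static void main(String[] args) throws IOException {
        // args[0]: path to the generated server.properties (read-only; Pega regenerates it anyway)
        try (Reader reader = Files.newBufferedReader(Paths.get(args[0]))) {
            inspect(reader).forEach((k, v) -> System.out.println(k + " = " + v));
        }
    }
}
```

Run it with the path to the generated server.properties as its only argument. The sketch only reads the file and never writes to it, since any manual edit would be overwritten on the next stream service start.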
You may ask: how do you override these settings?
Remember, Pega generates these files automatically when you start the stream service. So updating the property file manually will not help, because the files are always regenerated with default values.
Pega provides an option to use a Dynamic System Setting (DSS) to override the default server and broker properties.
We will try this hands-on later in this tutorial.
How do we make sure the stream service is started?
Step 1: Check the stream services landing page.
Switch to Designer Studio -> Configure -> Decisioning -> Infrastructure -> Services -> Stream.
You will see the stream service status as “NORMAL” for the only node, on the local IP address 192.168.2.5.
Now it is clear that the stream service is up and running, but how do we make sure Kafka and Zookeeper are running?
Again, by default Kafka listens on port 9092 and Zookeeper on port 2181.
Check the port status.
Step 1: Open a cmd window using Run as administrator.
Step 2: Execute the command netstat -ab
This command lists all connections and listening ports on your machine, along with the executable involved in each one.
You can see that Zookeeper port 2181 is occupied.
Similarly, Kafka port 9092 is also occupied 🙂
Step 3: Stop the stream service manually using the Execute option on the landing page.
You can see that the stream service is stopped.
Step 4: Now check the listening ports again from the command prompt with netstat -ab. Search through the results and you will find that the ports are no longer listening 🙂
So it is clear that when the stream service is started, Kafka and Zookeeper are started automatically on their default ports.
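Instead of scanning the netstat output by hand, you could probe the two default ports from code. The following is a minimal Java sketch, with a hypothetical PortProbe class, that treats a successful TCP connect to localhost as "something is listening". It cannot tell whether the listener is actually Kafka or Zookeeper, only that the port is in use.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: reports whether some process accepts TCP connections on host:port.
class PortProbe {
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // connect() only succeeds if a process is listening on that port
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // refused or timed out: nothing is listening there (or the host is unreachable)
            return false;
        }
    }

    public static void main(String[] args) {
        // Default ports from this post; change them if you override the ports via DSS.
        String[] names = {"Zookeeper", "Kafka"};
        int[] ports = {2181, 9092};
        for (int i = 0; i < ports.length; i++) {
            System.out.println(names[i] + " port " + ports[i] + " listening: "
                    + isListening("localhost", ports[i], 500));
        }
    }
}
```

Running it before and after stopping the stream service should mirror what netstat shows in Steps 2 and 4.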
So how do we change the default settings for the Kafka server and Zookeeper?
The same way we usually override configuration settings: using a DSS.
For this tutorial, we will change the Zookeeper port from 2181 to 2182.
Step 1: Create a new DSS
Owning Ruleset – Pega-Engine
Setting purpose – prconfig/dsm/services/stream/pyKeeperPort/default
Step 2: Set the value as 2182
Step 3: Restart the server for the DSS configuration to take effect.
At the cmd prompt, check the listening ports again with netstat -ab.
You can see that port 2182 is now occupied 🙂
Also, the stream service should be up and running again!
For other configuration settings, please visit the Pega link below.
Coming back to the question: how do we start the stream service?
2. In prior Pega 8 versions, you also get an Add node button on the stream service landing page to add nodes manually.
Okay, now what are the infrastructure requirements for running the stream service, or Kafka?
I recommend you go through the knowledge article on the Streams overview. It gives a basic understanding of disk space, compression settings, etc. for the stream service.
Let’s look at two main differences between the normal Kafka implementation (which we saw in the Kafka 2 post) and the internal Kafka implementation packaged with Pega.
1. Zookeeper library
Pega uses a project called ‘Charlatan server’. It is Pega’s own library that substitutes for the real Zookeeper in its Kafka implementation.
You will find the source files for Charlatan in the Pega Git repository.
We know that Zookeeper's main responsibility is to maintain metadata about Kafka brokers, topics, etc.
So how does the Charlatan server maintain this metadata?
It is the usual answer 😉 Yes, there is a dedicated table that stores the Zookeeper metadata.
Table name – pr_data_stream_nodes
View the rows in the corresponding table.
You can see there is one Kafka cluster and one Kafka broker. You also see the metadata for all the Kafka topics 🙂
So the Charlatan server uses this table to maintain the metadata.
The next main difference is:
2. Consumer offsets are not stored with the Kafka server.
What is a consumer offset?
Copy-pasting the content from the Kafka introduction post 😉
Say a consumer has read messages from partition 0 up to offset 5. Now the consumer is taken down for maintenance. When the consumer comes back up, where should it resume reading? Ideally, it should continue reading after offset 5.
What are consumer offsets?
- Kafka internally stores the offsets at which the consumer group is reading.
- The offsets are committed to a Kafka topic named __consumer_offsets.
- Whenever a consumer in a group has processed data, it should commit the offsets.
A consumer can choose when to commit the offsets.
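To make the offset idea concrete, here is a toy Java model, not Kafka's or Pega's actual storage: an in-memory map (hypothetical OffsetStore and ToyConsumer classes) keyed by consumer group, topic and partition. It follows Kafka's convention that the committed offset is the offset of the next record to read, so a consumer that processed offsets 0 to 5 commits 6 and resumes there after a restart.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of committed consumer offsets (hypothetical; not Kafka's or Pega's storage).
class OffsetStore {
    // key: "group|topic|partition" -> offset of the NEXT record to read
    private final Map<String, Long> committed = new HashMap<>();

    private static String key(String group, String topic, int partition) {
        return group + "|" + topic + "|" + partition;
    }

    void commit(String group, String topic, int partition, long nextOffset) {
        committed.put(key(group, topic, partition), nextOffset);
    }

    // A group with no commit yet starts from the beginning of the partition (offset 0).
    long position(String group, String topic, int partition) {
        return committed.getOrDefault(key(group, topic, partition), 0L);
    }
}

class ToyConsumer {
    // Reads up to maxRecords from the partition log, starting at the committed position,
    // then commits the offset right after the last record it processed.
    static List<String> readBatch(OffsetStore store, String group, String topic, int partition,
                                  List<String> partitionLog, int maxRecords) {
        long offset = store.position(group, topic, partition);
        List<String> processed = new ArrayList<>();
        while (offset < partitionLog.size() && processed.size() < maxRecords) {
            processed.add(partitionLog.get((int) offset));
            offset++;
        }
        store.commit(group, topic, partition, offset);
        return processed;
    }
}
```

Because the store outlives any single consumer, a "restarted" consumer in the same group picks up at offset 6, while a different group still starts at 0. In real Kafka this map lives in the __consumer_offsets topic.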
But in the Pega Kafka implementation, no __consumer_offsets topic is used!
In the directory C:\PRPCPersonalEdition\tomcat\kafka-data, you will see no __consumer_offsets topic.
So how are the consumer offsets managed?
Again, the same usual answer 😉 In a dedicated table!
Table name – pr_data_decision_df_part
You can see that every OOTB queue processor rule (a consumer of Kafka data) maintains its offset in the pxrecordsProcessed column.
When a Kafka message is processed, Pega maintains the offset in this table!
So the overall architecture between Pega and Kafka looks like this.
There is one more useful article on troubleshooting the stream service.
In summary:
- The stream service helps to publish and subscribe to streams of data, store streams of data, and process that data in real time.
- Starting the stream service in turn starts the Kafka server and the Zookeeper service, and the broker joins the Kafka cluster. The Designer Studio -> Configure -> Decisioning -> Infrastructure -> Services -> Stream landing page shows the available stream services.
- Charlatan server: the library that substitutes for Zookeeper in Pega's Kafka implementation.
- Table pr_data_stream_nodes stores the metadata for Kafka topics, brokers, clusters, etc. Table pr_data_decision_df_part stores the consumer offsets.
See you all in my next post on Data sets!
Apache Kafka Tutorial: learn about the Apache Kafka Consumer with an example Java application working as a Kafka consumer. A step-by-step guide to building a Kafka Consumer is provided.
What is a Kafka Consumer?
A Consumer is an application that reads data from Kafka Topics. It subscribes to one or more topics in the Kafka cluster and consumes the messages from those Topics.
The connectivity of a Consumer to the Kafka cluster is tracked using heartbeats. The Consumer sends heartbeats to let Zookeeper or the broker-side group coordinator know that it is still connected to the cluster. An absence of heartbeats means the Consumer is no longer connected, in which case the coordinator has to rebalance the load. Heartbeats add overhead to the cluster, so the heartbeat interval at the Consumer is configurable, letting you balance data throughput against that overhead.
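In the Java consumer, this trade-off is controlled mainly by two configuration keys, heartbeat.interval.ms and session.timeout.ms. The following is a minimal sketch that builds them as plain java.util.Properties (no Kafka jar needed); the class name HeartbeatSettings and the example values are illustrative assumptions. Kafka's documentation advises keeping the heartbeat interval no higher than one third of the session timeout, and this sketch simply rejects values that break that rule of thumb.

```java
import java.util.Properties;

// Hypothetical helper: heartbeat/session settings for a Kafka consumer as plain properties.
// "heartbeat.interval.ms" and "session.timeout.ms" are real consumer config keys;
// the values passed in are up to you and are not recommendations.
class HeartbeatSettings {
    static Properties build(int heartbeatIntervalMs, int sessionTimeoutMs) {
        // Rule of thumb from the Kafka docs: heartbeat interval <= 1/3 of the session timeout.
        if (heartbeatIntervalMs * 3L > sessionTimeoutMs) {
            throw new IllegalArgumentException(
                    "heartbeat.interval.ms should be at most 1/3 of session.timeout.ms");
        }
        Properties props = new Properties();
        props.setProperty("heartbeat.interval.ms", Integer.toString(heartbeatIntervalMs));
        props.setProperty("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        return props;
    }
}
```

A shorter interval detects a dead consumer sooner at the cost of more heartbeat traffic; a longer session timeout tolerates pauses but delays rebalancing.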
Also, consumers can be grouped, and the consumers in a Consumer Group share the partitions of the Topics they subscribe to. If a Topic has N partitions, the Consumer Group has N consumers, and the group has subscribed to that Topic, each consumer reads data from exactly one partition of the Topic. This is just a heads-up that Consumers can be in groups. We shall go into the details of Consumer Groups in our next tutorial.
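The partition-sharing idea can be illustrated with a simplified, range-style assignment for a single topic. This is a Java sketch with a hypothetical PartitionAssignment class, not Kafka's actual assignor (the real RangeAssignor and RoundRobinAssignor handle multiple topics and sort members, among other details). It shows that N partitions and N consumers give one partition each, and that consumers beyond the partition count sit idle.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified range-style assignment of one topic's partitions to the consumers of a group.
// Hypothetical illustration only; consumers are taken in the order given.
class PartitionAssignment {
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("a group needs at least one consumer");
        }
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int n = consumers.size();
        int perConsumer = numPartitions / n;  // every consumer gets at least this many
        int extra = numPartitions % n;        // the first `extra` consumers get one more
        int next = 0;
        for (int i = 0; i < n; i++) {
            int count = perConsumer + (i < extra ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                parts.add(next++);
            }
            result.put(consumers.get(i), parts);
        }
        return result;
    }
}
```

For example, three consumers on a three-partition topic get partitions 0, 1 and 2 respectively, whereas a fourth consumer would receive an empty list and stay idle until a rebalance gives it work.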
The Kafka Consumer API lets an application connect to a Kafka cluster and consume data streams.
The following picture demonstrates how a Consumer works in Apache Kafka.
Kafka Consumer with Example Java Application
The following is a step-by-step process to write a simple Consumer example in Apache Kafka.
1. Create Java Project
Create a new Java project called KafkaExamples in your favorite IDE. In this example, we use Eclipse, but the process should be the same for most other IDEs.
2. Add Jars to Build Path
Add the following jars to the Java project build path. Note: The jars are available in the libs folder of the Apache Kafka download from https://kafka.apache.org/downloads.
3. New SampleConsumer Thread
Create a new class for a sample Consumer, SampleConsumer.java, that extends Thread, so that the Consumer can be launched as a new thread on demand.
4. Properties of Kafka Consumer
Provide information such as the Kafka server URL, the Kafka server port, the Consumer’s ID (client ID), and the deserializers for the key and value.
Note: Make sure that the server URL and port match the values in /<kafka_directory>/config/server.properties.
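The properties from step 4 can be sketched with plain java.util.Properties using Kafka's documented consumer config key names. A few assumptions: the class and method names are hypothetical, String keys and values are assumed (hence StringDeserializer for both), and a group.id is added because subscribe() with group management needs one.

```java
import java.util.Properties;

// Hypothetical sketch of the step-4 consumer properties, using Kafka's consumer config key names.
// The host, port, client id and group id are placeholders you pass in.
class SampleConsumerConfig {
    static Properties build(String host, int port, String clientId, String groupId) {
        Properties props = new Properties();
        // host:port must match the listener configured in config/server.properties
        props.setProperty("bootstrap.servers", host + ":" + port);
        props.setProperty("client.id", clientId);
        // subscribe() relies on group management, which needs a group.id
        props.setProperty("group.id", groupId);
        // A consumer configures DEserializers; String keys/values are an assumption here.
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

With kafka-clients on the build path, these properties would be passed to the constructor in step 5, for example new KafkaConsumer<String, String>(SampleConsumerConfig.build("localhost", 9092, "SampleConsumer", "sample-group")). If your producer writes non-String keys, switch the key deserializer to match.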
5. Create Kafka Consumer with the Properties
With the properties that have been mentioned above, create a new KafkaConsumer.
6. Subscribe Consumer to a Topic
The Consumer has to subscribe to a Topic from which it can receive records.
7. Fetch Records for the Topic
Fetch records for the Topic that the Consumer has subscribed to, using poll(Duration timeout) (older clients use the now-deprecated poll(long timeout)). The timeout is the maximum time poll blocks waiting for records when none are immediately available.
8. Consume the records
You may consume the records as your use case requires. In this tutorial, we print the messages to the console output.
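The thread structure from steps 3, 7 and 8 can be sketched without a running broker. In this stdlib-only Java sketch, a BlockingQueue stands in for the subscribed topic (an assumption made purely so the example runs on its own); its poll(timeout, unit) call mimics KafkaConsumer.poll by waiting at most the given time and returning nothing if no data arrives. In the real application, the loop would call consumer.poll(Duration.ofMillis(100)) and iterate over the returned ConsumerRecords.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

// Stdlib-only sketch of a SampleConsumer thread; the BlockingQueue is a stand-in for the topic.
class SampleConsumer extends Thread {
    private final BlockingQueue<String> topic;                    // stand-in for the Kafka topic
    private final List<String> consumed = new CopyOnWriteArrayList<>();
    private volatile boolean running = true;

    SampleConsumer(BlockingQueue<String> topic) {
        this.topic = topic;
    }

    @Override
    public void run() {
        while (running) {
            try {
                // Like KafkaConsumer.poll(timeout): wait at most 100 ms, else come back empty.
                String record = topic.poll(100, TimeUnit.MILLISECONDS);
                if (record != null) {
                    consumed.add(record);
                    System.out.println("Received message: " + record);  // step 8: print it
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // restore the flag and stop consuming
                return;
            }
        }
    }

    // Called from another thread; the loop exits after its current poll returns.
    void shutdown() {
        running = false;
    }

    List<String> consumed() {
        return consumed;
    }
}
```

The volatile running flag lets another thread stop the loop cleanly. With the real client, the usual way to break out of a blocked poll from another thread is KafkaConsumer.wakeup(), followed by closing the consumer.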
9. Start Zookeeper and Kafka Cluster
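Navigate to the root of the Kafka directory and run each of the following commands in a separate terminal to start Zookeeper and then the Kafka cluster: bin/zookeeper-server-start.sh config/zookeeper.properties and bin/kafka-server-start.sh config/server.properties (on Windows, use the .bat equivalents under bin\windows).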
10. Start the Kafka Producer
Well! There has to be a Producer of records for the Consumer to feed on. Start the Kafka Producer by following the Kafka Producer with Java Example. Also note that if you change the Topic name, you must use the same topic name in both the Kafka Producer Example and the Kafka Consumer Example Java applications.
11. Start the SampleConsumer thread
Example Java Application that works as Kafka Consumer
Kafka Producer Console Output
Kafka Consumer Console Output
In this Apache Kafka Tutorial, Kafka Consumer with Example Java Application, we learned about the Kafka Consumer and presented a step-by-step guide to building a Kafka Consumer application in Java.