Implementing a Kafka Connector for Couchbase
In the previous article, we looked at what the Database Change Protocol (DCP) is and how to implement it in a Spring application. As mentioned there, handling mutations multiple times, or missing some of them entirely, are the biggest problems when working with the dcp-client project, which contains a pure Java implementation of a Couchbase DCP client.
This is where the Kafka Connect Couchbase framework comes in: it is highly useful and capable of capturing every mutation that happens on a document in a bucket and sending it to a topic in Kafka, without handling any mutation multiple times or missing any of them. Therefore, in this article, we will focus on the Kafka Connect Couchbase framework.
Kafka Connect Concepts
Before diving deep into the Kafka Connect Couchbase framework, we need to understand the Kafka Connect concepts. According to the official documentation, Kafka Connect is a framework for streaming data into and out of Kafka. It consists of several concepts that are important to understand:
- connectors coordinate data streaming. Depending on the direction of the data flow, a connector is classified as a source connector (publishing events from an external system into Kafka) or a sink connector (publishing events from Kafka to an external system).
- tasks define how data is actually copied, either from Kafka to an external system or from an external system to Kafka.
- workers execute the defined connectors and tasks. Depending on the number of processes responsible for executing them, workers run in either standalone or distributed mode.
- converters are used by tasks to change the data format while writing to or reading from Kafka. This means data taken from a source can be converted to, for example, Avro or JSON and stored in Kafka in that format. Different converters (e.g. AvroConverter, JsonConverter, StringConverter) are available.
- transforms configure the connectors to make simple and lightweight modifications to individual messages.
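As a small illustration of the last two concepts, both converters and transforms are configured purely through properties. The snippet below is a hypothetical example (not taken from this project) that uses Kafka Connect's built-in InsertField transform to add a static field to every record value:

# converter: store record values in Kafka as schemaless JSON
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
# transform: add a static field named "origin" to every record value
transforms=addOrigin
transforms.addOrigin.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.addOrigin.static.field=origin
transforms.addOrigin.static.value=couchbase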
The Kafka Connect Couchbase framework includes the default connectors, tasks, converters, and everything else we need to connect Couchbase and Kafka.
It is built on top of the Java DCP Client project mentioned above, so there is no need to depend on it separately. It provides its own capturing mechanism and does not lose any change events while the application is down. In this article, we will focus only on the source connector side of the framework and capture the changes that happen in Couchbase.
Configure the Kafka Connector & Docker containers
In this step, we will bring up Couchbase and Kafka in separate containers and track the Country bucket in the Couchbase cluster, sending its mutations to the couchbase.country.bucket.mutations.0 topic in the Kafka container. The following image shows the flow: the Kafka Connect Couchbase application runs locally, listens to the Couchbase container exposed on port 8091, and sends events to the Kafka container exposed on port 9092.
Create the containers
First, we create the Docker containers and then continue with the Kafka Connect Couchbase application. The following docker-compose.yaml file brings up the three containers we need.
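A minimal sketch of such a file, assuming Confluent images for Kafka and ZooKeeper (the image tags, container names, and listener settings are illustrative, not taken from the original setup), could look like this:

version: "3"
services:
  couchbase:
    image: couchbase:community          # tag is an assumption; pin a specific version in practice
    container_name: couchbase
    ports:
      - "8091-8094:8091-8094"           # web console and service ports
      - "11210:11210"                   # data (KV) port used by DCP
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.0.1
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1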
Create the bucket and topic
After all the containers are up, we create a bucket called Country in the Couchbase cluster. We also create a user for this bucket, which we will need in the steps below.
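Both steps can be done through the Couchbase web console; the couchbase-cli sketch below is an alternative, assuming the cluster has already been initialized and the container is named couchbase, with the credentials, RAM quota, and role names as placeholders to adjust:

# create the Country bucket with an arbitrary 256 MB RAM quota
docker exec -it couchbase couchbase-cli bucket-create \
  --cluster localhost:8091 --username Administrator --password password \
  --bucket Country --bucket-type couchbase --bucket-ramsize 256

# create a user the connector will use to read DCP streams from the bucket
docker exec -it couchbase couchbase-cli user-manage \
  --cluster localhost:8091 --username Administrator --password password \
  --set --rbac-username country_user --rbac-password country_pass \
  --rbac-name "Country Connector User" \
  --roles data_reader[Country],data_dcp_reader[Country] --auth-domain local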
On the other hand, we need to make sure we have a Kafka distribution available locally, because we will use its command-line tools in several of the steps below, starting with creating the topic that will collect the events for every mutation and deletion that happens in Couchbase. In this article, Confluent Kafka is used, but the steps are almost identical if you prefer Apache Kafka. The couchbase.country.bucket.mutations.0 topic is created by executing the following command in the terminal.
kafka-topics --create --bootstrap-server localhost:9092 --topic couchbase.country.bucket.mutations.0 --partitions 1 --replication-factor 1 --config min.insync.replicas=1
Then we start consuming the topic we just created to check the published events, using the following command.
kafka-console-consumer --bootstrap-server localhost:9092 --topic couchbase.country.bucket.mutations.0
This topic and the user information (i.e. username, password, etc.) will be defined in the country-source-connector.properties file in the following steps.
Clone the project
Now that the bucket and topic are ready, we clone the Kafka Connect Couchbase framework with the following command:
git clone https://github.com/couchbase/kafka-connect-couchbase.git
Create the configuration properties files
In the Kafka Connect Couchbase project, the default connectors, tasks, converters, and transforms are implemented under the com.couchbase.connect.kafka package. Therefore, we only need to add the following properties files to define the source connector properties required for connecting to the Couchbase bucket:
- the country.worker.properties file, which is the worker configuration properties file, and
- the country-source-connector.properties file, which is the connector configuration properties file.
The country.worker.properties file provides the configuration of the workers. Since workers can run in standalone or distributed mode, the settings fall into the following groups:
- Common worker configurations: the host/port pairs for the Kafka cluster, the converter classes, the host/port pair for the REST API, and the plugin path (the location of the application jar and its dependencies).
- Standalone worker mode configurations: the offset file (which stores the connector offsets on disk).
- Distributed worker mode configurations: the topics (for storing offsets, connector and task configurations, and status updates) and the consumer group id.
This file can be duplicated with a different REST port if we want to run the application in distributed mode; otherwise there is no need to duplicate it.
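A minimal sketch of this file could look like the following; the converter choice, REST port, offset file location, and plugin path are assumptions to adapt to your setup:

# common worker configurations
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# REST API port for this worker (change it in each copy when running several workers)
rest.port=8083

# standalone mode: file that stores the connector offsets on disk
offset.storage.file.filename=/tmp/country-connect.offsets
offset.flush.interval.ms=10000

# location of the connector jar and its dependencies
plugin.path=/path/to/kafka-connect-couchbase/target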
The country-source-connector.properties file provides the configuration of the connector. Since the connector works as a source connector, the settings fall into the following groups:
- Connector configurations: the connector name, the name/alias of the connector class, and the stream mode.
- Converter configurations: the converter class.
- Couchbase configurations: the bucket name, the credentials, and the host/port pairs for Couchbase.
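A sketch of this file, assuming a 4.x version of the connector (property names differ in the older 3.x releases) and reusing the bucket, user, and topic from this article, could look like this; the source handler and converter are illustrative choices:

name=country-source-connector
connector.class=com.couchbase.connect.kafka.CouchbaseSourceConnector
tasks.max=1

# emit the document body as raw JSON bytes
couchbase.source.handler=com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

# Couchbase connection
couchbase.seed.nodes=127.0.0.1
couchbase.bucket=Country
couchbase.username=country_user
couchbase.password=country_pass

# destination topic and stream mode
couchbase.topic=couchbase.country.bucket.mutations.0
couchbase.stream.from=SAVED_OFFSET_OR_BEGINNING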
Generate the connector archive
After configuring the properties files, the connector archive is generated with the following command:
mvn package
Since we are working locally, we can use the mvn clean install command instead of the mvn package command. With the install command, the dependencies defined in the pom.xml file are installed into the local .m2 repository, so the project can find the required JARs and third-party dependencies for the plugin.
When we use the package command, all required JAR files and third-party dependencies are packaged together with the project jar under the target directory, so we can deploy them together to an environment that does not already have the dependency jars.
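As a concrete sketch (the exact artifact names under target depend on the connector version):

# package the connector jar together with its third-party dependencies
mvn clean package

# or, when working locally, also install it into the local .m2 repository
mvn clean install

# the packaged artifacts end up under the target directory
ls target/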
Execute in Standalone mode
Now, we are ready to execute the Kafka Connector in standalone mode with the following command. It takes the worker configuration properties file (the country.worker.properties
file) as the first parameter and the connector configuration properties file (the country-source-connector.properties
file) as the second parameter.
connect-standalone config/country.worker.properties config/country-source-connector.properties
If everything works correctly, we will see the following message in the terminal.
Next, we insert a document by executing the following query:
INSERT INTO `Country` (KEY, VALUE)
VALUES ("1", {
"zone": "UTC+3",
"name": "Turkey"
});
Right after the document above is inserted into the bucket, the following event log appears in the consumer terminal.
The advantage of Kafka Connector
The biggest difference, and advantage, of Kafka Connect over the Java DCP Client is its streaming modes. As seen in the country-source-connector.properties file above, it supports two additional stream modes, SAVED_OFFSET_OR_BEGINNING and SAVED_OFFSET_OR_NOW, beyond the plain BEGINNING and NOW modes. With these modes, the Kafka Connector handles every event without processing it multiple times or missing it, even when the application has to restart for any reason.
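For reference, the mode is selected with a single property in the connector configuration (the property name below assumes a 4.x connector):

# choose one of: BEGINNING, NOW, SAVED_OFFSET_OR_BEGINNING, SAVED_OFFSET_OR_NOW
couchbase.stream.from=SAVED_OFFSET_OR_BEGINNING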
With the Java DCP Client, whenever the application is restarted, we:
- re-read events that have already been processed, so we might process the same event multiple times if we select the BEGINNING mode, or
- skip the events that were streamed while the application was down, so we might miss events if we select the NOW mode.
In Kafka Connect, the SAVED_OFFSET_OR_BEGINNING and SAVED_OFFSET_OR_NOW modes extend these behaviors by checking the saved offsets first. To see the difference, we stop the application and insert the following documents:
INSERT INTO `Country` (KEY, VALUE)
VALUES ("2", {
"zone": "UTC+1",
"name": "The Netherlands"
});

INSERT INTO `Country` (KEY, VALUE)
VALUES ("3", {
"zone": "UTC+1",
"name": "Germany"
});
Since we defined the stream mode as SAVED_OFFSET_OR_BEGINNING, the connector only picks up the two documents that have not been processed before and does not process the first document again. When we restart Kafka Connect, it sends two events, one for each of the new documents.
The consumer has not been closed during this process, so we check its terminal.
Looking at the terminal, we see that only the two new documents were processed, without the first document being processed again, as expected.
In this article, we first looked at the Kafka Connect concepts and then set up the Kafka Connect Couchbase framework. It proves highly useful, capturing every mutation that happens on a document in a bucket and sending it to a topic in Kafka.