Capture Data Mutations in Couchbase by Using the Database Change Protocol (DCP)

Burakcan Ekici
8 min read · Jan 10, 2022


In this article, we will look at a special memory-based replication protocol called the Database Change Protocol (DCP), and then move on to integrating a project called dcp-client, which contains a pure Java implementation of a Couchbase DCP client.

In the following article, we described how to detect mutations by using the Eventing Service in Couchbase; here, we will focus instead on detecting mutations with a memory-based replication protocol.

What is Database Change Protocol (DCP)?

In the official documentation, the Database Change Protocol (DCP) is defined as a memory-based internal replication protocol that is ordered, resumable, and consistent.

  • It orders all mutations as they happen, which makes it possible to know where a stream left off.
  • It can resume efficiently after failures.
  • It is capable of producing consistent snapshots.

Besides the capabilities above, what makes DCP different from other replication techniques is that it streams changes made to documents from memory immediately, without waiting for them to be written to disk. This means it does not have to look up the record to be changed and alter it in place on disk. Since DCP is memory-based, the latency of view updates can be reduced significantly (high performance).

Each mutation or deletion that happens on a document generates a replication event, so in DCP only mutation and deletion messages are streamed. A mutation covers any change to a document's content or key caused by an insert or update operation, while a deletion, as its name suggests, covers delete operations.

Before going deeper, we need to look at a few concepts to understand DCP more clearly. The vBucket identifier and the mutation sequence number are the two key concepts used to identify mutations in DCP. Therefore, we will check below what a vBucket is and how DCP relates to the vBucket identifier and the mutation sequence number.

Understanding vBuckets

Basically, Couchbase is a distributed key/value store, and vBuckets are the mechanism that distributes data across a cluster. Each document is kept as a key/value pair in a bucket. Couchbase maps document keys into different vBuckets (think of them as partitions), and each vBucket is assigned to a node, which determines where the document is kept. The following image illustrates this.

Looking at the image, we can see that the document whose key is user_32 is kept on Cluster-Node3 because its key is mapped to vBucket3, which is assigned to the Cluster-Node3 node.

taken from https://docs.couchbase.com/server/current/learn/buckets-memory-and-storage/vbuckets.html
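
As a rough illustration of the key-to-vBucket mapping, here is a small sketch; Couchbase client libraries use a CRC32-based hash along these lines, but treat this as an approximation rather than the exact production algorithm:

import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class VBucketMapping {

    // Simplified sketch of the CRC32-based mapping used by Couchbase clients:
    // hash the key, then reduce the hash to one of the (typically 1024) vBuckets.
    static int vbucketId(String key, int numVBuckets) {
        CRC32 crc = new CRC32();
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        return (int) ((crc.getValue() >> 16) & 0x7fff) % numVBuckets;
    }

    public static void main(String[] args) {
        // A key like "user_32" always lands in the same vBucket,
        // and that vBucket is assigned to one node of the cluster.
        System.out.println(vbucketId("user_32", 1024));
    }
}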

Understanding the vBucket identifier & mutation sequence number

These are two important concepts that are associated directly with vBuckets and are used to identify mutations:

  • the vBucket identifier (UUID), a unique identifier for each vBucket;
  • the mutation sequence number, a sequence number assigned to each mutation within a vBucket.

Both these values are used together to determine the last mutation seen in a specific vBucket.
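
To make the idea concrete, a position in a DCP stream can be modeled as a pair of these two values per vBucket. The following is a purely conceptual sketch; the type and field names are ours, not part of the dcp-client API:

// Conceptual sketch: a stream position within one vBucket.
// The vBucket UUID identifies the history branch; the sequence number
// identifies how far into that history a consumer has read.
public record VBucketPosition(long vbucketUuid, long seqno) {

    // A consumer can resume by asking for everything after this position.
    boolean isAfter(VBucketPosition other) {
        return vbucketUuid == other.vbucketUuid && seqno > other.seqno;
    }
}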

Couchbase only stores the latest version of a document

One of the biggest problems we faced while evaluating change capture is getting the old version of an updated document. A Change Data Capture (CDC) framework such as Debezium provides the old version of a record, so the properties that were just updated can be seen clearly in the log record. Couchbase, however, does not support document versioning; the Data Service, which provides access to data, stores only the latest version of a document. Therefore, when a mutation happens and DCP streams it to a client, it shows only the latest version of the relevant document.

Couchbase Java DCP Client

After looking at DCP in Couchbase, we will implement some code to stream events from Couchbase and inspect the log records, which is a good way to understand exactly what a streamed record looks like. The project we will use, called dcp-client, contains a pure Java implementation of a Couchbase DCP client.

The dcp-client project mentioned in the link above is not officially supported by Couchbase; it is used as a base and foundation for higher-level or officially supported libraries such as connectors.

First, we add the following dependency to the pom.xml file, using the latest version, which is 0.36.0 at the time of writing.

<dependency>
  <groupId>com.couchbase.client</groupId>
  <artifactId>dcp-client</artifactId>
  <version>0.36.0</version>
</dependency>

Create the bucket in Couchbase

Before we start coding, we create the Country bucket whose mutations we will track in the steps below. There is nothing special to do here; we just add an index after creating the bucket. We will track this bucket from the application implemented below.
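
This step can be done from the web console; as an alternative, here is a sketch that creates the bucket and a primary index with the Couchbase Java SDK (this needs the separate java-client dependency, and the address, credentials, and RAM quota below are placeholders):

import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.manager.bucket.BucketSettings;

public class CreateCountryBucket {
    public static void main(String[] args) {
        // Connect with admin credentials (placeholders).
        Cluster cluster = Cluster.connect("couchbase://127.0.0.1", "Administrator", "password");

        // Create the Country bucket, then a primary index so it is queryable.
        cluster.buckets().createBucket(BucketSettings.create("Country").ramQuotaMB(100));
        cluster.queryIndexes().createPrimaryIndex("Country");

        cluster.disconnect();
    }
}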

The username, password, and bucket properties should be noted, because they will be used when creating the Client object, which is necessary for connecting to the Couchbase cluster, in the following step.

Create the Service and Controller classes

Now, we can start implementing the DCPService class. In the code below, we will:

  • define the client object that manages the connection to the Couchbase bucket with the configuration we set (client configuration);
  • override the onEvent method inside the dataEventHandler to define how an event is handled whenever a mutation or deletion happens in the bucket: if a mutation happens, the DcpMutationMessage is filled and the event is written to the terminal after being converted to a JsonObject; if a deletion happens, the DcpDeletionMessage is filled and the event is written to the terminal (event handlers);
  • connect the sockets, initialize the state (StreamFrom.BEGINNING, StreamTo.INFINITY) for streaming, and start streaming.
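
The full class lives in the project repository; the following is a minimal sketch of such a service, based on the basic-usage example in the java-dcp-client README. The cluster address, credentials, and the counter that produces the Saw .. changes so far. message are assumptions of this sketch (the original also converts the mutation content to a JsonObject, which is omitted here):

import java.util.concurrent.atomic.AtomicLong;

import org.springframework.stereotype.Service;

import com.couchbase.client.dcp.Client;
import com.couchbase.client.dcp.StreamFrom;
import com.couchbase.client.dcp.StreamTo;
import com.couchbase.client.dcp.message.DcpDeletionMessage;
import com.couchbase.client.dcp.message.DcpMutationMessage;
import com.couchbase.client.dcp.message.DcpSnapshotMarkerRequest;

@Service
public class DCPService {

    private final AtomicLong changes = new AtomicLong();

    // Connection settings are placeholders; the bucket is the one created above.
    private final Client client = Client.builder()
            .seedNodes("127.0.0.1")
            .credentials("Administrator", "password")
            .bucket("Country")
            .build();

    public void startStreaming() throws InterruptedException {
        // Control events (e.g. snapshot markers) must be acknowledged and released.
        client.controlEventHandler((flowController, event) -> {
            if (DcpSnapshotMarkerRequest.is(event)) {
                flowController.ack(event);
            }
            event.release();
        });

        // Data events carry the mutations and deletions we want to capture.
        client.dataEventHandler((flowController, event) -> {
            if (DcpMutationMessage.is(event)) {
                System.out.println("Mutation: " + DcpMutationMessage.toString(event));
                changes.incrementAndGet();
            } else if (DcpDeletionMessage.is(event)) {
                System.out.println("Deletion: " + DcpDeletionMessage.toString(event));
                changes.incrementAndGet();
            }
            event.release(); // always release the buffer
        });

        client.connect().block();
        client.initializeState(StreamFrom.BEGINNING, StreamTo.INFINITY).block();
        client.startStreaming().block();

        // Keep streaming and report progress once per second.
        while (true) {
            Thread.sleep(1000);
            System.out.println("Saw " + changes.get() + " changes so far.");
        }
    }
}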

The DCPService class will be called from the DCPController class. This controller has an endpoint that opens the stream and keeps it running continuously to track mutations. This means that, after we call the /dcp/start-loop endpoint with the POST method, the application streams until it is shut down.
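
The controller itself is small; here is a sketch under the same assumptions (the class and endpoint names follow the article, the rest is illustrative):

import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/dcp")
public class DCPController {

    private final DCPService dcpService;

    public DCPController(DCPService dcpService) {
        this.dcpService = dcpService;
    }

    // POST /dcp/start-loop: opens the stream and never returns,
    // so the application keeps logging mutations until it is shut down.
    @PostMapping("/start-loop")
    public void startLoop() throws InterruptedException {
        dcpService.startStreaming();
    }
}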

Check the results

We are ready to check the results, so we start the project, open Swagger, and call the endpoint.

Since we start a loop by calling the /dcp/start-loop endpoint, it will not stop; the thread keeps going until the project is terminated. Shutting down properly would be better, but this way is helpful for watching the record stream. As the following terminal screenshot shows, after calling the endpoint, the Saw .. changes so far. message is printed at one-second intervals.

Couchbase has a fixed number of vBuckets, usually 1024. Since DCP tracks vBuckets, the first message shows that streaming is started for 1024 partitions.

terminal log

We insert a document by executing the following query and check the log:

INSERT INTO `Country` (KEY, VALUE)
VALUES ("1", {
  "zone": "UTC+3",
  "name": "Turkey"
});

Every insertion into the bucket is handled as a mutation operation: the DcpMutationMessage is filled, converted to a JsonObject, and printed to the terminal. Aside from the mutation log, we can see the difference in the change counter because, right after the mutation message, the Saw 1 changes so far. message is written to the terminal.

terminal log

State: StreamFrom.BEGINNING vs. StreamFrom.NOW

The state can be initialized in StreamFrom.BEGINNING or StreamFrom.NOW mode (the snippet after this list shows where this is chosen). If it is initialized

  • as StreamFrom.BEGINNING, all mutations are received once streaming starts, regardless of whether they happened before or after the stream was opened;
  • as StreamFrom.NOW, only mutations that happen after the stream is opened are received.
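
In the service sketch above, the mode is a one-line choice when initializing the state:

// Replay everything from the beginning of the vBucket histories:
client.initializeState(StreamFrom.BEGINNING, StreamTo.INFINITY).block();

// ...or receive only changes that happen from now on:
client.initializeState(StreamFrom.NOW, StreamTo.INFINITY).block();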

To understand this state, we insert 2 documents into the bucket before starting the stream, by executing the following queries:

INSERT INTO `Country` (KEY, VALUE)
VALUES ("1", {
  "zone": "UTC+3",
  "name": "Turkey"
});

INSERT INTO `Country` (KEY, VALUE)
VALUES ("2", {
  "zone": "UTC+1",
  "name": "The Netherlands"
});

Since we set the state to StreamFrom.BEGINNING, we see the 2 mutation events that happened before the stream started in the terminal right after we trigger the endpoint, as in the following image. If we then restart the project and trigger the endpoint from Swagger again, these 2 mutations are written to the terminal once more.

terminal log

Now we change the state to StreamFrom.NOW, start the project, and trigger the endpoint from Swagger. This time, we see the Saw 0 changes so far. message in the terminal. As expected, the first 2 mutations are not written to the terminal, because they happened before the stream started. We insert a third document while the stream continues.

INSERT INTO `Country` (KEY, VALUE)
VALUES ("3", {
  "zone": "UTC+1",
  "name": "Germany"
});

As a result, only this document is written to the terminal as a mutation.

terminal log

If we restart the project in StreamFrom.NOW mode, the last mutation we just made is not seen as a mutation, and the project reports 0 changes in the bucket, as in the following image.

terminal log

As you can see, we should be careful when using either the BEGINNING or the NOW mode. When we use:

  • the BEGINNING mode, it returns all mutations and deletions since the bucket was created, so we might read the same event more than once if the project crashes and restarts;
  • the NOW mode, we may miss events, because it does not return mutations and deletions that happened while the project was down, no matter how we handle it.

Beyond these circumstances, the other big concerns are that we cannot:

  • understand which attribute of the relevant document has been changed;
  • check the previous version, from before the mutation, of the relevant document.

These problems are caused by Couchbase itself: as mentioned at the top of the article, the Data Service keeps only the latest version of a document, so there is no way to determine which property has changed.

On the other hand, managing all events without either handling them multiple times or missing any of them is another problem that should be taken care of. At this point, the Kafka Connect Couchbase framework makes it possible to stream data into and out of Kafka to manage this problem. The following article shows how to implement the Kafka Connect Couchbase framework.

In this article, we focused on DCP, what it is and is not capable of, and how to implement a DCP client against a Couchbase cluster. The project files, which contain all the code we implemented in the steps above, are available here.
