How to stream data from Google PubSub to Kafka with Kafka Connect

Emanuele Pirro
Powerspace Engineering
8 min readFeb 22, 2018

--

At Powerspace we are currently operating a transition from an on-premises environment to Google Cloud. The extensive usage of PubSub brought us reliability and scalability and allowed us to quickly integrate systems hosted on the Google Cloud Platform such as Apache Kafka.

Distributed data stores, Kafka Streams processes as well as Machine Learning pipelines consume the data in real time from the Kafka hub.

In fact, we extensively use Google Dataflow for most of the real-time treatments, however Kafka still represents a cheaper alternative, allowing us to replay our message queues and create a variety of streamers for non-critical jobs. To have a bird’s-eye view of our data streaming processes check out the schema below.

In this article I’ll walk you through the streaming of data from a Google PubSub subscription to an Apache Kafka topic using Kafka Connect. Below a bird’s-eye view of the ideal architecture:

Kafka Connect forwarder PubSub to Kafka

Prerequisites

Assuming you have your Kafka cluster in place somewhere on the cloud, and a PubSub subscription from which you want to consume, you are only a few steps away from building a reliable Kafka Connect forwarder.

Build the PubSub connector package

As a first step, let’s build a PubSub Source connector which is going to be used by the Kafka Connect workers.

Google Cloud Platform provides Source and Sink Kafka connectors for Google PubSub. You just need to clone the repository and build it.

Use an automation tool if it is to your taste. I created a Jenkins job for it.

In any case, you will end up with a .jar file which contains your connector, notably our source connector class CloudPubSubSourceConnector. This package must be accessible from you Kafka Connect worker.

Create your topics in Kafka

Create the destination topics

You want your logs to be produced into a specific Kafka topic I suppose. This is the right moment to create it.

$ kafka-topics.sh --zookeeper $ZK --create --topic views --partitions 3 --replication-factor 3

Create the internal topics

Kafka Connect needs three internal topics to carry on his work: the first for the connector and task configuration offsets, the second to store configuration data, and the third for the connector and task status updates.

Even though Kafka Connect auto creates these topics for you, I would strongly suggest to create them by yourself. A typical configuration for these topics can be the following:

$ kafka-topics.sh --zookeeper $ZK --create --topic pws-offsets --partitions 25 --replication-factor 3
$ kafka-topics.sh --zookeeper $ZK --create --topic pws-config --partitions 1 --replication-factor 3
$ kafka-topics.sh --zookeeper $ZK --create --topic pws-status --partitions 3 --replication-factor 3

The config topic has to be a compacted topic with a single partition and a high replication factor (3x or more). The offsets topic instead, should be highly partitioned (e.g., 25 or 50) to support large Kafka Connect clusters.

Create your Kafka Connect worker

First of all you need to download the latest Kafka binaries. Please refer to the Confluent website for Confluent Kafka.

Create kafka-connect-worker.properties and define your worker configuration:

# broker kafka
bootstrap.servers=$KB1:9092
# the cluster id
group.id=pws-connect-cluster
# schemas are disabled because in PubSub as well as in Kafka,
# data is stored as bytes
# these props can be overridden at topic level
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# here the internal topics I've just created
offset.storage.topic=pws-offsets
offset.storage.replication.factor=3
config.storage.topic=pws-config
config.storage.replication.factor=3
status.storage.topic=pws-status
status.storage.replication.factor=3
# 50000ms as a flush interval is largely enough
offset.flush.interval.ms=50000
offset.flush.timeout.ms=10000
# REST port to be exposed in order to setup our connectors
rest.port=8086
# put in this folder the PubSub connector package
plugin.path=connectors

All the workers having the samegroup.id will join the same cluster, potentially working in parallel. Just start a new process with the same group.id to get a bigger cluster. Careful though, this value must not be in conflict with any other Kafka consumer group id.

Finally, run the worker:

$ k_dist/bin/connect-distributed.sh kafka-connect-worker.properties

Take your time to take a look at the logs. Try to spawn another worker and see how they interact forming a cluster.

Your worker(s) should be running now, but there are no connectors yet.

Add Landoop Connect UI (optional)

You can manage your Kafka Connect cluster from command line and define your connectors with the provided REST API. I would suggest though to use Landoop Connect UI to easily create, start, stop and modify your connectors.

Example of Kafka Connect UI — From https://angel.co/

The following docker command hooks up the UI to Kafka Connect using the REST port we defined in kafka-connect-worker.properties.

$ docker run --rm -it -p 8000:8000 -e "CONNECT_URL=localhost:8086" landoop/kafka-connect-ui

Connect UI should be up and running on port 8000.

There’s another Kafka GUI you want to check out, it’s called Conduktor and it’s free to use for one-broker Kafka clusters.

Create your source connectors

Let’s create an instance of PubSub Source connector which will forward messages from (say) views-to-kafka subscription and views topic.

The one below might be a possible configuration:

name=views-connector
cps.project=pws-project
connector.class=com.google.pubsub.kafka.source.CloudPubSubSourceConnector
# the data source PubSub subscription
cps.subscription=views-to-kafka
# the sink Kafka topic
kafka.topic=views
# the number of partitions of the Kafka topic
kafka.partition.count=3
# PubSub attributes to be used as Kafka message key and timestamp
kafka.key.attribute=uuid
kafka.timestamp.attribute=timestamp
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
# maximum number of tasks that should be created for the connector
tasks.max=1

To actually run the connector on your cluster you can either use the provided Kafka Connect REST API:

$ curl -X POST -H "Content-Type: application/json" -H "Accept: application/json" -d @views-connector.json $KAFKA_REST_URL/api/kafka-connect-1/connectors

…or you can use any GUI of your taste.

Verify your streaming

Alright. Your Kafka Connect cluster is up and the connector deployed. Verify the cluster log and make sure your connectors have been successfully loaded and they are running as expected.

Let’s verify now if our forwarder is working. Create a parallel PubSub subscription (views-to-kafka-2) and publish a message to the related topic. Then, pull it and check it out:

$ gcloud pubsub subscriptions pull views-to-kafka-2 --format=json[{
"ackId": "[a-long-hash]",
"message": {
"attributes": {
"id": "TIsps77TG5OzvqTIlGPTRA==",
"ts": "1519061518694",
"uuid": "8f3b37c4-f273-39e8-8450-a59fa6bce00a"
},
"data": "[some message xyz]",
"messageId": "41159486510616",
"publishTime": "2018-02-19T17:32:00.992Z"
}
}]

Please notice that in my case, I’ve set an extra attribute to the PubSub message: uuid. id attribute is handled internally (and overridden) by PubSub and should not be used for business purposes.uuid is the one that will be used as a key for the related Kafka message (as defined in our connector config).

We expect this message to be streamed and stored to Kafka, right into our views topic. Let’s consume it:

$ kafka-console-consumer.sh --zookeeper $ZK --topic views \
--property print.key = true \
--property print.timestamp = true \
--property key.separator = " ~ "
CreateTime: 1519061518694 ~ 8f3b37c4-f273-39e8-8450-a59fa6bce00a ~
{
"schema": {
"type": "struct",
"fields": [{
"type": "bytes",
"optional": false,
"field":"message"
}, {
"type": "string",
"optional": false,
"field": "id"
}, {
"type": "string",
"optional": false,
"field": "ts"
}],
"optional" :false
},
"payload": {
"message": "[some message xyz]",
"ts": "1519146743085"
}
}

Here it is! The message has been published to Kafka!

Final tips

Just few other suggestions around Kafka Connect and Kafka itself.

Kafka message body structure

You might be wondering about the structure of the Kafka message above. That’s due to the extra PubSub attribute we added and the behaviour of the PubSub connector. The last few lines of the connector repo README tells you more about this:

If the PubSub message doesn’t have any other attributes, the message body is stored as a byte[] for the Kafka message value. However, if there are attributes beyond the Kafka key, the value is assigned a struct schema. Each key in the PubSub message attributes map becomes a field name, with the values set accordingly with string schemas. In this case, the PubSub message body is identified by the field name set in “message”, and has the schema types bytes.

In these cases, to carry forward the structure of data stored in attributes, we recommend using a converter that can represent a struct schema type in a useful way, e.g. JsonConverter.

Be aware of Kafka topic auto creation

Always avoid to let Kafka auto create any kind of topic. Brokers rely on their configuration which dictates default partitioning and replication factor. These settings may not be best suited for all your needs.

default.replication.factor for instance is a broker level configuration which determines the default topic replication factor if not specified, and its default value is 1. This value is often not enough, so always create in advance your topics!

Unclean Leader Election: when to use it

Kafka does a great job in guaranteeing consistency and reliability managing the replicas. However, what happens when all the replicas (for a specific topic) die?

Kafka’s default behaviour is to wait until any ISR (In-Sync Replica) comes back up to re-elect it as a leader and carry on its work. This behaviour favours consistency and durability over availability. We all agree on that.

But what if the ISRs remain unavailable for a very long time? Even if other not in-sync replicas do exist, our system would stay down until any ISR comes back in service.

There is a broker/topic configuration property that allows to support use cases where inconsistency is preferable to downtime: unclean.leader.election. Good to be aware of it.

Setting it to true allows Kafka to re-elect as a leader the first replica (not necessarily in-sync) that comes back to life.

Careful, the drawback is potential data loss. So mind a general rule: keep the replication factor of your topics greater than two whenever is possible.

Recognise lagging brokers

We need to ensure that our topics are replicated and the ISR number is always equal to the number of replicas.

$ kafka-topics.sh --zookeeper $ZK --describe --topic views --under-replicated-partitionsTopic: views PartitionCount: 3 ReplicationFactor: 3
Topic: views Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2
Topic: views Partition: 2 Leader: 1 Replicas: 1,2,3 Isr: 2,2

The example above clearly shows that the broker 3 is lagging and certainly taking too long at replicating the partitions.

Check out the configs replica.lag.time.max.ms, replica.lag.max.messages and num.replica.fetchers to tune Kafka and solve this problem.

Resources

PubSub Kafka Connector: Github official repository

Confluent Blog: reference blog around the streaming platform technologies

Thanks to Cedric Sadai and the Stéphane Derosiaux for reviewing this article and thank you all for reading it! I hope you found it useful. 📚

Powerspace is looking for data engineers in Paris to play with Kafka, Google Cloud, Tensorflow and Kubernetes. Get in touch! jobs@powerspace.com.

--

--