How to sync data from MySQL to Google BigQuery using Debezium and Kafka Connect

Mažvydas Andrijauskas

August 22, 2023



Syncing data from a MySQL database to Google BigQuery can be a great way to keep your data up to date and easily accessible for analysis. In this article, we will explore the process of setting up Debezium and Kafka Connect to sync data from MySQL to BigQuery, providing you with all the information you need to get started.

[Diagram: data flow from MySQL through Debezium and Kafka Connect to BigQuery]

Why use Debezium and Kafka Connect?

Debezium and Kafka Connect are open-source platforms that provide a powerful solution for streaming data changes between systems in real time. That real-time interaction allows you to keep your data in sync and easily accessible for various use cases such as real-time analytics, data warehousing, and data pipeline integrations.

High-level solution diagram

[Diagram: solution architecture]

Technology used

Before we go into the details of setting up Debezium and Kafka Connect to sync data from MySQL to BigQuery, it is important to understand the technologies that you will be using and how they are connected.

Change data capture

Change Data Capture (CDC) is a technique for capturing and recording all the changes made to a database over time. This allows for real-time data replication, making it easy to keep multiple systems in sync.

CDC does this by detecting row-level changes in database source tables, which are characterized as “Insert,” “Update,” and “Delete” events. CDC then notifies other systems or services that rely on the same data.
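To make this concrete, here is a simplified sketch of what a CDC event for an update might look like. The field names follow Debezium's change event envelope, which you will see in full later in this tutorial; the values are illustrative only:

{
  "op": "u",
  "before": { "id": 1001, "email": "sally.thomas@acme.com" },
  "after":  { "id": 1001, "email": "sally.t@acme.com" },
  "ts_ms": 1692700000000
}

The before and after fields let a consumer reconstruct exactly what changed, while op distinguishes inserts (“c”), updates (“u”), and deletes (“d”).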

Apache Kafka

Apache Kafka is a distributed streaming platform that is used for building real-time data pipelines and streaming applications. It allows for the storage and processing of streams of records in a fault-tolerant way.

Kafka Connect

Kafka Connect is a framework for connecting Kafka with external systems such as databases, key-value stores, search indexes, and file systems, using so-called connectors.

Kafka connectors are ready-to-use components that can help you import data from external systems into Kafka topics and export data from Kafka topics into external systems. You can use existing connector implementations for common data sources and sinks or implement your own connectors.

Debezium

Debezium is an open-source platform that allows you to easily stream changes from your MySQL database to other systems using CDC. It works by reading the MySQL binlog to capture data changes in a transactional manner, so you can be sure that you're always working with the most up-to-date data.

By using Debezium, you can capture the changes made to the MySQL database and stream them to Kafka. The change data can then be consumed by Kafka Connect and loaded into BigQuery.

BigQuery setup

1. Creating a BigQuery project and dataset:

a. In the Google Cloud Console, navigate to the BigQuery page and create a new project (Creating and managing projects | Resource Manager Documentation | Google Cloud). We will name it “mysql-bigquery” for this tutorial.

b. Within the project, create a new dataset (Creating datasets | BigQuery | Google Cloud). We will name it “debezium” for this tutorial.

c. Note that the BigQuery sink connector will automatically create tables in the dataset that match the structure of the MySQL tables being monitored.

2. Creating a GCP service account with BigQuery editor role:

a. In the Google Cloud Console, navigate to the IAM & Admin page and create a new service account (Creating and managing service accounts | IAM Documentation | Google Cloud).

b. Give the service account a name and description, then select the “BigQuery Data Editor” role.

3. Generating and downloading a key for the service account:

a. In the Google Cloud Console, navigate to the IAM & Admin page, find the service account, and click on the three dots on the right, then select “Create key” (Create and manage service account keys | IAM Documentation | Google Cloud).

b. Select JSON as the key type and download the key file.

c. Store the key file securely and use it to authenticate the connector in Kafka Connect when accessing the BigQuery dataset.
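If you prefer the command line, the same BigQuery setup can be scripted with the gcloud and bq CLIs. The following is a sketch that assumes you have the Google Cloud SDK installed and authenticated; the service account name “debezium-sync” is our own choice for this example:

# Create the dataset in the mysql-bigquery project
$ bq mk --dataset mysql-bigquery:debezium

# Create a service account and grant it the BigQuery Data Editor role
$ gcloud iam service-accounts create debezium-sync --project=mysql-bigquery
$ gcloud projects add-iam-policy-binding mysql-bigquery \
    --member="serviceAccount:debezium-sync@mysql-bigquery.iam.gserviceaccount.com" \
    --role="roles/bigquery.dataEditor"

# Generate and download a JSON key for the service account
$ gcloud iam service-accounts keys create bigquery-keyfile.json \
    --iam-account=debezium-sync@mysql-bigquery.iam.gserviceaccount.com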

Tutorial

To start syncing data from MySQL to BigQuery, we will need the following components running: ZooKeeper, Kafka, Kafka Connect (with the Debezium MySQL and BigQuery connector plugins), and the MySQL database itself. We will run all of them as Docker containers.

Start required services

1. Let's start by creating a new directory. Open a terminal and run:

$ mkdir mysql-to-bigquery
$ cd mysql-to-bigquery

2. Create a plugins directory:

$ mkdir plugins

3. Download the Debezium MySQL connector plugin and extract it into the plugins directory:

$ wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.1.1.Final/debezium-connector-mysql-2.1.1.Final-plugin.tar.gz -O mysql-plugin.tar.gz
$ tar -xzf mysql-plugin.tar.gz -C plugins

4. Download the BigQuery connector plugin and put the contents into your plugins directory (in this tutorial we are using version 2.4.3). Your plugins directory should now look like this:

$ ls plugins
debezium-connector-mysql  wepay-kafka-connect-bigquery-2.4.3

5. Create a new file (“docker-compose.yml”) with these configurations:

version: '2'
services:
  zookeeper:
    container_name: zookeeper
    image: quay.io/debezium/zookeeper:2.1
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  kafka:
    container_name: kafka
    image: quay.io/debezium/kafka:2.1
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
  mysql:
    container_name: mysql
    image: quay.io/debezium/example-mysql:2.1
    ports:
      - 3306:3306
    environment:
      - MYSQL_ROOT_PASSWORD=debezium
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
  connect:
    container_name: connect
    image: quay.io/debezium/connect-base:2.1
    volumes:
      - ./plugins:/kafka/connect
    ports:
      - 8083:8083
    links:
      - kafka
      - mysql
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses

6. Let's start the services:

$ docker-compose up

You should see an output similar to the following:

...
2023-01-16 15:48:33,939 INFO || Kafka version: 3.0.0 [org.apache.kafka.common.utils.AppInfoParser]
...
2023-01-16 15:48:34,485 INFO || [Worker clientId=connect-1, groupId=1] Starting connectors and tasks using config offset -1 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2023-01-16 15:48:34,485 INFO || [Worker clientId=connect-1, groupId=1] Finished starting connectors and tasks [org.apache.kafka.connect.runtime.distributed.DistributedHerder]

7. Check that Kafka Connect is up and running by calling its REST API:

$ curl -i -X GET -H "Accept:application/json" localhost:8083/connectors

An empty array in the response shows that there are no connectors currently registered with Kafka Connect.
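The response should look roughly like this (headers abbreviated):

HTTP/1.1 200 OK
Content-Type: application/json

[]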

8. We also have MySQL running with an example database called inventory. You can check which tables it contains by running:

$ docker exec -it mysql mysql -uroot -pdebezium -D inventory -e "SHOW TABLES;"
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses           |
| customers           |
| geom                |
| orders              |
| products            |
| products_on_hand    |
+---------------------+

$ docker exec -it mysql mysql -uroot -pdebezium -D inventory -e "SELECT * FROM customers;"
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+

Configure Debezium to start syncing MySQL to Kafka

Now let’s configure Debezium to start syncing the inventory database with Kafka.

1. Create a new file (“register-mysql.json”) with these configurations:

{
  "name": "inventory-connector-mysql",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "debezium",
    "database.server.id": "184054",
    "topic.prefix": "debezium",
    "database.include.list": "inventory",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schemahistory.inventory"
  }
}

You can find information about these configuration properties in the Debezium documentation.
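Optionally, you can ask Kafka Connect to validate the configuration before registering it, using its standard config validation endpoint. This sketch assumes jq is installed on your host; it is only used to strip the outer “name” wrapper, since the endpoint expects the bare config map:

$ jq .config register-mysql.json | curl -s -X PUT -H "Content-Type:application/json" \
    --data @- localhost:8083/connector-plugins/io.debezium.connector.mysql.MySqlConnector/config/validate | jq .error_count

An error_count of 0 means the configuration is valid.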

2. Register the MySQL connector:

$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json

3. Verify that “inventory-connector-mysql” is included in the list of connectors:

$ curl -H "Accept:application/json" localhost:8083/connectors/
["inventory-connector-mysql"]

4. You can now see the database contents in Kafka. To list the topics, run:

$ docker exec -it kafka bash bin/kafka-topics.sh --list --bootstrap-server kafka:9092
...
debezium.inventory.addresses
debezium.inventory.customers
...

Let’s check debezium.inventory.addresses:

$ docker exec -it kafka bash bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic debezium.inventory.addresses --from-beginning
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"debezium.inventory.addresses.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"debezium.inventory.addresses.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"debezium.inventory.addresses.Envelope","version":1},"payload":{"before":null,"after":{"id":10,"customer_id":1001,"street":"3183 Moore Avenue","city":"Euless","state":"Texas","zip":"76036","type":"SHIPPING"},"source":{"version":"2.1.1.Final","connector":"mysql","name":"debezium","ts_ms":1673446748000,"snapshot":"first","db":"inventory","sequence":null,"table":"addresses","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":157,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1673446748425,"transaction":null}}
...

For more information on Debezium events, see this Debezium documentation.
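The full envelope is verbose. If you only care about the row state, you can pipe the consumer output through jq on the host (assuming jq is installed) and extract just the after field:

$ docker exec kafka bash bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 \
    --topic debezium.inventory.addresses --from-beginning --max-messages 1 | jq .payload.after
{
  "id": 10,
  "customer_id": 1001,
  "street": "3183 Moore Avenue",
  "city": "Euless",
  "state": "Texas",
  "zip": "76036",
  "type": "SHIPPING"
}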

Configure Debezium to start syncing data to Google BigQuery

Before you start configuring the BigQuery connector, move the Google BigQuery service account key file (created in the BigQuery setup section above) to your working directory and name it “bigquery-keyfile.json”.

1. Once you have the key file, copy it to the Connect container:

$ docker cp bigquery-keyfile.json connect:/bigquery-keyfile.json

2. Now create a new file (“register-bigquery.json”) with these configurations:

{
  "name": "inventory-connector-bigquery",
  "config": {
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "tasks.max": "1",
    "consumer.auto.offset.reset": "earliest",
    "topics.regex": "debezium.inventory.*",
    "sanitizeTopics": "true",
    "autoCreateTables": "true",
    "keyfile": "/bigquery-keyfile.json",
    "schemaRetriever": "com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever",
    "project": "mysql-bigquery",
    "defaultDataset": "debezium",
    "allBQFieldsNullable": true,
    "allowNewBigQueryFields": true,
    "transforms": "regexTopicRename,extractAfterData",
    "transforms.regexTopicRename.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.regexTopicRename.regex": "debezium.inventory.(.*)",
    "transforms.regexTopicRename.replacement": "$1",
    "transforms.extractAfterData.type": "io.debezium.transforms.ExtractNewRecordState"
  }
}

You can find information about these configuration properties in the official documentation.
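Two of these settings do most of the work. The RegexRouter transform strips the “debezium.inventory.” prefix so that BigQuery table names match the MySQL table names, and ExtractNewRecordState unwraps the Debezium envelope so that only each row's after state is written. Roughly:

topic   debezium.inventory.customers  ->  BigQuery table  customers
value   {"before": ..., "after": {...}, "op": "c", ...}  ->  {"id": ..., "first_name": ..., ...}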

3. To register the BigQuery connector, run:

$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-bigquery.json

In your BigQuery dataset, you will now be able to see tables matching those in MySQL.

[Screenshot: BigQuery table data]

Now, select data from your customers table. The emails shown are for example purposes only and do not correspond to real individuals.

[Screenshot: customers table data in BigQuery]
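If you prefer the command line to the BigQuery console, the same check can be done with the bq CLI (a sketch, assuming the Google Cloud SDK is installed and authenticated):

$ bq --project_id=mysql-bigquery query --use_legacy_sql=false 'SELECT id, first_name, last_name, email FROM debezium.customers ORDER BY id'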

You can now create a new entry in the MySQL customers table:

$ docker exec -it mysql mysql -uroot -pdebezium -D inventory -e "INSERT INTO customers VALUES(1005, \"Tom\", \"Addams\", \"tom.addams@mailer.net\");"

You will see that the new entry is automatically synced to BigQuery.

[Screenshot: BigQuery dataset tables]
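To confirm the new row arrived, you can also query for it directly (again a sketch using the bq CLI):

$ bq --project_id=mysql-bigquery query --use_legacy_sql=false 'SELECT * FROM debezium.customers WHERE id = 1005'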

Conclusion

You should now have a clear understanding of the benefits of syncing data from MySQL to BigQuery using Debezium and Kafka Connect. With the detailed tutorial found in this article, you will be able to set up and configure Debezium and Kafka Connect yourself.

As a reminder, it's important to test and monitor the pipeline to ensure that data is being synced as expected and to troubleshoot any issues that may arise.
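A simple way to keep an eye on the pipeline is to poll the Kafka Connect status endpoint for both connectors; any state other than RUNNING deserves a closer look. A sketch, assuming jq is installed:

$ for c in inventory-connector-mysql inventory-connector-bigquery; do
    curl -s localhost:8083/connectors/$c/status | jq '{name, state: .connector.state, tasks: [.tasks[].state]}'
  done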

For more information on Debezium and Kafka Connect, you can refer to the following resources:

Debezium documentation: https://debezium.io/documentation/
Kafka Connect documentation: https://kafka.apache.org/documentation/#connect