Customizing Topic Auto-Creation
Debezium automatically creates the internal topics for offsets, connector status, config
storage, and history. The destination topics for the captured tables are created
automatically with a default configuration by the Kafka brokers when
auto.create.topics.enable is set to true.
When topic auto-creation is disabled on the brokers, as is common in production environments,
or when the topics need a different configuration, the topics have to be created upfront,
either manually or through a custom deployment process. Before Kafka 2.6.0 this was the only option.
Since Kafka 2.6.0, Kafka Connect supports customizable topic auto-creation.
Set up Kafka Connect
Since Kafka 2.6.0, Kafka Connect comes with topic creation enabled by default:
topic.creation.enable = true
If you don’t want to allow automatic topic creation by connectors, you can set this value to false.
Configuration
Topic auto-creation is based on groups. Every custom group has an include and an exclude
property, each a comma-separated list of regular expressions matching the names of the
topics that should be included or excluded.
You can specify both, |
You don’t have to specify any custom group. When no custom group is registered, or when
the include patterns of the registered groups don’t match the topic that is to be created,
the default config is used.
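The group selection described above can be sketched in a few lines of Python. This is only an illustration, not Kafka Connect's actual implementation: the function name resolve_group and the dictionary layout are hypothetical, and the sketch assumes that a pattern must match the whole topic name, that exclude patterns win over include patterns, and that groups are checked in registration order.

```python
import re

def resolve_group(topic, groups):
    """Return the first group whose rules accept `topic`, else "default".

    `groups` maps group name -> {"include": [...], "exclude": [...]},
    listed in registration order.
    Assumptions: patterns must match the whole topic name, and a match
    on any exclude pattern overrides a match on an include pattern.
    """
    for name, rules in groups.items():
        included = any(re.fullmatch(p, topic) for p in rules.get("include", []))
        excluded = any(re.fullmatch(p, topic) for p in rules.get("exclude", []))
        if included and not excluded:
            return name
    # No custom group accepted the topic, so the default group applies.
    return "default"

# Hypothetical group used only for this illustration.
groups = {
    "applicationlogs": {
        "include": [r"dbserver1\.logs\.applog-.*"],
        "exclude": [r"dbserver1\.logs\.applog-old-.*"],
    },
}

print(resolve_group("dbserver1.logs.applog-2024", groups))
print(resolve_group("dbserver1.logs.applog-old-2019", groups))
print(resolve_group("dbserver1.inventory.customers", groups))
```

Under these assumptions the first topic resolves to applicationlogs, while the other two fall back to default: one because it is excluded, the other because no include pattern matches it.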
You can specify any topic-level configuration parameter to customize how topics are created.
See the Configuring Debezium Topics section in the Debezium installation guide for general topic configuration considerations.
Default group configuration
The default group's configuration can be passed in the connector config JSON like this:
{
...
"topic.creation.default.replication.factor": 3, (1)
"topic.creation.default.partitions": 10, (2)
"topic.creation.default.cleanup.policy": "compact", (3)
"topic.creation.default.compression.type": "lz4" (4)
...
}
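Item | Description |
---|---|
1 | topic.creation.default.replication.factor sets the replication factor for new topics, here 3. |
2 | topic.creation.default.partitions sets the number of partitions for new topics, here 10. |
3 | topic.creation.default.cleanup.policy sets the cleanup policy for new topics, here compact. |
4 | topic.creation.default.compression.type sets the compression type for new topics, here lz4. |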
As you can see, any topic-level configuration parameter can be used as a property.
Note that |
Custom group configuration
You can specify multiple groups. As with the default group, the properties of a custom
group are grouped together under the group name. In your connector JSON this looks as follows:
{
...
(1)
"topic.creation.inventory.include": "dbserver1\\.inventory\\..*", (2)
"topic.creation.inventory.partitions": 20,
"topic.creation.inventory.cleanup.policy": "compact",
"topic.creation.inventory.delete.retention.ms": 7776000000,
(3)
"topic.creation.applicationlogs.include": "dbserver1\\.logs\\.applog-.*", (4)
"topic.creation.applicationlogs.exclude": "dbserver1\\.logs\\.applog-old-.*", (5)
"topic.creation.applicationlogs.replication.factor": 1,
"topic.creation.applicationlogs.partitions": 20,
"topic.creation.applicationlogs.cleanup.policy": "delete",
"topic.creation.applicationlogs.retention.ms": 7776000000,
"topic.creation.applicationlogs.compression.type": "lz4",
...
}
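Item | Description |
---|---|
1 | First we define the configuration for the inventory group. |
2 | topic.creation.inventory.include lists the regular expressions matching the topic names that belong to the inventory group. |
3 | Then we define the configuration for the applicationlogs group. |
4 | topic.creation.applicationlogs.include lists the regular expressions matching the topic names that belong to the applicationlogs group. |
5 | topic.creation.applicationlogs.exclude lists the regular expressions matching the topic names that are excluded from the applicationlogs group. |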
Registering custom groups
Finally, we need to register the two defined custom groups inventory and applicationlogs
with the topic.creation.groups property:
{
...
"topic.creation.groups": "inventory,applicationlogs",
...
}
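Because the group definitions and the topic.creation.groups list are maintained separately, it is easy to define a group and forget to register it. The following Python sketch shows one way to catch that before deploying a connector. The helper name unregistered_groups is hypothetical, and the sketch assumes that group names contain no dots.

```python
def unregistered_groups(config):
    """Return group names that have topic.creation.<group>.* properties
    but are missing from topic.creation.groups.

    Assumption: group names contain no dots, so the group name is the
    third dot-separated segment of the property key.
    """
    registered = {
        name.strip()
        for name in config.get("topic.creation.groups", "").split(",")
        if name.strip()
    }
    defined = set()
    for key in config:
        parts = key.split(".")
        # Keys such as topic.creation.groups have only three segments
        # and do not define a group, so they are skipped here.
        if len(parts) >= 4 and parts[:2] == ["topic", "creation"]:
            defined.add(parts[2])
    defined.discard("default")  # the default group is never registered
    return sorted(defined - registered)

config = {
    "topic.creation.default.replication.factor": 3,
    "topic.creation.default.partitions": 10,
    "topic.creation.groups": "inventory",
    "topic.creation.inventory.include": "dbserver1\\.inventory\\..*",
    "topic.creation.applicationlogs.include": "dbserver1\\.logs\\.applog-.*",
}

print(unregistered_groups(config))
```

In this example applicationlogs is reported, because it has properties but is not listed in topic.creation.groups.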
A complete connector JSON config looks like this:
{
...
"topic.creation.default.replication.factor": 3,
"topic.creation.default.partitions": 10,
"topic.creation.default.cleanup.policy": "compact",
"topic.creation.default.compression.type": "lz4",
"topic.creation.groups": "inventory,applicationlogs",
"topic.creation.inventory.include": "dbserver1\\.inventory\\..*",
"topic.creation.inventory.replication.factor": 3,
"topic.creation.inventory.partitions": 20,
"topic.creation.inventory.cleanup.policy": "compact",
"topic.creation.inventory.delete.retention.ms": 7776000000,
"topic.creation.applicationlogs.include": "dbserver1\\.logs\\.applog-.*",
"topic.creation.applicationlogs.exclude": "dbserver1\\.logs\\.applog-old-.*",
"topic.creation.applicationlogs.replication.factor": 1,
"topic.creation.applicationlogs.partitions": 20,
"topic.creation.applicationlogs.cleanup.policy": "delete",
"topic.creation.applicationlogs.retention.ms": 7776000000,
"topic.creation.applicationlogs.compression.type": "lz4"
}
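If the connector configuration is generated by a deployment script rather than written by hand, the flat topic.creation.* properties can be derived from a nested structure. The following Python sketch is one possible approach. The function name topic_creation_properties and the input layout are hypothetical, and the values are shortened for illustration.

```python
import json

def topic_creation_properties(default, groups):
    """Flatten per-group settings into topic.creation.* connector properties.

    `default` holds the default group's settings; `groups` maps each custom
    group name to its settings, in the order the groups should be registered.
    """
    props = {f"topic.creation.default.{key}": value for key, value in default.items()}
    if groups:
        # Register every custom group under topic.creation.groups.
        props["topic.creation.groups"] = ",".join(groups)
    for name, settings in groups.items():
        for key, value in settings.items():
            props[f"topic.creation.{name}.{key}"] = value
    return props

props = topic_creation_properties(
    default={"replication.factor": 3, "partitions": 10},
    groups={
        "inventory": {"include": r"dbserver1\.inventory\..*", "partitions": 20},
    },
)

# json.dumps takes care of commas and of escaping the backslashes in the regex.
print(json.dumps(props, indent=2))
```

Serializing with json.dumps means that separators and backslash escaping in the regular expressions are handled by the library rather than by hand.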
Additional resources
For more information on topic auto-creation you can have a look at these resources:
- Debezium Blog: Auto-creating Debezium Change Data Topics
- Kafka Improvement Proposal about adding topic auto-creation to Kafka Connect: KIP-158 Kafka Connect should allow source connectors to set topic-specific settings for new topics