Customization of Kafka Connect automatic topic creation

Kafka provides two mechanisms for creating topics automatically. You can enable automatic topic creation for the Kafka broker, and, beginning with Kafka 2.6.0, you can also enable Kafka Connect to create topics. The Kafka broker uses the auto.create.topics.enable property to control automatic topic creation. In Kafka Connect, the topic.creation.enable property specifies whether Kafka Connect is permitted to create topics. In both cases, the default settings for the properties enable automatic topic creation.

When automatic topic creation is enabled, if a Debezium source connector emits a change event record for a table for which no target topic already exists, the topic is created at runtime as the event record is ingested into Kafka.

Differences between automatic topic creation at the broker and in Kafka Connect

Topics that the broker creates are limited to sharing a single default configuration. The broker cannot apply unique configurations to different topics or sets of topics. By contrast, Kafka Connect can apply any of several configurations when creating topics, setting the replication factor, number of partitions, and other topic-specific settings as specified in the Debezium connector configuration. The connector configuration defines a set of topic creation groups, and associates a set of topic configuration properties with each group.

The broker configuration and the Kafka Connect configuration are independent of each other. Kafka Connect can create topics regardless of whether you disable topic creation at the broker. If you enable automatic topic creation at both the broker and in Kafka Connect, the Connect configuration takes precedence, and the broker creates topics only if none of the settings in the Kafka Connect configuration apply.

Disabling automatic topic creation for the Kafka broker

By default, the Kafka broker configuration enables the broker to create topics at runtime if the topics do not already exist. Topics created by the broker cannot be configured with custom properties. If you use a Kafka version earlier than 2.6.0, and you want to create topics with specific configurations, you must disable automatic topic creation at the broker, and then explicitly create the topics, either manually or through a custom deployment process.

Procedure
  • In the broker configuration, set the value of auto.create.topics.enable to false.
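
With broker-side creation disabled, topics must exist before the connector writes to them. As a minimal sketch of a custom deployment step, the following Python helper assembles kafka-topics.sh command lines for a list of topics. The topic names, settings, and bootstrap address are hypothetical, and the helper name is invented for illustration. The flags are those of the stock kafka-topics.sh tool; older Kafka releases that predate the --bootstrap-server option take --zookeeper instead. The helper only builds the argument lists and does not run them.

```python
def topic_create_command(topic, partitions, replication_factor,
                         configs=None, bootstrap="localhost:9092"):
    """Build the argument list for one kafka-topics.sh --create call."""
    cmd = [
        "kafka-topics.sh", "--create",
        "--bootstrap-server", bootstrap,  # hypothetical broker address
        "--topic", topic,
        "--partitions", str(partitions),
        "--replication-factor", str(replication_factor),
    ]
    # Each topic-level override becomes its own --config key=value pair.
    for key, value in sorted((configs or {}).items()):
        cmd += ["--config", f"{key}={value}"]
    return cmd


# Hypothetical topics for a connector whose server name is dbserver1.
commands = [
    topic_create_command("dbserver1.inventory.customers", 20, 3,
                         {"cleanup.policy": "compact"}),
    topic_create_command("dbserver1.inventory.orders", 20, 3,
                         {"cleanup.policy": "compact"}),
]

for cmd in commands:
    print(" ".join(cmd))
```

Returning argument lists rather than a single shell string keeps the values safe to pass to a process launcher without additional quoting.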

Set up Kafka Connect

Automatic topic creation in Kafka Connect is controlled by the topic.creation.enable property. The default value for the property is true, enabling automatic topic creation, as shown in the following example:

topic.creation.enable = true

The setting for the topic.creation.enable property applies to all workers in the Connect cluster.

Kafka Connect automatic topic creation requires you to define the configuration properties that Kafka Connect applies when creating topics. You specify topic configuration properties in the Debezium connector configuration by defining topic groups, and then specifying the properties to apply to each group. The connector configuration defines a default topic creation group, and, optionally, one or more custom topic creation groups. Custom topic creation groups use lists of topic name patterns to specify the topics to which the group’s settings apply.

For details about how Kafka Connect matches topics to topic creation groups, see Topic creation groups. For more information about how configuration properties are assigned to groups, see Topic creation group configuration properties.

By default, topics that Kafka Connect creates are named based on the pattern server.schema.table, for example, dbserver.myschema.inventory.

If you don’t want Kafka Connect to create topics automatically, set the value of topic.creation.enable to false in the Kafka Connect configuration. You can set the property in the connect-distributed.properties file or, if you use Debezium’s container image for Kafka Connect, through the CONNECT_TOPIC_CREATION_ENABLE environment variable.
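
The environment variable name in the preceding paragraph follows a recognizable pattern: the property name in upper case, with dots replaced by underscores and a CONNECT_ prefix. The sketch below reproduces that mapping so that the relationship between the two names is explicit. The mapping is inferred from the single example in this document; treat it as an assumption about the container image, not as a description of its entrypoint script. The function names are hypothetical.

```python
def env_var_for(property_name, prefix="CONNECT_"):
    """Derive the assumed environment variable name for a worker property."""
    return prefix + property_name.upper().replace(".", "_")


def property_for(env_var, prefix="CONNECT_"):
    """Reverse mapping; ambiguous for property names that contain underscores."""
    if not env_var.startswith(prefix):
        raise ValueError(f"{env_var} does not start with {prefix}")
    return env_var[len(prefix):].lower().replace("_", ".")


print(env_var_for("topic.creation.enable"))           # CONNECT_TOPIC_CREATION_ENABLE
print(property_for("CONNECT_TOPIC_CREATION_ENABLE"))  # topic.creation.enable
```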

Kafka Connect automatic topic creation requires the replication.factor and partitions properties to be set for at least the default topic creation group. It is valid for groups to obtain the values for the required properties from the default values for the Kafka broker.

Configuration

For Kafka Connect to create topics automatically, it requires information from the source connector about the configuration properties to apply when creating topics. You define the properties that control topic creation in the configuration for each Debezium connector. As Kafka Connect creates topics for event records that a connector emits, the resulting topics obtain their configuration from the applicable group. The configuration applies to event records emitted by that connector only.

Topic creation groups

A set of topic properties is associated with a topic creation group. At a minimum, you must define a default topic creation group and specify its configuration properties. Beyond that, you can optionally define one or more custom topic creation groups and specify unique properties for each.

When you create custom topic creation groups, you define the member topics for each group based on topic name patterns. You can specify naming patterns that describe the topics to include or exclude from each group. The include and exclude properties contain comma-separated lists of regular expressions that define topic name patterns. For example, if you want a group to include all topics that start with the string dbserver1.inventory., set the value of its topic.creation.inventory.include property to dbserver1\\.inventory\\..* (an escaped dot followed by .*, so that any characters can follow the prefix).

If you specify both include and exclude properties for a custom topic group, the exclusion rules take precedence, and override the inclusion rules.

Topic creation group configuration properties

The default topic creation group and each custom group are each associated with their own set of configuration properties. You can configure a group to include any of the Kafka topic-level configuration properties. For example, you can specify the cleanup policy for old topic segments, retention time, or the topic compression type for a topic group. You must define at least a minimum set of properties to describe the configuration of the topics to be created.

If no custom groups are registered, or if the name of a topic to be created does not match the include patterns of any registered group, then Kafka Connect uses the configuration of the default group to create the topic.
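
The matching rules in this section can be sketched in a few lines of Python. This is an illustration, not Kafka Connect's implementation. It assumes that a pattern must match the entire topic name, that the entries of a comma-separated list are alternatives, and that registered groups are tried in the order in which they are listed. The function names are hypothetical. The patterns are the ones used for the applicationlogs group later in this document.

```python
import re


def split_patterns(value):
    """Split a comma-separated include/exclude value into regex strings."""
    return [p.strip() for p in value.split(",") if p.strip()]


def group_matches(topic, include, exclude=""):
    """True if topic matches an include pattern and no exclude pattern."""
    if any(re.fullmatch(p, topic) for p in split_patterns(exclude)):
        return False  # exclusion rules take precedence
    return any(re.fullmatch(p, topic) for p in split_patterns(include))


def select_group(topic, groups):
    """Return the first matching custom group name, or "default"."""
    for name, rules in groups.items():  # dicts preserve insertion order
        if group_matches(topic, rules["include"], rules.get("exclude", "")):
            return name
    return "default"


groups = {
    "applicationlogs": {
        "include": r"dbserver1\.logs\.applog-.*",
        "exclude": r"dbserver1\.logs\.applog-old-.*",
    },
}

print(select_group("dbserver1.logs.applog-2024", groups))      # applicationlogs
print(select_group("dbserver1.logs.applog-old-2019", groups))  # default
print(select_group("dbserver1.inventory.orders", groups))      # default
```

Note that the patterns here are raw Python strings with single backslashes; in connector JSON each backslash is doubled.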

For general considerations about topic configuration, see Configuring Debezium topics in the Debezium installation guide.

Default group configuration

Before you can use Kafka Connect automatic topic creation, you must create a default topic creation group and define a configuration for it. The configuration for the default topic creation group is applied to any topics with names that do not match the include list pattern of a custom topic creation group.

Procedure
  • To define properties for the topic.creation.default group, add them to the connector configuration JSON, as shown in the following example:

    {
        ...
    
        "topic.creation.default.replication.factor": 3,  (1)
        "topic.creation.default.partitions": 10,  (2)
        "topic.creation.default.cleanup.policy": "compact",  (3)
        "topic.creation.default.compression.type": "lz4"  (4)
    
         ...
    }

    You can include any Kafka topic-level configuration property in the configuration for the default group.

Table 1. Connector configuration for the default topic creation group
Item Description

1

topic.creation.default.replication.factor defines the replication factor for topics created by the default group.
replication.factor is mandatory for the default group but optional for custom groups. Custom groups will fall back to the default group’s value if not set. Use -1 to use the Kafka broker’s default value.

2

topic.creation.default.partitions defines the number of partitions for topics created by the default group.
partitions is mandatory for the default group but optional for custom groups. Custom groups will fall back to the default group’s value if not set. Use -1 to use the Kafka broker’s default value.

3

topic.creation.default.cleanup.policy is mapped to the cleanup.policy property of the topic level configuration parameters and defines the log retention policy.

4

topic.creation.default.compression.type is mapped to the compression.type property of the topic level configuration parameters and defines how messages are compressed on hard disk.

Custom groups fall back to the default group settings only for the required replication.factor and partitions properties. If the configuration for a custom topic group leaves other properties undefined, the values specified in the default group are not applied.
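
The fallback rule can be made concrete with a small sketch. The function below resolves the settings for one group from a connector configuration held as a Python dict; the function name and the use of a dict are assumptions made for illustration. Only replication.factor and partitions are inherited from the default group. A value of -1 is passed through unchanged, because it tells Kafka to use the broker default.

```python
REQUIRED = ("replication.factor", "partitions")
RESERVED = ("include", "exclude")  # these select topics; they are not topic settings


def effective_settings(config, group):
    """Collect the topic settings that apply to topics in `group`."""
    prefix = f"topic.creation.{group}."
    settings = {
        key[len(prefix):]: value
        for key, value in config.items()
        if key.startswith(prefix) and key[len(prefix):] not in RESERVED
    }
    # Only the two required properties fall back to the default group.
    for name in REQUIRED:
        if name not in settings:
            settings[name] = config[f"topic.creation.default.{name}"]
    return settings


config = {
    "topic.creation.default.replication.factor": 3,
    "topic.creation.default.partitions": 10,
    "topic.creation.default.cleanup.policy": "compact",
    "topic.creation.inventory.include": r"dbserver1\.inventory\..*",
    "topic.creation.inventory.partitions": 20,
}

print(effective_settings(config, "inventory"))
# {'partitions': 20, 'replication.factor': 3}
```

The inventory group inherits the replication factor of 3 but not the compact cleanup policy, so its topics would get whatever cleanup policy the broker applies by default.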

Custom group configuration

You can define multiple custom topic groups, each with its own configuration.

Procedure
  • To define a custom topic group, add a topic.creation.<group_name>.include property to the connector JSON, and list the properties for the custom group after the group name.

    The following example shows sample configurations for the inventory and applicationlogs custom topic creation groups:

    {
        ...
    
        (1)
        "topic.creation.inventory.include": "dbserver1\\.inventory\\..*",  (2)
        "topic.creation.inventory.partitions": 20,
        "topic.creation.inventory.cleanup.policy": "compact",
        "topic.creation.inventory.delete.retention.ms": 7776000000,
    
        (3)
        "topic.creation.applicationlogs.include": "dbserver1\\.logs\\.applog-.*",  (4)
        "topic.creation.applicationlogs.exclude": "dbserver1\\.logs\\.applog-old-.*",  (5)
        "topic.creation.applicationlogs.replication.factor": 1,
        "topic.creation.applicationlogs.partitions": 20,
        "topic.creation.applicationlogs.cleanup.policy": "delete",
        "topic.creation.applicationlogs.retention.ms": 7776000000,
        "topic.creation.applicationlogs.compression.type": "lz4",
    
         ...
    }

Table 2. Connector configuration for custom inventory and applicationlogs topic creation groups
Item Description

1

Defines the configuration for the inventory group.
The replication.factor and partitions properties are optional for custom groups. If no value is set, custom groups fall back to the value set for the default group. Set the value to -1 to use the value that is set for the Kafka broker.

2

topic.creation.inventory.include defines a regular expression to match all topics that start with dbserver1.inventory.. The configuration that is defined for the inventory group is applied only to topics with names that match the specified regular expression.

3

Defines the configuration for the applicationlogs group.
The replication.factor and partitions properties are optional for custom groups. If no value is set, custom groups fall back to the value set for the default group. Set the value to -1 to use the value that is set for the Kafka broker.

4

topic.creation.applicationlogs.include defines a regular expression to match all topics that start with dbserver1.logs.applog-. The configuration that is defined for the applicationlogs group is applied only to topics with names that match the specified regular expression. Because an exclude property is also defined for this group, the topics that match the include regular expression might be further restricted by that exclude property.

5

topic.creation.applicationlogs.exclude defines a regular expression to match all topics that start with dbserver1.logs.applog-old-. The configuration that is defined for the applicationlogs group is applied only to topics with names that do not match the given regular expression. Because an include property is also defined for this group, the configuration of the applicationlogs group is applied only to topics with names that match the specified include regular expressions and that do not match the specified exclude regular expressions.

Registering custom groups

After you specify the configuration for any custom topic creation groups, register the groups.

Procedure
  • Register custom groups by adding the topic.creation.groups property to the connector JSON, and specifying a comma-separated list of groups.

    The following example registers the custom topic creation groups inventory and applicationlogs:

    {
        ...
    
        "topic.creation.groups": "inventory,applicationlogs",
    
        ...
    }
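
A mismatch between the groups listed in topic.creation.groups and the groups that actually have properties is easy to introduce. The following sketch is a hypothetical pre-deployment check, not something that Kafka Connect provides. It reports registered groups that lack an include property, which this document treats as the defining property of a custom group, and groups that have properties but were never registered. The audit group in the sample data is invented for the example.

```python
def check_group_registration(config):
    """Return (registered groups without include, configured but unregistered groups)."""
    registered = [
        g.strip()
        for g in config.get("topic.creation.groups", "").split(",")
        if g.strip()
    ]
    # Group names that appear in topic.creation.<group>.<property> keys.
    configured = set()
    for key in config:
        parts = key.split(".")
        if key.startswith("topic.creation.") and len(parts) > 3:
            configured.add(parts[2])
    configured.discard("default")  # the default group is not listed in topic.creation.groups

    missing_include = [
        g for g in registered
        if f"topic.creation.{g}.include" not in config
    ]
    unregistered = sorted(configured - set(registered))
    return missing_include, unregistered


config = {
    "topic.creation.groups": "inventory,applicationlogs",
    "topic.creation.default.replication.factor": 3,
    "topic.creation.default.partitions": 10,
    "topic.creation.inventory.partitions": 20,
    "topic.creation.audit.include": r"dbserver1\.audit\..*",  # hypothetical group
}

print(check_group_registration(config))
# (['inventory', 'applicationlogs'], ['audit'])
```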
Completed configuration

The following example shows a completed configuration that includes the configuration for a default topic group, along with the configurations for an inventory and an applicationlogs custom topic creation group:

Example: Configuration for a default topic creation group and two custom groups
{
    ...

    "topic.creation.default.replication.factor": 3,
    "topic.creation.default.partitions": 10,
    "topic.creation.default.cleanup.policy": "compact",
    "topic.creation.default.compression.type": "lz4",
    "topic.creation.groups": "inventory,applicationlogs",
    "topic.creation.inventory.include": "dbserver1\\.inventory\\..*",
    "topic.creation.inventory.partitions": 20,
    "topic.creation.inventory.cleanup.policy": "compact",
    "topic.creation.inventory.delete.retention.ms": 7776000000,
    "topic.creation.applicationlogs.include": "dbserver1\\.logs\\.applog-.*",
    "topic.creation.applicationlogs.exclude": "dbserver1\\.logs\\.applog-old-.*",
    "topic.creation.applicationlogs.replication.factor": 1,
    "topic.creation.applicationlogs.partitions": 20,
    "topic.creation.applicationlogs.cleanup.policy": "delete",
    "topic.creation.applicationlogs.retention.ms": 7776000000,
    "topic.creation.applicationlogs.compression.type": "lz4"
}

Additional resources

For more information about automatic topic creation, see the following resources: