Custom Converters
This feature is currently in incubating state, i.e. exact semantics, configuration options etc. may change in future revisions, based on the feedback we receive. Please let us know if you encounter any problems while using this extension.
Datatype Conversion
The Debezium connectors map database column types to corresponding Kafka Connect schema types and convert the column values accordingly. The specific column type mappings are documented for each of the connectors and represent a reasonable default behavior in most cases. Still, an application may require specific handling of a certain column type, or of a particular column, due to downstream system requirements. For instance, you might want to export temporal column values as formatted strings instead of milliseconds since epoch.
For this purpose Debezium provides an extension point that allows users to inject their own converters based on their business requirements. The converters are written in Java and are enabled and configured via connector properties.
During connector startup, all configured converters are instantiated and placed in a registry. While the connector-internal schema representation is built, every converter is invoked for every column/field of every table/collection, and it can register itself to become responsible for the conversion of the given column or field.
Whenever a new change is processed by Debezium, the converter is invoked to execute the actual conversion for the registered columns or fields.
Implementing Converters
The converter implementation is a Java class that implements the interface io.debezium.spi.converter.CustomConverter:
public interface CustomConverter<S, F extends ConvertedField> {

    @FunctionalInterface
    interface Converter {
        Object convert(Object input);
    }

    public interface ConverterRegistration<S> {
        void register(S fieldSchema, Converter converter);
    }

    void configure(Properties props);

    void converterFor(F field, ConverterRegistration<S> registration);
}
The method configure() is used to pass converter configuration options into the converter after its instantiation, so it can modify its runtime behaviour for each specific instance.
The method converterFor() is invoked by Debezium, and the converter is required to call registration if it wants to take responsibility for the conversion of the given column or field.
The registration provides the target schema definition and the actual conversion code.
Schemas are currently represented using Kafka Connect’s SchemaBuilder API. In the future, an independent schema definition API will be added.
The metadata about the column or field is passed via the field parameter. It contains information such as the table or collection name, the column or field name, the type name, and more.
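These metadata can be used, for instance, to register a converter only for one particular column of one particular table rather than for a whole column type. This is a minimal sketch; the metadata accessors name() and dataCollection(), the table identifier inventory.books, and the schema name are illustrative assumptions, and the exact table identifier format depends on the connector.

public static class BookIsbnColumnConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    // Illustrative schema name; in practice it could be made configurable via configure().
    private final SchemaBuilder isbnSchema = SchemaBuilder.string().name("io.debezium.example.Isbn");

    @Override
    public void configure(Properties props) {
        // No options needed for this sketch.
    }

    @Override
    public void converterFor(RelationalColumn column,
            ConverterRegistration<SchemaBuilder> registration) {
        // Register only for the "isbn" column of the "inventory.books" table;
        // all other columns keep the connector's default conversion.
        if ("inventory.books".equals(column.dataCollection()) && "isbn".equals(column.name())) {
            registration.register(isbnSchema, x -> String.valueOf(x));
        }
    }
}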
The following example implements a simple converter that will:
- accept one parameter named schema.name
- register itself for every column of type isbn with
  - the target STRING schema named according to the schema.name parameter
  - the conversion code that converts the ISBN data to String
public static class IsbnConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    private SchemaBuilder isbnSchema;

    @Override
    public void configure(Properties props) {
        isbnSchema = SchemaBuilder.string().name(props.getProperty("schema.name"));
    }

    @Override
    public void converterFor(RelationalColumn column,
            ConverterRegistration<SchemaBuilder> registration) {
        if ("isbn".equals(column.typeName())) {
            registration.register(isbnSchema, x -> x.toString());
        }
    }
}
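If the matched column can contain NULL values, the registered schema and the conversion function should account for that. The following is a possible null-safe variation of the example above, assuming the column metadata exposes isOptional():

public static class NullSafeIsbnConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    private String schemaName;

    @Override
    public void configure(Properties props) {
        // Store the configured name; the schema itself is built per column below.
        schemaName = props.getProperty("schema.name");
    }

    @Override
    public void converterFor(RelationalColumn column,
            ConverterRegistration<SchemaBuilder> registration) {
        if ("isbn".equals(column.typeName())) {
            SchemaBuilder schema = SchemaBuilder.string().name(schemaName);
            if (column.isOptional()) {
                // Nullable columns need an optional schema.
                schema.optional();
            }
            // Pass NULL values through unchanged instead of calling toString() on them.
            registration.register(schema, x -> x == null ? null : x.toString());
        }
    }
}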
To compile the code it is necessary to provide dependencies on the debezium-api and connect-api modules, like so:
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${version.debezium}</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-api</artifactId>
    <version>${version.kafka}</version>
</dependency>
where ${version.debezium} and ${version.kafka} are the versions of Debezium and Apache Kafka, respectively.
Configuring and Using Converters
After the converter is developed, it has to be deployed in a JAR file side by side with the Debezium connector JARs. To enable the converter for a given connector instance, connector options like the following must be provided:
converters=isbn
isbn.type=io.debezium.test.IsbnConverter
isbn.schema.name=io.debezium.postgresql.type.Isbn
The option converters is mandatory and enumerates the comma-separated symbolic names of the converter instances to be used. The symbolic names are used as a prefix for further configuration options.

isbn.type (generally <prefix>.type) is mandatory and is the name of the class that implements the converter.

isbn.schema.name is a converter parameter that is passed to the converter’s configure method as schema.name.
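Multiple converter instances can be enabled for the same connector by listing several symbolic names; each name then prefixes its own set of options. In the hypothetical configuration below, the second converter name, class, and option are made-up examples:

converters=isbn,temporal
isbn.type=io.debezium.test.IsbnConverter
isbn.schema.name=io.debezium.postgresql.type.Isbn
temporal.type=com.example.TemporalFormatConverter
temporal.format=yyyy-MM-dd

The value of temporal.format would be passed to the configure method of TemporalFormatConverter under the key format.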