Let’s talk about TOAST. Toast? No, TOAST!
So what’s that? TOAST (The Oversized-Attribute Storage Technique) is a mechanism in Postgres which stores large column values in multiple physical rows, circumventing the page size limit of 8 KB.
Typically, TOAST storage is transparent to the user, so you don’t really have to care about it. There’s an exception, though: if a table row has changed, any unchanged values that were stored using the TOAST mechanism are not included in the message that Debezium receives from the database, unless they are part of the table’s replica identity. Consequently, such unchanged TOAST column values will not be contained in the Debezium data change events sent to Apache Kafka. In this post we’re going to discuss different strategies for dealing with this situation.
When encountering an unchanged TOAST column value in the logical replication message received from the database, the Debezium Postgres connector will represent that value with a configurable placeholder. By default, that’s the literal __debezium_unavailable_value, but that value can be overridden using the toasted.value.placeholder connector property.
Let’s consider the following Postgres table definition as an example:
CREATE TABLE customers (
  id SERIAL NOT NULL PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE,
  biography TEXT
);
Here, the biography TEXT column is a TOAST-able column, as its value may exceed the page size limit. So when issuing an update such as update inventory.customers set first_name = 'Dana' where id = 1004; you might receive a data change event in Apache Kafka which looks like this (assuming the table has the default replica identity):
{
  "before": null,
  "after": {
    "id": 1004,
    "first_name": "Dana",
    "last_name": "Kretchmar",
    "email": "annek@noanswer.org",
    "biography": "__debezium_unavailable_value"
  },
  "source": {
    "version": "0.10.0.Final",
    "connector": "postgresql",
    "name": "dbserver1",
    "ts_ms": 1570448151151,
    "snapshot": "false",
    "db": "sourcedb",
    "schema": "inventory",
    "table": "customers",
    "txId": 627,
    "lsn": 34650016,
    "xmin": null
  },
  "op": "u",
  "ts_ms": 1570448151611
}
Note how the biography field (whose value hasn’t changed with the UPDATE) has the special __debezium_unavailable_value marker value.
Now, if change event consumers receive that placeholder value, the question arises how they should react to it.
One way, and certainly the easiest from a consumer’s perspective, is to avoid the situation in the first place. This can be achieved by using a "replica identity" of FULL for the Postgres table in question. Alternatively, the replica identity can be based on an index which comprises the TOAST-able column (Postgres requires such an index to be unique and to cover only NOT NULL columns).
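As a minimal sketch, switching a table to the FULL replica identity comes down to a single ALTER TABLE statement. The helper below only builds the statement text; the class and method names are made up for illustration, and actually running the statement (e.g. via JDBC's Statement.execute()) is assumed to be done by a user who owns the table.

```java
final class ReplicaIdentitySql {

    // Quotes an identifier for Postgres, doubling any embedded double quotes.
    static String quote(String identifier) {
        return "\"" + identifier.replace("\"", "\"\"") + "\"";
    }

    // Builds the statement that makes Postgres record the old values of
    // all columns of a row for changes to the given table.
    static String replicaIdentityFull(String schema, String table) {
        return "ALTER TABLE " + quote(schema) + "." + quote(table)
                + " REPLICA IDENTITY FULL";
    }
}
```

For the table from the example, ReplicaIdentitySql.replicaIdentityFull("inventory", "customers") yields ALTER TABLE "inventory"."customers" REPLICA IDENTITY FULL.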
Excluding Unchanged Values
If changing the source table’s replica identity is not an option, one approach for consumers that update a sink datastore (e.g. a database, cache or search index) is to ignore any field of a change event which has the placeholder value.
This means that any column with the placeholder value must be omitted from the update statement executed on the sink datastore.
E.g. in terms of a SQL database, a specific UPDATE statement must be built and executed which doesn’t contain the column(s) with the placeholder value. Users of Hibernate ORM may be reminded of the "dynamic updates" feature, which works similarly.
Some datastores and connectors might only support full updates, though, in which case this strategy isn’t viable.
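To illustrate, here is a minimal sketch of how a consumer might assemble such a statement from the "after" state of a change event. PartialUpdateBuilder and its members are hypothetical names, not part of Debezium or any sink connector, and table and column names are assumed to come from a trusted source, as they are concatenated into the SQL text.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class PartialUpdateBuilder {

    static final String PLACEHOLDER = "__debezium_unavailable_value";

    // SQL text plus the parameter values in the order of the "?" markers.
    static final class Update {
        final String sql;
        final List<Object> parameters;

        Update(String sql, List<Object> parameters) {
            this.sql = sql;
            this.parameters = parameters;
        }
    }

    // Builds an UPDATE that sets every column of the "after" state except
    // the key column and any column carrying the placeholder value.
    static Optional<Update> build(String table, String keyColumn, Map<String, Object> after) {
        List<String> assignments = new ArrayList<>();
        List<Object> parameters = new ArrayList<>();

        for (Map.Entry<String, Object> column : after.entrySet()) {
            boolean isKey = column.getKey().equals(keyColumn);
            boolean isUnavailable = PLACEHOLDER.equals(column.getValue());
            if (isKey || isUnavailable) {
                continue; // the key goes into WHERE; unavailable columns stay untouched
            }
            assignments.add(column.getKey() + " = ?");
            parameters.add(column.getValue());
        }

        if (assignments.isEmpty()) {
            return Optional.empty(); // nothing to update
        }

        parameters.add(after.get(keyColumn));
        String sql = "UPDATE " + table + " SET " + String.join(", ", assignments)
                + " WHERE " + keyColumn + " = ?";
        return Optional.of(new Update(sql, parameters));
    }
}
```

Note that the resulting SQL text depends on which columns happen to be available in a given event, so two updates to the same table can produce two different statements.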
Triggers
One interesting variation of the "ignore" approach is the usage of triggers in the sink database: registered for the column that may receive the marker value, they can "veto" such a change and just keep the previously stored value instead. The following shows an example of such a trigger in Postgres:
CREATE OR REPLACE FUNCTION ignore_unchanged_biography()
  RETURNS TRIGGER AS
$BODY$
BEGIN
  IF NEW."biography" = '__debezium_unavailable_value'
  THEN
    NEW."biography" = OLD."biography";
  END IF;
  RETURN NEW;
END;
$BODY$ LANGUAGE PLPGSQL;

CREATE TRIGGER customer_biography_trigger
BEFORE UPDATE OF "biography"
  ON customers
FOR EACH ROW
EXECUTE PROCEDURE ignore_unchanged_biography();
This will keep the old value for the biography column if it were to be set to the __debezium_unavailable_value marker value.
Stateful Stream Processing
An alternative approach to dealing with unchanged TOAST column values is a stateful stream processing application.
This application can persist the latest value of a TOAST column (as obtained from a snapshot, an insert event or an update including the TOAST-able column) in a state store and put the value back into change events with the marker value.
Debezium makes sure that all change events for one particular record always go into the same partition, so they will be processed in the exact same order as they were created. This ensures that the latest value is available in the state store when receiving a change event with the marker value.
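Independent of any particular streaming framework, the core of this idea can be sketched with a plain in-memory map standing in for the state store. The class name BiographyRestorer and its apply() method are made up for illustration; a real service would use a persistent, fault-tolerant store instead of a HashMap.

```java
import java.util.HashMap;
import java.util.Map;

final class BiographyRestorer {

    static final String PLACEHOLDER = "__debezium_unavailable_value";

    // Stand-in for the state store: latest known biography per customer id.
    private final Map<Integer, String> latestBiography = new HashMap<>();

    // Processes the biography of one change event and returns the value
    // that should be passed on to downstream consumers.
    String apply(int customerId, String biography) {
        if (PLACEHOLDER.equals(biography)) {
            // Unchanged TOAST value: fall back to the last value seen for this
            // customer; keep the marker if no value was stored before.
            return latestBiography.getOrDefault(customerId, PLACEHOLDER);
        }
        // An actual value: remember it for later events of the same customer.
        latestBiography.put(customerId, biography);
        return biography;
    }
}
```

Because events for one record arrive in order, an update carrying the marker is always preceded by the snapshot, insert or update event that stored the actual value, unless that event was produced before the service started consuming.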
Kafka Streams with its state store API comes in very handy for building such a service. Based on Quarkus and its extension for building Kafka Streams applications running either on the JVM or natively via GraalVM, a solution could look like this:
@ApplicationScoped
public class TopologyProducer {

    private static final Logger LOG = LoggerFactory.getLogger(TopologyProducer.class);

    static final String BIOGRAPHY_STORE = "biography-store";

    @ConfigProperty(name = "pgtoast.customers.topic")
    String customersTopic;

    @ConfigProperty(name = "pgtoast.customers.enriched.topic")
    String customersEnrichedTopic;

    @Produces
    public Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        StoreBuilder<KeyValueStore<JsonObject, String>> biographyStore = // (1)
                Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore(BIOGRAPHY_STORE),
                    new JsonObjectSerde(),
                    new Serdes.StringSerde()
                );
        builder.addStateStore(biographyStore);

        builder.<JsonObject, JsonObject>stream(customersTopic) // (2)
                .transformValues(ToastColumnValueProvider::new, BIOGRAPHY_STORE)
                .to(customersEnrichedTopic);

        return builder.build();
    }

    class ToastColumnValueProvider implements
            ValueTransformerWithKey<JsonObject, JsonObject, JsonObject> {

        private KeyValueStore<JsonObject, String> biographyStore;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            biographyStore = (KeyValueStore<JsonObject, String>) context.getStateStore(
                    TopologyProducer.BIOGRAPHY_STORE);
        }

        @Override
        public JsonObject transform(JsonObject key, JsonObject value) {
            JsonObject payload = value.getJsonObject("payload");
            JsonObject newRowState = payload.getJsonObject("after");
            String biography = newRowState.getString("biography");

            if (isUnavailableValueMarker(biography)) { // (3)
                String currentValue = biographyStore.get(key); // (4)

                if (currentValue == null) {
                    LOG.warn("No biography value found for key '{}'", key);
                }
                else {
                    value = Json.createObjectBuilder(value) // (5)
                            .add(
                                "payload",
                                Json.createObjectBuilder(payload)
                                    .add(
                                        "after",
                                        Json.createObjectBuilder(newRowState).add(
                                            "biography",
                                            currentValue
                                        )
                                    )
                            )
                            .build();
                }
            }
            else { // (6)
                biographyStore.put(key, biography);
            }

            return value;
        }

        private boolean isUnavailableValueMarker(String value) {
            return "__debezium_unavailable_value".contentEquals(value);
        }

        @Override
        public void close() {
        }
    }
}
(1) Set up a state store for storing the latest biography value per customer id
(2) The actual streaming pipeline: for each message on the customers topic, apply the logic for replacing the TOAST column marker value and write the transformed message to an output topic
(3) Check whether the biography value from the incoming message is the marker
(4) If so, get the current biography value for the customer from the state store
(5) Replace the marker value with the actual value obtained from the state store
(6) If the incoming message has an actual biography value, put this to the state store
Now, if a consumer subscribes to the "enriched" topic, it will see any customer change events with the actual value of any unchanged TOAST columns, as materialized from the state store. The fact that the Debezium connector originally emitted the special marker value is fully transparent at that point.
Primary Key Changes
When a record’s primary key gets updated, Debezium will create two change events: one "delete" event using the old key and one "insert" event with the new key. When processing the second event, the stream processing application will not be able to look up the biography value stored earlier, as it was stored under the old key. One way to address this would be to expose the original key value e.g. as a message header of the insert event. This requirement is tracked as DBZ-1531; let us know if you’d like to contribute and implement this feature.
When to Use What?
We’ve discussed different options for dealing with unchanged TOAST column values in Debezium’s data change events. Which one should be used in which case then?
Changing the replica identity to FULL is the easiest approach by far: a single configuration change to the source table avoids the problem to begin with.
It’s not the most efficient solution, though, and some DBAs might be reluctant to apply this setting.
When using the change events to update some kind of sink data store, it may sound attractive at first to simply omit any field with the special marker value when issuing an update. But this technique has some downsides: not all data stores and the corresponding connectors might support partial updates. Instead there might only be the option to do full updates to a record in the sink data store based on the incoming data. Even when that option exists, it might be sub-optimal. E.g. for a SQL database, a statement just with the available values may be executed. This is at odds with efficient usage of prepared statements and batching, though: as the "shape" of the data may change between two updates to the same table, the same prepared statement cannot be re-used and performance may suffer.
The trigger-based approach isn’t prone to these problems: any updates to a table will have the same number of columns, so the consumer (e.g. a sink connector) may re-use the same prepared statement and batch multiple records into a single execution. One thing to be aware of is the organizational cost associated with this approach: triggers must be installed for each affected column and be kept in sync when table structures change. This must be done individually in each sink datastore, and not all stores may have support for triggers to begin with. But where possible, triggers can be a great solution.
Finally, stream processing makes the usage of TOAST-able columns and the absence of their values in update events fully transparent to consumers. The enrichment logic is implemented in a single place, from which all the consumers of the change event stream benefit, without the need for individual solutions in each one of them. Also, it’s the only viable solution if consumers themselves are stateless and don’t have any way to materialize the last value of such a column, e.g. when streaming change events to a browser via web sockets or GraphQL subscriptions. The price to pay is the overhead of maintaining and operating a separate service.
On a side note, such a stream processing application might also be provided as a configurable, ready-to-use component coming as a part of the Debezium platform. This might be useful not only for Postgres, but also when thinking about other Debezium connectors. For instance, in case of Cassandra, change events will only ever contain the updated fields; a similar mode could be envisioned for MySQL by supporting its "non full" binlog mode. In both cases, a stateful stream processing service could be used to hydrate full data change events based on earlier row state retrieved from a local state store and an incoming "patch" style change event. If you think that’d be a useful addition to Debezium, please let us know.
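The hydration of "patch" style events mentioned above could look roughly like the following sketch. It is purely illustrative and not an existing Debezium component: RowHydrator is a hypothetical name, rows are modelled as simple maps from column name to value, and a HashMap stands in for the local state store.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

final class RowHydrator {

    // Stand-in for a local state store: last known full row state per key.
    private final Map<Object, Map<String, Object>> rows = new HashMap<>();

    // Merges a patch (containing only the changed fields) into the stored
    // row state and returns a copy of the resulting full row.
    Map<String, Object> hydrate(Object key, Map<String, Object> patch) {
        Map<String, Object> row = rows.computeIfAbsent(key, k -> new LinkedHashMap<>());
        row.putAll(patch);
        return new LinkedHashMap<>(row);
    }
}
```

The first event seen for a key (e.g. from a snapshot or an insert) establishes the full row; each later patch overrides just the fields it contains, while all other fields are taken from the stored state.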
As always, there are no silver bullets: you should choose a solution based on your specific situation and requirements. As a starting point you can find a basic implementation of the trigger and Kafka Streams approaches in the Debezium examples repository.
Which approach would you prefer? Or perhaps you have even further alternatives in mind? Tell us about it in the comments below.
Many thanks to Dave Cramer and Jiri Pechanec for their feedback while working on this post and the accompanying example code!
About Debezium
Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.
Get involved
We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Zulip, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas how we can improve Debezium, please let us know or log an issue.