Clean Up Your Outbox Tables With Programmatic TTL

For those familiar with the Outbox Pattern, CockroachDB provides some unique capabilities for handling this type of architectural pattern. One common method is to use Changefeeds in CockroachDB to send an acknowledgement message back to the originating service once the database transaction has committed. Changefeeds are great in this scenario: they can be emitted on any record mutation on a table (except IMPORT), connect to a message bus like Kafka, and deliver the payload with fairly low latency (~100ms). However, one consequence of this pattern is that historical records build up in the Outbox table. Fortunately, we have a rather nifty solution that can clean up these Outbox tables.

The goal in this post is to show how you can programmatically remove records from an Outbox table once they have been flushed to its sink (e.g., Kafka). The idea is to create a clean-up job that deletes Outbox records whose MVCC timestamp falls behind the high water mark of the Changefeed.

If you want to cut to the chase, proceed directly to Step 3 to find the TTL statement to execute. Steps 1 and 2 show how you can find the details in the CockroachDB catalog.

Lastly, the steps below use S3 as a sink instead of Kafka, but they can easily be repurposed for Kafka by changing the sink target in the Changefeed.
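
For example, assuming a hypothetical broker address of kafka-broker:9092, a Kafka-targeted version of one of the Changefeeds created in Step 0 below would look something like this:

CREATE CHANGEFEED FOR TABLE outbox_t1 INTO 'kafka://kafka-broker:9092' WITH updated, resolved = '1m';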

High Level Steps

  • Get the list of outbox tables that need TTL
  • Find the active Changefeed for those tables
  • For each table, delete rows where the MVCC timestamp of the row (crdb_internal_mvcc_timestamp) is < the high water mark of the Changefeed

You can run this test using CockroachCloud or CockroachDB. If you're using CockroachDB, I would recommend trying this with cockroach demo.

cockroach demo --nodes 3 --empty

Step 0 - Create A Schema To Test With

For this schema, we'll create 1 decoy table and 3 outbox tables. We'll insert some initial records, create three Changefeeds (one on the decoy table, one on outbox_t1, and one covering both outbox_t2 and outbox_t3), and then add more data after the Changefeeds are created. We want to show how the SQL TTL script only picks up Outbox tables with Changefeeds. Notice how one Changefeed includes two tables, outbox_t2 and outbox_t3. We'll make sure to pick up both of those tables ;)

set cluster setting kv.rangefeed.enabled = true;

create table test_t0 (i int primary key);

create table outbox_t1 (i int primary key);
create table outbox_t2 (i int primary key);
create table outbox_t3 (i int primary key);

insert into test_t0 values (unique_rowid());
insert into outbox_t1 values (unique_rowid());
insert into outbox_t2 values (unique_rowid());
insert into outbox_t3 values (unique_rowid());

CREATE CHANGEFEED FOR TABLE test_t0 INTO 'experimental-s3://chrisc-test/changefeed/ttl/test?AUTH=implicit' WITH updated, resolved = '1m';

CREATE CHANGEFEED FOR TABLE outbox_t1 INTO 'experimental-s3://chrisc-test/changefeed/ttl/outbox1?AUTH=implicit' WITH updated, resolved = '1m';

CREATE CHANGEFEED FOR TABLE outbox_t2, outbox_t3 INTO 'experimental-s3://chrisc-test/changefeed/ttl/outbox2?AUTH=implicit' WITH updated, resolved = '1m';

insert into test_t0 values (unique_rowid());
insert into outbox_t1 values (unique_rowid());
insert into outbox_t2 values (unique_rowid());
insert into outbox_t3 values (unique_rowid());

Let's also verify the Changefeeds are sending data to our sink:

aws s3 ls s3://chrisc-test/changefeed/ttl/ --recursive | grep ndjson

The output should look like this:

2021-08-19 21:46:33        222 changefeed/ttl/outbox1/2021-08-20/202108200145322858970000000000000-fad43b3554ca0a0b-1-5-00000000-outbox_t1-1.ndjson
2021-08-19 22:05:35        180 changefeed/ttl/outbox1/2021-08-20/202108200204308217750000000000001-fad43b3554ca0a0b-1-5-00000001-outbox_t1-1.ndjson
2021-08-19 21:46:33        222 changefeed/ttl/outbox2/2021-08-20/202108200145325129600000000000000-212aeaca8652f3bc-1-8-00000000-outbox_t2-1.ndjson
2021-08-19 21:46:33        222 changefeed/ttl/outbox2/2021-08-20/202108200145325129600000000000000-212aeaca8652f3bc-1-8-00000001-outbox_t3-1.ndjson
2021-08-19 22:05:35        180 changefeed/ttl/outbox2/2021-08-20/202108200204310221340000000000001-212aeaca8652f3bc-1-8-00000002-outbox_t2-1.ndjson
2021-08-19 22:05:35        180 changefeed/ttl/outbox2/2021-08-20/202108200204310221340000000000001-212aeaca8652f3bc-1-8-00000003-outbox_t3-1.ndjson
2021-08-19 21:46:33        222 changefeed/ttl/test/2021-08-20/202108200145319247430000000000000-64e78d6565e58782-1-2-00000000-test_t0-1.ndjson

Step 1 - Get the list of Outbox tables that need TTL

This is a simple select that queries the internal catalog of CockroachDB to find tables that have an 'outbox' prefix.

select table_catalog, table_name
from information_schema.tables
where table_name like 'outbox%';

And the output should be:

table_catalog | table_name
----------------+-------------
defaultdb     | outbox_t1
defaultdb     | outbox_t2
defaultdb     | outbox_t3
(3 rows)

Pretty simple.

Step 2 - Find the active Changefeeds for the Outbox tables

Again, let's query CockroachDB's internal catalog to see which tables prefixed with 'outbox' also have a running Changefeed.

select
j.job_id,
n."parentID",
n2.name as "database",
j.id,
n.name as "table",
j.high_water_timestamp
from system.namespace n
inner join
(
  select job_id, unnest(descriptor_ids) as id, high_water_timestamp
  from crdb_internal.jobs
  where "job_type" = 'CHANGEFEED'
    and "status" = 'running'
) j
on j.id = n.id
inner join
system.namespace n2
on n."parentID" = n2.id
where n."parentID" != 0
  and n.name like 'outbox%'
;

The output should be something like this:

job_id             | parentID | database  | id |   table   |      high_water_timestamp
---------------------+----------+-----------+----+-----------+---------------------------------
686009654600433665 |       50 | defaultdb | 53 | outbox_t1 | 1629424169484287000.0000000000
686009655343546369 |       50 | defaultdb | 55 | outbox_t3 | 1629424169684500000.0000000000
686009655343546369 |       50 | defaultdb | 54 | outbox_t2 | 1629424169684500000.0000000000
(3 rows)

Time: 10ms total (execution 10ms / network 0ms)
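
The high water timestamp is an HLC timestamp whose integer part is nanoseconds since the Unix epoch. If you want to sanity check how far behind each Changefeed's checkpoint is, here is a rough sketch, assuming your CockroachDB version includes the crdb_internal.approximate_timestamp built-in:

select
job_id,
crdb_internal.approximate_timestamp(high_water_timestamp) as high_water_time,
now()::TIMESTAMP - crdb_internal.approximate_timestamp(high_water_timestamp) as approximate_lag
from crdb_internal.jobs
where "job_type" = 'CHANGEFEED'
  and "status" = 'running';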

Step 3 - Create those TTL-ish delete statements

And this is really the only statement you need to run. It generates the delete statements for each Outbox table, comparing the MVCC timestamp of each record against the high water timestamp of the Changefeed. The high water timestamp of the Changefeed is the checkpoint indicating that all records up to that timestamp have been emitted to the sink. This is one of my favorite queries...

select
'delete from ' || n2.name || '.' || n.name || ' where crdb_internal_mvcc_timestamp < ' || j.high_water_timestamp::STRING || ';' as "SQL"
from system.namespace n
inner join
(
  select job_id, unnest(descriptor_ids) as id, high_water_timestamp
  from crdb_internal.jobs
  where "job_type" = 'CHANGEFEED'
    and "status" = 'running'
) j
on j.id = n.id
inner join
system.namespace n2
on n."parentID" = n2.id
where n."parentID" != 0
  and n.name like 'outbox%'
;

The output gives you the delete statements to run:

SQL
-------------------------------------------------------------------------------------------------------
delete from defaultdb.outbox_t1 where crdb_internal_mvcc_timestamp < 1629424830259619000.0000000000
delete from defaultdb.outbox_t3 where crdb_internal_mvcc_timestamp < 1629424830459942000.0000000000
delete from defaultdb.outbox_t2 where crdb_internal_mvcc_timestamp < 1629424830459942000.0000000000
(3 rows)

Step 4 - Run the delete statements

This is the last step where we remove the records that have already been emitted to our sink.

> delete from defaultdb.outbox_t1 where crdb_internal_mvcc_timestamp < 1629425070821775000.0000000000;
delete from defaultdb.outbox_t3 where crdb_internal_mvcc_timestamp < 1629425071022134000.0000000000;
delete from defaultdb.outbox_t2 where crdb_internal_mvcc_timestamp < 1629425071022134000.0000000000;
DELETE 2

Time: 4ms total (execution 4ms / network 0ms)

DELETE 2

Time: 2ms total (execution 2ms / network 0ms)

DELETE 2

Time: 3ms total (execution 3ms / network 0ms)
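
One caveat for Outbox tables with a large backlog: each statement above removes every qualifying row in a single transaction. CockroachDB's DELETE also accepts a LIMIT clause, so one option is to delete in smaller batches and repeat until no rows remain (the batch size of 1000 is just an illustrative number):

delete from defaultdb.outbox_t1 where crdb_internal_mvcc_timestamp < 1629425070821775000.0000000000 limit 1000;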

Step 5 - Clean Up

Lastly, let's clean up the files that we placed in S3.

aws s3 rm s3://chrisc-test/changefeed/ttl/ --recursive
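
Optionally, you can also cancel the Changefeeds we created for this demo. Using the job IDs from the Step 2 output (SHOW JOBS will list any others, like the decoy table's Changefeed), that would look something like this:

cancel job 686009654600433665;
cancel job 686009655343546369;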

Conclusion

You can easily create a program that runs the query from Step 3 and then executes the generated statements. Below is a quick and dirty example of how you can do this in Python.

import psycopg2

# Connect to the demo cluster (credentials come from `cockroach demo`).
conn = psycopg2.connect(database="defaultdb", user="demo", password="demo61304", host="localhost", port=26257)
conn.set_session(autocommit=True)
cur = conn.cursor()

# The query from Step 3: generate one DELETE statement per Outbox table
# that has a running Changefeed, using the high water timestamp as the cutoff.
sql = """ select 'delete from ' || n2.name || '.' || n.name || ' where crdb_internal_mvcc_timestamp < ' || j.high_water_timestamp::STRING || ';' as "SQL" from system.namespace n inner join ( select job_id, unnest(descriptor_ids) as id, high_water_timestamp from crdb_internal.jobs where "job_type" = 'CHANGEFEED' and "status" = 'running' ) j on j.id = n.id inner join system.namespace n2 on n."parentID" = n2.id where n."parentID" != 0 and n.name like 'outbox%'; """
cur.execute(sql)

# Run each generated DELETE statement and print its status (e.g. "DELETE 2").
for (ttl_sql,) in cur.fetchall():
    cur.execute(ttl_sql)
    print(cur.statusmessage)

cur.close()
conn.close()
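
From there, you could schedule this script with cron or your orchestrator of choice so the Outbox tables are trimmed continuously.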

I hope you enjoyed this creative way of removing historical records from your Outbox tables!

