For those familiar with the Outbox Pattern, CockroachDB provides some unique capabilities for handling these type of architectural patterns. One common method is to use Changefeeds in CockroachDB to send an acknowledgement message back to the originating service that the database transaction was committed. Changefeeds are great in this scenario in that they can be emitted on a record mutation on table (except Import), connect to a message bus like Kafka and emit the payload in a mildly low latent (~100ms) fashion. However, one circumstance of this pattern is having historical records build up in the Outbox table. Fortunately we have a rather nifty solution that can clean up these Outbox tables.
So the goal in this post is to show how you can programmatically remove records from an Outbox table that have been flushed to its sink (i.e Kafka). The idea here is to create a clean up job that removes records where the MVCC timestamp of an Outbox record is adequately past the high water mark of a Changefeed.
If you want to cut the chase, proceed directly to Step 3 to find the TTL statement to execute. Steps 1 and 2 show how you can find the details in the CockroachDB catalog.
Lastly, the steps below uses S3 as a sink instead of Kafka but it easily be repurposed to use Kafka by changing the sink target in the Changefeed.
- Get the list of outbox tables that need TTL
- Find the active
Changefeed
for those tables - For each table, delete rows where the mvcc_internal timestamp of the row is < the high water mark of the
Changefeed
You can run this test using CockroachCloud or CockroachDB. If using CockroachDB, I would recommend trying this using cockroach demo.
cockroach demo --nodes 3 --empty
For this schema, we'll create 1 decoy table and 3 outbox tables. We'll also insert some initial records, create a Changefeed on only 2 outbox tables and then add more data after the Changefeeds are created. We want to show how the SQL TTL script only picks up Outbox tables with Changefeeds. Notice how one Changefeed includes two tables: outbox_t2 and outbox_t3. We'll make sure to pick up both of the tables ;)
set cluster setting kv.rangefeed.enabled = true;
create table test_t0 (i int primary key);
create table outbox_t1 (i int primary key);
create table outbox_t2 (i int primary key);
create table outbox_t3 (i int primary key);
insert into test_t0 values (unique_rowid());
insert into outbox_t1 values (unique_rowid());
insert into outbox_t2 values (unique_rowid());
insert into outbox_t3 values (unique_rowid());
CREATE CHANGEFEED FOR TABLE test_t0 INTO 'experimental-s3://chrisc-test/changefeed/ttl/test?AUTH=implicit' WITH updated, resolved = '1m';
CREATE CHANGEFEED FOR TABLE outbox_t1 INTO 'experimental-s3://chrisc-test/changefeed/ttl/outbox1?AUTH=implicit' WITH updated, resolved = '1m';
CREATE CHANGEFEED FOR TABLE outbox_t2, outbox_t3 INTO 'experimental-s3://chrisc-test/changefeed/ttl/outbox2?AUTH=implicit' WITH updated, resolved = '1m';
insert into test_t0 values (unique_rowid());
insert into outbox_t1 values (unique_rowid());
insert into outbox_t2 values (unique_rowid());
insert into outbox_t3 values (unique_rowid());
Let's also verify the Changefeeds are sending data to our sink:
aws s3 ls s3://chrisc-test/changefeed/ttl/ --recursive | grep ndjson
The output should look like this
2021-08-19 21:46:33 222 changefeed/ttl/outbox1/2021-08-20/202108200145322858970000000000000-fad43b3554ca0a0b-1-5-00000000-outbox_t1-1.ndjson
2021-08-19 22:05:35 180 changefeed/ttl/outbox1/2021-08-20/202108200204308217750000000000001-fad43b3554ca0a0b-1-5-00000001-outbox_t1-1.ndjson
2021-08-19 21:46:33 222 changefeed/ttl/outbox2/2021-08-20/202108200145325129600000000000000-212aeaca8652f3bc-1-8-00000000-outbox_t2-1.ndjson
2021-08-19 21:46:33 222 changefeed/ttl/outbox2/2021-08-20/202108200145325129600000000000000-212aeaca8652f3bc-1-8-00000001-outbox_t3-1.ndjson
2021-08-19 22:05:35 180 changefeed/ttl/outbox2/2021-08-20/202108200204310221340000000000001-212aeaca8652f3bc-1-8-00000002-outbox_t2-1.ndjson
2021-08-19 22:05:35 180 changefeed/ttl/outbox2/2021-08-20/202108200204310221340000000000001-212aeaca8652f3bc-1-8-00000003-outbox_t3-1.ndjson
2021-08-19 21:46:33 222 changefeed/ttl/test/2021-08-20/202108200145319247430000000000000-64e78d6565e58782-1-2-00000000-test_t0-1.ndjson
This is a simple select that queries internal catalog of CockroachDB to find tables that have an 'outbox' prefix.
select table_catalog, table_name
from information_schema.tables
where table_name like 'outbox%';
And the output should be:
table_catalog | table_name
----------------+-------------
defaultdb | outbox_t1
defaultdb | outbox_t2
defaultdb | outbox_t3
(3 rows)
Pretty simple.
Again, let's query CockroachDB's internal catalog to see which tables that are prefixed with 'outbox' also have a running Changefeed.
select
j.job_id,
n."parentID",
n2.name as "database",
j.id,
n.name as "table",
j.high_water_timestamp
from system.namespace n
inner join
(
select job_id, unnest(descriptor_ids) as id, high_water_timestamp
from crdb_internal.jobs
where "job_type" = 'CHANGEFEED'
and "status" = 'running'
) j
on j.id = n.id
inner join
system.namespace n2
on n."parentID" = n2.id
where n."parentID" != 0
and n.name like 'outbox%'
;
The output should be something like this:
job_id | parentID | database | id | table | high_water_timestamp
---------------------+----------+-----------+----+-----------+---------------------------------
686009654600433665 | 50 | defaultdb | 53 | outbox_t1 | 1629424169484287000.0000000000
686009655343546369 | 50 | defaultdb | 55 | outbox_t3 | 1629424169684500000.0000000000
686009655343546369 | 50 | defaultdb | 54 | outbox_t2 | 1629424169684500000.0000000000
(3 rows)
Time: 10ms total (execution 10ms / network 0ms)
And this is really the only statement you need to run. This will create the delete statements of records to delete in the outbox table. The records that will be deleted will compare the high water timestamp of the changefeed to the mvcc timestamp of the record. The high water timestamp of the Changefeed is the checkpoint that indicates records that have been sent to their sink up to a particular timestamp. This is one of my favorite queries...
select
'delete from ' || n2.name || '.' || n.name || ' where crdb_internal_mvcc_timestamp < ' || j.high_water_timestamp::STRING || ';' as "SQL"
from system.namespace n
inner join
(
select job_id, unnest(descriptor_ids) as id, high_water_timestamp
from crdb_internal.jobs
where "job_type" = 'CHANGEFEED'
and "status" = 'running'
) j
on j.id = n.id
inner join
system.namespace n2
on n."parentID" = n2.id
where n."parentID" != 0
and n.name like 'outbox%'
;
The output here will create the delete statements for you to run:
SQL
-------------------------------------------------------------------------------------------------------
delete from defaultdb.outbox_t1 where crdb_internal_mvcc_timestamp < 1629424830259619000.0000000000
delete from defaultdb.outbox_t3 where crdb_internal_mvcc_timestamp < 1629424830459942000.0000000000
delete from defaultdb.outbox_t2 where crdb_internal_mvcc_timestamp < 1629424830459942000.0000000000
(3 rows)
This is the last step where we remove the records that have already been emitted to our sink.
> delete from defaultdb.outbox_t1 where crdb_internal_mvcc_timestamp < 1629425070821775000.0000000000;
delete from defaultdb.outbox_t3 where crdb_internal_mvcc_timestamp < 1629425071022134000.0000000000;
delete from defaultdb.outbox_t2 where crdb_internal_mvcc_timestamp < 1629425071022134000.0000000000;
DELETE 2
Time: 4ms total (execution 4ms / network 0ms)
DELETE 2
Time: 2ms total (execution 2ms / network 0ms)
DELETE 2
Time: 3ms total (execution 3ms / network 0ms)
Lastly let's clean up our files that we placed in S3.
aws s3 rm s3://chrisc-test/changefeed/ttl/ --recursive
You can easily create a program to use the query in Step 3, execute it, then take the SQL output and execute the generated statements. Below is a quick and dirty example of how you can do this in Python.
import psycopg2
conn = psycopg2.connect(database="defaultdb", user="demo", password="demo61304" ,host="localhost", port=26257)
conn.set_session(autocommit=True)
cur = conn.cursor()
sql=""" select 'delete from ' || n2.name || '.' || n.name || ' where crdb_internal_mvcc_timestamp < ' || j.high_water_timestamp::STRING || ';' as "SQL" from system.namespace n inner join ( select job_id, unnest(descriptor_ids) as id, high_water_timestamp from crdb_internal.jobs where "job_type" = 'CHANGEFEED' and "status" = 'running' ) j on j.id = n.id inner join system.namespace n2 on n."parentID" = n2.id where n."parentID" != 0 and n.name like 'outbox%'; """
cur.execute(sql)
ttl=cur.fetchall()
ttlsql='; '.join(map(str,ttl)).replace("'","").replace("(","").replace(";,)","")
cur.execute(ttlsql)
cur.statusmessage
cur.close()
conn.close()
I hope you enjoyed this creative way of removing historical records from your Outbox tables!
Comments
Post a Comment