The task of ingesting data into Postgres is a common one in my job as a data engineer, and also in my side projects.

Along the way I've learned a few tricks that I'm going to discuss here, in particular related to ingesting data from Python and merging it with existing rows.

Before starting, I have to say that the fastest way to insert data into a Postgres DB is the COPY command, which has a \copy counterpart in the psql CLI tool that is useful for invoking it remotely.

The COPY command has some options, but in general it's used to perform an initial load from a CSV file, and it loads the data as-is.
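For a plain CSV load from Python, a minimal sketch could look like this (psycopg2's copy_expert; the connection string, table and file name are placeholders, not the setup used later in this article):

    import psycopg2

    # Hypothetical connection string and CSV file, for illustration only
    conn = psycopg2.connect("dbname=reddit user=postgres")
    with conn, conn.cursor() as cur, open("comments.csv") as f:
        # COPY ... FROM STDIN makes psycopg2 stream the local file to the server
        cur.copy_expert("COPY comment FROM STDIN WITH (FORMAT csv, HEADER)", f)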

This, however, becomes harder when the data has a more complex layout, for example when it comes from an API or JSON files and needs to be merged with data already loaded into the DB according to some rule.

In this article I’ll show a real case and discuss some aspects like:

  • using the INSERT ... ON CONFLICT functionality (aka UPSERT) to integrate with existing data
  • setting up a Postgres instance in an automated and reproducible way to make benchmarking a breeze
  • using different libraries: asyncpg, psycopg2 and psycopg3
  • using prepared statements and executemany
  • performing batch insertions in single SQL queries using execute_values
  • using the copy function programmatically, including the binary mode
  • taking advantage of UNLOGGED tables

The case: ingest 5 million comments from Reddit

My use case here is to ingest into Postgres a corpus of 5 million subreddit comments. They were downloaded using the Reddit API and this subreddit downloader script. I want to later use this data to perform NLP tasks, but for the scope of this article let's just say I have a bunch of JSONL files describing comments (and submissions) in this form:

  • text: the textual content of a comment
  • score: the number of positive or negative votes the comment received
  • created_at: a timestamp with a time zone
  • permalink
  • username
  • parent_id: the comment or submission this comment is replying to
  • id: a unique id of the comment (used as parent_id by possible replies)
  • retrieved_at: the timestamp of when the data was retrieved

The retrieved_at field is essential: since the script takes many hours to download this data, it usually runs in sessions, and the score changes as people vote on the comments. We are therefore going to see the same comment in different files at different observation times, and we must update the database only when the retrieved_at of an observation is greater than the one already stored.
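For illustration, reading one of these files boils down to parsing one JSON object per line (the file name here is made up):

    import json

    def read_comments(path):
        """Yield one comment dict per JSONL line."""
        with open(path) as f:
            for line in f:
                yield json.loads(line)

    # Each dict has the fields listed above; retrieved_at decides whether an
    # already-stored comment should be overwritten by this observation.
    for comment in read_comments("comments_2021_01.jsonl"):
        print(comment["id"], comment["score"], comment["retrieved_at"])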

Upsert the data

While it's possible to run multiple queries to check the data already in the DB and decide what to do for each input entry, doing so is slow and verbose. Luckily, many databases offer an operation called upsert (update + insert) that performs this in one shot. Postgres implements it with the ON CONFLICT clause, which specifies what to do when an insert would otherwise violate a constraint.

In this case, it looks like this:

    INSERT INTO comment AS old (
        id,
        subreddit,
        author,
        body,
        created_utc,
        parent_id,
        permalink,
        score,
        retrieved_at
      ) VALUES
      ($1, $2, $3, $4, $5, $6, $7, $8, $9)
    ON CONFLICT(id) DO UPDATE SET
        author = EXCLUDED.author,
        subreddit = EXCLUDED.subreddit,
        body = EXCLUDED.body,
        created_utc = EXCLUDED.created_utc,
        parent_id = EXCLUDED.parent_id,
        permalink = EXCLUDED.permalink,
        score = EXCLUDED.score,
        retrieved_at = EXCLUDED.retrieved_at
    WHERE
      EXCLUDED.retrieved_at >= old.retrieved_at;

It starts like a normal insert, but the ON CONFLICT clause specifies what to do when a duplicate on the id column is present. Notice that this clause requires a constraint on the given set of columns to work; in this case id is the primary key, so it has a unique index associated with it. Another possibility is to define your own unique index or unique constraint on those columns.
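For reference, a table definition compatible with this query could look roughly like this (the column types are my assumption, not necessarily the real schema); the PRIMARY KEY is what provides the unique index that ON CONFLICT(id) relies on:

    import psycopg2

    # Hypothetical DDL: column types are guesses, only the PRIMARY KEY matters here
    conn = psycopg2.connect("dbname=reddit user=postgres")
    with conn, conn.cursor() as cur:
        cur.execute("""
            CREATE TABLE IF NOT EXISTS comment (
                id            text PRIMARY KEY,  -- unique index used by ON CONFLICT(id)
                subreddit     text,
                author        text,
                body          text,
                created_utc   timestamptz,
                parent_id     text,
                permalink     text,
                score         integer,
                retrieved_at  timestamptz
            )
        """)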

In the upsert query above, EXCLUDED is an alias the database uses to refer to the row we tried to insert; together with old (an alias defined at the beginning of the query for the target table), it allows us to define sophisticated logic to merge the data, in this case a check on the retrieved_at field to preserve the most recent comment snapshot.

The logic can be even more complex, for example one can update flashcard ease factors with an upsert query.

Use Docker to automate the process

Since the goal is to try different approaches to load the data, it’s necessary to be able to easily start and reset a database instance for each one, a process that will be repeated multiple times to reduce the noise in the measurement. To do that, I created a Makefile:


start-db:
	docker run --rm --name reddit-postgres \
	    -v $(shell pwd)/reddit_db:/var/lib/postgresql/data \
	    -p 5442:5432 -e POSTGRES_PASSWORD=mysecretpassword -d postgres:13
	docker exec reddit-postgres sh -c \
      'until pg_isready; do echo "Waiting for the DB to be up..."; sleep 4; done'
	docker exec reddit-postgres sh -c \
      "echo 'CREATE DATABASE reddit;'|psql -U postgres"

tabula-rasa:
	docker kill reddit-postgres || true
	docker rm reddit-postgres || true
	rm -rf $(shell pwd)/reddit_db

The first target starts a dockerized Postgres instance, mounting the data folder on a local volume, then waits with pg_isready for it to be operational, and finally creates the reddit database. The second target kills the container and deletes its data, so the whole cycle can be repeated without being affected by previous executions.

Another advantage of dockerizing the DB like this is that one can easily switch the Postgres version and keep track of the DB sizes of different projects with the same tools used for folder and dataset sizes, instead of running special queries. Additionally, I tend to have different databases for different projects and run them on demand. This is very valuable during development, and it can be used in CI pipelines too (for that case, have a look at pg_envwrapper).

Using different libraries: asyncpg, psycopg2 and psycopg3

From Python there's a plethora of libraries to access a Postgres database, but the most common one is definitely Psycopg2, used both directly and through SQLAlchemy. This library now has a successor called psycopg3, which is still under development and far from production-ready, but I decided to give it a try out of curiosity. Among other things, it implements async functionality and COPY from Python objects. I also decided to try asyncpg, an async library that I used for some projects like Grammarquiz and that implements a good deal of functionality. The fact that the library is asynchronous is irrelevant for this use case of non-concurrent batch insertion, but it turns out to be useful in web applications based on async frameworks like Starlette. Here the code awaits each call, so it's effectively used in a synchronous way.

Abstracting the insertion away

The code to read the input dataset is in a single helper that works as a chunk generator. By calling it, the insertion code receives batches of 50k elements ready to be inserted, abstracting away the reading logic. Additionally, this code measures the time taken by each insertion and eventually produces the median, average and standard deviation of these times.

You can look at the code for the implementation details; all the snippets below use this helper and focus only on the actual DB insertion.
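For illustration, a rough sketch of what such a helper could look like (the names and details here are invented; the real implementation is in the linked repository):

    import json
    import statistics
    import time
    from pathlib import Path

    def chunked_comments(data_dir, chunk_size=50_000):
        """Yield lists of up to chunk_size parsed comments read from JSONL files."""
        chunk = []
        for path in sorted(Path(data_dir).glob("*.jsonl")):
            with open(path) as f:
                for line in f:
                    chunk.append(json.loads(line))
                    if len(chunk) == chunk_size:
                        yield chunk
                        chunk = []
        if chunk:
            yield chunk

    def run_benchmark(insert_chunk, data_dir="data"):
        """Time insert_chunk on every batch and report summary statistics."""
        timings = []
        for chunk in chunked_comments(data_dir):
            start = time.monotonic()
            insert_chunk(chunk)
            timings.append(time.monotonic() - start)
        print("median:", statistics.median(timings),
              "mean:", statistics.mean(timings),
              "stdev:", statistics.stdev(timings))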

Using prepared statements and executemany

Running a lot of INSERT statements one by one is inefficient, because one needs to transfer not only the data but the command itself again and again.

Asyncpg offers a straightforward solution to this: prepared statements. A prepared statement is a query that is sent to the DB once and later invoked by reference, greatly reducing the overhead. Additionally, Postgres can plan the query once and for all, making the execution even faster over a large number of calls.

The code is simple:

stm = await conn.prepare("""INSERT INTO
[...]
VALUES ($1, $2, $3, $4, ...)
[...]""")

await stm.executemany(entries)

The first line takes the same query we would run with execute() and prepares it instead. Then the prepared statement is invoked with the list of value tuples.

This produces a median speed of 3700 rows per second over the whole dataset, an excellent result considering how heavy the rows are (they can contain a few KB of text each).
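For context, a more complete sketch of the asyncpg flow with the connection handling around it (the DSN and the exact shape of the entries tuples are assumptions):

    import asyncio
    import asyncpg

    UPSERT = """
        INSERT INTO comment AS old (id, subreddit, author, body, created_utc,
                                    parent_id, permalink, score, retrieved_at)
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
        ON CONFLICT(id) DO UPDATE SET
            author = EXCLUDED.author,
            subreddit = EXCLUDED.subreddit,
            body = EXCLUDED.body,
            created_utc = EXCLUDED.created_utc,
            parent_id = EXCLUDED.parent_id,
            permalink = EXCLUDED.permalink,
            score = EXCLUDED.score,
            retrieved_at = EXCLUDED.retrieved_at
        WHERE EXCLUDED.retrieved_at >= old.retrieved_at
    """

    async def insert_chunk(entries):
        # entries is a list of 9-tuples matching the $1..$9 placeholders
        conn = await asyncpg.connect("postgresql://postgres:mysecretpassword@localhost:5442/reddit")
        try:
            stm = await conn.prepare(UPSERT)
            await stm.executemany(entries)
        finally:
            await conn.close()

    # asyncio.run(insert_chunk(entries))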

Performing batch insertions in single SQL queries using execute_values

Another option to speed up repeated inserts is the execute_values function in Psycopg2. This function is included in the psycopg2 extras and basically generates long SQL statements containing multiple VALUES tuples instead of separate statements; using it in many projects, I have often observed a 10x speed-up compared to a naive batch of inserts.

A downside of this functionality is that the code is less readable:

    from psycopg2.extras import execute_values

    stm = """
        INSERT INTO comment AS old (
          [...]
        ) VALUES %s
        ON CONFLICT(id) DO UPDATE SET
            [...]
    """

    with conn.cursor() as cur:
        execute_values(cur, stm, values)

Notice the %s placeholder: it's where psycopg2 will insert the list of VALUES, taking care of splitting it into chunks and adding parentheses. An extra template argument can provide a mapping between dictionary keys and columns in case your values are dictionaries, as in the following sketch.
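A minimal sketch of that variant (columns reduced to three and ON CONFLICT(id) DO NOTHING used just for illustration; the connection string and sample data are made up):

    import psycopg2
    from psycopg2.extras import execute_values

    conn = psycopg2.connect("dbname=reddit user=postgres")
    values = [{"id": "abc123", "score": 42, "retrieved_at": "2021-05-01T10:00:00+00:00"}]

    with conn, conn.cursor() as cur:
        # The template maps dict keys to columns; %s in the statement expands
        # into one "(...)" group per input item.
        execute_values(
            cur,
            "INSERT INTO comment (id, score, retrieved_at) VALUES %s"
            " ON CONFLICT(id) DO NOTHING",
            values,
            template="(%(id)s, %(score)s, %(retrieved_at)s)",
            page_size=1000,
        )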

This leads to about 3780 rows/second, slightly better than asyncpg. Notice that I repeated the tests multiple times to reduce noise.

execute_batch of prepared statements

To replicate what was done with asyncpg, psycopg2 offers the execute_batch function, which can be combined with a prepared statement.

The code is, however, a bit convoluted:

    from psycopg2.extras import execute_batch

    with conn.cursor() as cur:
        cur.execute("""
        PREPARE stmt AS
          INSERT INTO comment AS old (
            [...]
          ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
          [...]
        """)
        execute_batch(
            cur,
            "EXECUTE stmt (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
            coms
        )
        cur.execute("DEALLOCATE stmt")

Postgres natively uses the $x placeholders, while psycopg2 (and 3) use %s, so we need the former syntax to define the prepared statement and the latter to invoke it. Combining them makes the code harder to follow and the errors less readable. The speed is the same as with execute_values for my data.

Using the COPY function programmatically, including the binary mode

As said, the COPY command is a very fast way to insert data, indeed the fastest, but it's limited to plain insertion and cannot express the upsert logic we need. However, there's a simple workaround: we can copy the new data verbatim into a temporary table and then run a query to merge it with the rest of the data. Since at that point the raw data is already in the database, the merge operation can be very efficient.

Psycopg3 can perform a COPY directly from Python objects in memory, a feature that is missing in its predecessor.

This allows for lazy generation of the rows, a nice thing to have when processing large datasets:

    cur.execute("""
        CREATE TABLE new_comment AS SELECT * FROM comment WHERE FALSE;
        """
    )
    with cur.copy("COPY new_comment FROM STDIN") as copy:
        for c in comments:
            copy.write_row(...some tuple based on c...)
    cur.execute("INSERT INTO comment [...]")
    cur.execute("DROP TABLE new_comment")

Every time write_row is invoked, a new row is sent to the DB and written directly into new_comment.

Then the same upsert query as above is used, but with SELECT * FROM new_comment in place of the VALUES list, so that the staging table is merged into the existing one. Finally, the temporary table is dropped, and it will be recreated at the next cycle for the next batch.
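Concretely, the cur.execute("INSERT INTO comment [...]") step above could expand to something like this (same columns and merge rule as the upsert shown earlier):

    cur.execute("""
        INSERT INTO comment AS old
        SELECT * FROM new_comment
        ON CONFLICT(id) DO UPDATE SET
            author = EXCLUDED.author,
            subreddit = EXCLUDED.subreddit,
            body = EXCLUDED.body,
            created_utc = EXCLUDED.created_utc,
            parent_id = EXCLUDED.parent_id,
            permalink = EXCLUDED.permalink,
            score = EXCLUDED.score,
            retrieved_at = EXCLUDED.retrieved_at
        WHERE EXCLUDED.retrieved_at >= old.retrieved_at
    """)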

This leads to 3300 rows/second, which is slower than the simple insertion with asyncpg. Notice, however, that combined with partitioning this approach allows for a great level of parallelization and minimizes disruption while the database is in use. See here for an example of partitions being merged.

Take advantage of UNLOGGED tables

The code above can be improved by using an UNLOGGED table. A table created as unlogged is not written to the WAL (write-ahead log) in Postgres, meaning that its content will be lost if the database crashes. It has an advantage, however: since no WAL operations are performed, it's generally faster to write to, sometimes a lot faster. This is perfect for the new_comment table above, since its usage is always temporary.

Changing the creation statement above to CREATE UNLOGGED TABLE, the speed goes up to 3450 rows / second. Not bad!

Binary copy

Another interesting feature of psycopg3 is the binary protocol. This allows sending Postgres raw binary data instead of a text representation, so it uses the bandwidth better and can be faster to process. Unfortunately, this requires some work to serialize the data. There's work going on, so presumably this will become easier in the future, but for now I had to do this:


import struct

# Unix timestamp of 2000-01-01, the epoch used by Postgres timestamps
PSQL_EPOCH = 946684800

def timestamp_to_binary(dt: int):
    # Postgres binary timestamps are big-endian int64 microseconds since 2000-01-01
    return struct.pack(">q", int((dt - PSQL_EPOCH) * 10 ** 6))

with conn.cursor(binary=True) as cur:
    cur.execute(
        """
        CREATE UNLOGGED TABLE new_comment AS SELECT * FROM comment WHERE FALSE;
        """
    )
    with cur.copy("COPY new_comment FROM STDIN WITH BINARY") as copy:
        for c in comments.values():
            copy.write_row(
                (
                    ...
                    timestamp_to_binary(c.created_utc),
                    ...
                )
            )
The cursor is binary thanks to the binary=True parameter, and the COPY command has the WITH BINARY clause to match. The problem arises with the timestamp: it turns out Postgres wants an 8-byte (long long) integer representing the microseconds since midnight of January 1st, 2000. The function above does this conversion. All the other fields are converted by the library without extra magic. Hopefully, in the future this will also cover timezone-aware datetimes.

This gives a speed of 3700 rows / second, faster than the text mode but at the cost of increased complexity.

Three important notes on this:

  • This will probably be automated in the future, as the library is a work in progress

  • I'm running the DB on my machine, but when the program and the database run on different machines the advantage of the binary protocol can grow

  • Asyncpg also implements a binary copy function. In fact, most operations in asyncpg use the binary protocol.

Conclusion

If you are like me, at this point you expect a chart showing a benchmark of the above solutions. Maybe you even skipped the article to jump to this chart!

I think it would not be a fair comparison: the numbers are so close, the noise so high (the standard deviation of the speed in my experiments is 30%-50% of the value), and the results so dependent on the environment and the data being ingested, that a chart would only create a false sense of precision.

Instead, here’s a table to sum it up:

| solution | rating | note |
|---|---|---|
| asyncpg + prepared statement | ⭐⭐⭐⭐⭐ | Very complete library, well documented and generally fast. |
| psycopg2 + execute_values | ⭐⭐⭐⭐ | The fastest in my test, but only slightly. Generally the code is more verbose. |
| psycopg3 (under development) + binary copy | ⭐⭐⭐⭐ | Fast, probably very convenient for the network, and likely to become easier to use soon. Worth keeping an eye on. |
| psycopg2 + execute_batch | ⭐⭐ | Code quite long and convoluted, without a speed gain for this case. |
| a lot of separate inserts | 🤨 | Why? |