Simple and manual sharding on PostgreSQL.

- 11 mins

Concept

In the current concept, we are going to combine Foreign tables inheritance with the postgres_fdw extension, both being already available features since 9.5 version.

Cross-node partitioning allows a better data locality and a more scalable model than keeping local partitions. Being said, the data will be split into several nodes and organized using a particular key, which will determine in which shard data will be allocated. For the current POC, we are going to specify the shardKey , which is a simple char(2) type.

How this was done before

Until today, the only way to perform findings over this method, was from the application layer, by issuing queries directly to the nodes by keeping certain deterministic way as {1} or using a catalog table as {2} (NOTE: the bellow examples are using pseudo code).

{1}

query = "SELECT name,lastname FROM " + relation + partition " WHERE " id =" + person_id

{2}:

shard = query("SELECT shard FROM catalog WHERE key = " + person_id)
query = "SELECT name,lastname FROM " + relation + shard " WHERE " id =" + person_id

How we are going to implement this now

As foreign tables (FT) does not hold any data, it is possible to keep copies aroud all the databases involved and also in separated instances if this is necessary.

All the operations against the table will be done through the parent table of the FT tree tables and Postgres itself will determine the destination FT using the constraint exclusion feature, which will be detailed further.

For HA, you are limited on the data nodes to implement any other replication solution available in the core version. To be fair, 9.6 supports streaming replication and logical decoding, which is used by the pglogical tool for providing advanced logical replication per table basis.

TPS

[Fig. 1] Manual sharding with FDW current implementation.

Foreign tables

Foreign tables do not contain data by itselves and they only reference to a external table on a different Postgres database. There are plenty of different extensions allowing external tables on different data store solutions, but in this particular article we are going to focus on postgres_fdw as we want to explore more about condition pushdowns, which makes queries against these tables more performant on more complex queries.

A more extensive benchmark can be found at my next article.

The framework underlying for the Foreign Data Wrappers, support both reads and write operations. postgres_fdw is not the exception and does also support condition pushdown for avoiding large scans on the source tables.

On each database holding the FT, you need to invoke the extension creation:

CREATE EXTENSION postgres_fdw;

FT have two main elements,necessary to point correctly both in source as in user privileges. If you are paranoic enough, you’ll prefer to use unprivileged users with limited grants over the tables that you use.

{1}

CREATE SERVER shard1_main FOREIGN DATA WRAPPER postgres_fdw
  OPTIONS(host '127.0.0.1',port '5434',dbname 'shard1');
CREATE SERVER shard2_main FOREIGN DATA WRAPPER postgres_fdw
  OPTIONS(host '127.0.0.1',port '5435',dbname 'shard2');

-- Slaves
CREATE SERVER shard1_main_replica FOREIGN DATA WRAPPER postgres_fdw
  OPTIONS(host '127.0.0.1',port '7777',dbname 'shard1');
CREATE SERVER shard2_main_replica FOREIGN DATA WRAPPER postgres_fdw
    OPTIONS(host '127.0.0.1',port '8888',dbname 'shard2');

{2}

-- User mapping
CREATE USER MAPPING FOR postgres SERVER shard1_main OPTIONS(user 'postgres');
CREATE USER MAPPING FOR postgres SERVER shard2_main OPTIONS(user 'postgres');

CREATE USER MAPPING FOR postgres SERVER shard1_main_replica OPTIONS(user 'postgres');
CREATE USER MAPPING FOR postgres SERVER shard2_main_replica OPTIONS(user 'postgres');

The FT definition is indeed pretty straightforward if we don’t want to do any further column filtering:

CREATE TABLE main (shardKey char(2), key bigint, avalue text);

CREATE FOREIGN TABLE main_shard01
       (CHECK (shardKey = '01'))
       INHERITS (main)
       SERVER shard1_main;

CREATE FOREIGN TABLE main_shard02
       (CHECK (shardKey = '02'))
       INHERITS (main)
       SERVER shard2_main;

Writable FDWs

Even if I don’t recommend the following approach, it can be very easy to centralize the writes to the shards through the FT. Although, it requires to code a trigger for managing this. Currently, the minimum transaction level for foreign tables is REPEATABLE READ, but it will probably change in future versions.

A very simplistic approach for an INSERT trigger will be like bellow:

CREATE OR REPLACE FUNCTION f_main_part() RETURNS TRIGGER AS
$FMAINPART$
DECLARE
            partition_name text;
BEGIN
            partition_name := 'main_shard' || NEW.shardKey;
            EXECUTE  'INSERT INTO ' ||  quote_ident(partition_name) ||  ' SELECT ($1).*' USING NEW ;
            RETURN NULL;
END;
$FMAINPART$ LANGUAGE plpgsql;

CREATE TRIGGER t_main BEFORE INSERT
  ON main
  FOR EACH ROW EXECUTE PROCEDURE f_main_part();

Data on shards

As shards contain data, the declaration ends up to be a common table within the necessary suffix for localization:

CREATE TABLE main_shard01(  shardKey char(2),
                            key bigint,
                            avalue text,
                            CHECK(shardKey='01'));
CREATE INDEX ON main_shard01(key);

A simple test could be done by issuing:

proxy=# INSERT INTO main
        SELECT '0' || round(random()*1+1),i.i,random()::text
        FROM generate_series(1,20000) i(i) ;
INSERT 0 0

You probably are intuiting that the above statement inserts data on both nodes, and the trigger will derive the row accordingly to the corresponding shard.

NOTE: the shard number is generated by random()*1+1 which output rounds between 1 and 2.

Grab them from the hidden columns

Querying data can be nicely transparent, as shown bellow. The tableoid in this particular case can be misleading, as the oid reported are those from the nodes, not the local machine. It is used just to show that they’re effectively different tables:

proxy=# select tableoid,count(*) from main group by tableoid;
 tableoid | count
----------+-------
    33226 |   104
    33222 |    96
(2 rows)

For example, retrieving a single row is easy as:

proxy=# SELECT avalue FROM main WHERE key = 1500 and shardKey = '01';
      avalue       
-------------------
 0.971926014870405
(1 row)

Behind the scenes, the pushed query to the remote servers contains the corresponding filter ((key = 1500)) and locally, the constraint exclusion allows to avoid further scans into the other child FT.

proxy=# explain (VERBOSE true)SELECT avalue
                               FROM main WHERE key = 1500
                                                and shardKey = '01';
                                 QUERY PLAN                                                    
--------------------------------------------------------------------------------
 Append  (cost=0.00..131.95 rows=2 width=32)
   ->  Seq Scan on public.main  (cost=0.00..0.00 rows=1 width=32)
         Output: main.avalue
         Filter: ((main.key = 1500) AND (main.shardkey = '01'::bpchar))
   ->  Foreign Scan on public.main_shard01  (cost=100.00..131.95 rows=1 width=32)
         Output: main_shard01.avalue
         Remote SQL: SELECT avalue FROM public.main_shard01 WHERE ((key = 1500))
             AND ((shardkey = '01'::bpchar))
(7 rows)

Even if we don’t want to provide the shardKey, the key filter will be pushed across all the shard nodes. If your keys aren’t unique across shards, you’ll get a multi-row result set.

proxy=# explain (VERBOSE true)SELECT avalue FROM main WHERE key = 1500;
                                    QUERY PLAN                                    
--------------------------------------------------------------------------------
 Append  (cost=0.00..256.83 rows=15 width=32)
   ->  Seq Scan on public.main  (cost=0.00..0.00 rows=1 width=32)
         Output: main.avalue
         Filter: (main.key = 1500)
   ->  Foreign Scan on public.main_shard01  (cost=100.00..128.41 rows=7 width=32)
         Output: main_shard01.avalue
         Remote SQL: SELECT avalue FROM public.main_shard01 WHERE ((key = 1500))
   ->  Foreign Scan on public.main_shard02  (cost=100.00..128.41 rows=7 width=32)
         Output: main_shard02.avalue
         Remote SQL: SELECT avalue FROM public.main_shard02 WHERE ((key = 1500))
(10 rows)

Considerations

Foreign Data Wrappers for Postgres are such a great extension, but it comes at a price with a visible overhead in high intensive transactional workloads.

Hope you liked the article!

Emanuel Calvo

Emanuel Calvo

Open Source Data Specialist

comments powered by Disqus
rss facebook twitter github youtube mail spotify instagram linkedin google google-plus pinterest medium vimeo stackoverflow reddit quora