How to shard tables for efficient joins

Hello, I’m new to YugabyteDB. We have 5 large tables, and we often need to join them together. They share the same sharding key; since they are large, colocation doesn’t look like a good fit.

Our join query takes a long time to finish. I guess it might be because the tablets with the same sharding key but from different tables are stored on different servers.

Is my understanding correct? If so, can someone help me understand how to define and shard these tables so that the rows from all 5 tables with the same sharding key stay on the same server?

Thank you in advance for your help.

Hi Jane,
Joining with tablets from different servers should not matter if the right execution plan is used, because the join methods are optimized to get rows with minimal network calls.
To get the right plan with large tables and many joins, you should enable the cost-based optimizer:

  • ANALYZE the tables (it is labeled beta because not all optimizations are there yet, but it works)
  • and set yb_enable_base_scans_cost_model and yb_enable_optimizer_statistics to on
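As a sketch, the two steps could look like this (the table names here are placeholders; use your own):

```sql
-- Collect optimizer statistics (ANALYZE is labeled beta, but safe to run)
ANALYZE table_a, table_b, table_c;

-- Enable the cost-based optimizer for the current session
SET yb_enable_base_scans_cost_model = on;
SET yb_enable_optimizer_statistics = on;
```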

Then please provide the execution plan, ideally from running the query so that it includes all execution statistics:

explain (analyze, buffers, dist)
select ...

The join methods that keep good performance even when the tables are distributed are:

  • YB Batched Nested Loop when only a subset of the inner table rows are necessary, as the nested loop can push down the join condition
  • Hash Join when all rows from the inner table are necessary for the result, as with small lookup tables. For larger tables to be hashed efficiently, increasing work_mem may help

Please share the execution plan. It’s also good to verify that not many rows are read from storage and discarded later. This can happen when there’s no good index to find the suitable ranges of rows (without an additional filter) and return them ordered (without an additional sort operation).

Thank you FranckPachot. We are running this query in our test environment, and it takes 19 seconds to return. When we tried to generate the execution plan without setting yb_enable_base_scans_cost_model and yb_enable_optimizer_statistics to on, it took about 5 minutes.

After we updated the settings as you suggested, the execution plan generation timed out. Is this expected?

So the execution plan we got so far is as follows:

Nested Loop Anti Join  (cost=107.50..141.65 rows=1 width=131) (actual time=75.318..269503.497 rows=2790 loops=1)
  ->  Nested Loop Anti Join  (cost=107.50..141.53 rows=1 width=131) (actual time=75.041..267744.118 rows=2790 loops=1)
        ->  Nested Loop  (cost=107.50..141.41 rows=1 width=163) (actual time=61.947..232899.305 rows=68220 loops=1)
              ->  Nested Loop  (cost=107.50..141.30 rows=1 width=195) (actual time=61.850..196895.623 rows=68220 loops=1)
                    Join Filter: (((c.lt <> 12) AND ((c.tci = 0) OR (c.tci > c.aci)) AND (sg.tsgi > COALESCE(sg.asgi, '0'::bigint))) OR ((c.lt = 12) AND ((COALESCE(sa.aab, '0'::numeric) + COALESCE(sa.cl, '0'::numeric)) > COALESCE(sa.ra, '0'::numeric)) AND (sg.tsgb > sg.asgs)))
                    ->  Nested Loop  (cost=107.50..141.15 rows=1 width=355) (actual time=61.736..160730.994 rows=68220 loops=1)
                          ->  Nested Loop  (cost=107.50..140.70 rows=1 width=290) (actual time=61.569..120643.458 rows=68850 loops=1)
                                ->  Nested Loop  (cost=107.50..140.00 rows=5 width=272) (actual time=61.344..55249.821 rows=95400 loops=1)
                                      ->  HashAggregate  (cost=107.50..109.50 rows=200 width=64) (actual time=61.143..251.773 rows=101070 loops=1)
                                            Group Key: tablec.sgid, tablec.shard_id
                                            ->  Seq Scan on tablec  (cost=0.00..102.50 rows=1000 width=64) (actual time=3.460..29.305 rows=132570 loops=1)
                                                  Remote Filter: ((cstatus)::text = 'active'::text)
                                                  Filter: ((flagd IS FALSE) AND (flagp IS TRUE))
                                                  Storage Table Read Requests: 35
                                                  Storage Table Read Execution Time: 11.345 ms
                                                  Storage Table Rows Scanned: 140220
                                      ->  Index Scan using tablesg_pk on tablesg sg  (cost=0.00..0.14 rows=1 width=240) (actual time=0.524..0.524 rows=1 loops=101070)
                                            Index Cond: (((shard_id)::text = (tablec.shard_id)::text) AND ((sgid)::text = (tablec.sgid)::text))
                                            Filter: ((flagd IS FALSE) AND (flagp IS TRUE) AND ((tsgi > COALESCE(asgi, '0'::bigint)) OR (tsgb > asgs)))
                                            Rows Removed by Filter: 0
                                            Storage Table Read Requests: 1
                                            Storage Table Read Execution Time: 0.482 ms
                                            Storage Table Rows Scanned: 1
                                ->  Index Scan using tablep_pk on tablep c  (cost=0.00..0.14 rows=1 width=82) (actual time=0.615..0.615 rows=1 loops=95400)
                                      Index Cond: (((shard_id)::text = (sg.shard_id)::text) AND ((pid)::text = (sg.pid)::text))
                                      Remote Filter: (((pstatus)::text = 'active'::text) AND (((lt <> 12) AND ((tci = 0) OR (tci > aci))) OR (lt = 12)))
                                      Filter: ((flagd IS FALSE) AND (flagp IS TRUE))
                                      Storage Table Read Requests: 1
                                      Storage Table Read Execution Time: 0.572 ms
                                      Storage Table Rows Scanned: 1
                          ->  Index Scan using tableg_pk on tableg g  (cost=0.00..0.45 rows=1 width=97) (actual time=0.547..0.547 rows=1 loops=68850)
                                Index Cond: (((shard_id)::text = (sg.shard_id)::text) AND ((grp_id)::text = (sg.grp_id)::text))
                                Remote Filter: ((grp_status)::text = 'active'::text)
                                Filter: ((flagd IS FALSE) AND (flagp IS TRUE) AND ((flagf IS TRUE) OR (alternatives: SubPlan 1 or hashed SubPlan 2)))
                                Rows Removed by Filter: 0
                                Storage Table Read Requests: 1
                                Storage Table Read Execution Time: 0.503 ms
                                Storage Table Rows Scanned: 1
                                SubPlan 1
                                  ->  Index Scan using tableti_pk on tableti  (cost=0.00..16.50 rows=100 width=0) (actual time=0.574..0.574 rows=0 loops=270)
                                        Index Cond: (((shard_id)::text = (g.shard_id)::text) AND ((grp_id)::text = (g.grp_id)::text))
                                        Remote Filter: ((ntstart_time <= '2024-09-05 07:59:00.004256+09'::timestamp with time zone) AND ('2024-09-05 07:59:00.004256+09'::timestamp with time zone < ntend_time))
                                        Filter: ((flagd IS FALSE) AND (flagp IS TRUE))
                                        Storage Table Read Requests: 2
                                        Storage Table Read Execution Time: 1.052 ms
                                        Storage Table Rows Scanned: 1
                                SubPlan 2
                                  ->  Seq Scan on tableti tableti_1  (cost=0.00..105.00 rows=1000 width=64) (never executed)
                                        Remote Filter: ((ntstart_time <= '2024-09-05 07:59:00.004256+09'::timestamp with time zone) AND ('2024-09-05 07:59:00.004256+09'::timestamp with time zone < ntend_time))
                                        Filter: ((flagd IS FALSE) AND (flagp IS TRUE))
                    ->  Index Scan using tablesa_pk on tablesa sa  (cost=0.00..0.11 rows=1 width=128) (actual time=0.511..0.511 rows=1 loops=68220)
                          Index Cond: ((shard_id)::text = (sg.shard_id)::text)
                          Filter: ((flagd IS FALSE) AND (flagp IS TRUE))
                          Storage Table Read Requests: 1
                          Storage Table Read Execution Time: 0.473 ms
                          Storage Table Rows Scanned: 1
              ->  Index Scan using tablest_pk on tablest vst  (cost=0.00..0.11 rows=1 width=32) (actual time=0.512..0.512 rows=1 loops=68220)
                    Index Cond: ((stid)::text = (sg.stid)::text)
                    Filter: ((flagd IS FALSE) AND (flagp IS TRUE))
                    Storage Table Read Requests: 1
                    Storage Table Read Execution Time: 0.480 ms
                    Storage Table Rows Scanned: 1
        ->  Index Scan using tablecl_pk on tablecl vc  (cost=0.00..0.11 rows=1 width=64) (actual time=0.495..0.495 rows=1 loops=68220)
              Index Cond: (((sg.shard_id)::text = (shard_id)::text) AND ((sgid)::text = (sg.sgid)::text))
              Storage Table Read Requests: 1
              Storage Table Read Execution Time: 0.464 ms
              Storage Table Rows Scanned: 1
  ->  Index Scan using tablesgh_pk on tablesgh  (cost=0.00..0.13 rows=1 width=64) (actual time=0.578..0.578 rows=0 loops=2790)
        Index Cond: (((shard_id)::text = (g.shard_id)::text) AND ((sgid)::text = (sg.sgid)::text) AND (stat_start_time <= '2024-09-05 07:59:00.004256+09'::timestamp with time zone))
        Remote Filter: (('2024-09-05 07:59:00.004256+09'::timestamp with time zone < stat_end_time) AND (asgi >= tsgi))
        Storage Table Read Requests: 1
        Storage Table Read Execution Time: 0.537 ms
Planning Time: 15.051 ms
Execution Time: 269547.329 ms
Storage Read Requests: 473075
Storage Read Execution Time: 236457.762 ms
Storage Rows Scanned: 581130
Storage Write Requests: 0
Catalog Read Requests: 0
Catalog Write Requests: 0
Storage Flush Requests: 0
Storage Execution Time: 236457.762 ms
Peak Memory Usage: 708089 kB

Thank you.

Hi, sorry for the late response. It looks like the Nested Loop Join is not batched, so it does 101070 loops, which is slow. Which version of YugabyteDB is this?
Can you run the same explain (analyze, buffers, dist) with nested loops disabled (set enable_nestloop to off)? I guess it will use a Hash Join. At least it will help to see the cardinalities of the tables. If you can share the DDL to create the tables, I can look at it to make sure we are not missing something.
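In other words, something like this (the query text itself is elided):

```sql
SET enable_nestloop = off;
EXPLAIN (ANALYZE, BUFFERS, DIST)
SELECT ...;
```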

Thank you, this has significantly improved the speed: with explain, the response came back in 1 second, and the execution plan shows a hash join now. Should we keep enable_nestloop = off permanently?

Okay, great. At least we know that a good plan is possible, but I wonder why a Batched Nested Loop was not used. Which version of YugabyteDB are you running (select version())?
Ideally, nested loops are batched and you would not need to disable them.
With older versions, you need to set yb_bnl_batch_size to 1024 to enable batching (and then set enable_nestloop back to on).
If you show the plan with the hash join, I can check the table cardinalities and predicates. If you share the DDL, I can check whether batched nested loops are possible.
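On those older versions, enabling batching for the session could look like this sketch:

```sql
-- Re-enable nested loops and set the batch size so they can be batched
SET enable_nestloop = on;
SET yb_bnl_batch_size = 1024;
```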

We are using version 2.20

Here’s the plan with hash join:

Hash Anti Join  (cost=726.88..830.66 rows=1 width=131) (actual time=453.432..492.201 rows=10980 loops=1)
  Hash Cond: (((sg.shard_id)::text = (vc.shard_id)::text) AND ((sg.sgid)::text = (vc.sgid)::text))
  Buffers: temp read=19405 written=19405
  ->  Hash Join  (cost=611.88..715.64 rows=1 width=163) (actual time=422.667..441.957 rows=101880 loops=1)
        Hash Cond: ((vst.stid)::text = (sg.stid)::text)
        Buffers: temp read=18009 written=18009
        ->  Seq Scan on tablest vst  (cost=0.00..100.00 rows=1000 width=32) (actual time=0.318..0.524 rows=14 loops=1)
              Filter: ((flagd IS FALSE) AND (flagp IS TRUE))
              Storage Table Read Requests: 4
              Storage Table Read Execution Time: 0.448 ms
              Storage Table Rows Scanned: 14
        ->  Hash  (cost=611.87..611.87 rows=1 width=195) (actual time=422.323..422.323 rows=101880 loops=1)
              Buckets: 32768 (originally 1024)  Batches: 32 (originally 1)  Memory Usage: 3841kB
              Buffers: temp read=15150 written=17082
              ->  Hash Join  (cost=508.11..611.87 rows=1 width=195) (actual time=379.788..404.767 rows=101880 loops=1)
                    Hash Cond: ((sa.shard_id)::text = (sg.shard_id)::text)
                    Buffers: temp read=15150 written=15150
                    ->  Seq Scan on tablesa sa  (cost=0.00..100.00 rows=1000 width=32) (actual time=0.538..3.268 rows=15210 loops=1)
                          Filter: ((flagd IS FALSE) AND (flagp IS TRUE))
                          Storage Table Read Requests: 16
                          Storage Table Read Execution Time: 1.970 ms
                          Storage Table Rows Scanned: 15210
                    ->  Hash  (cost=508.10..508.10 rows=1 width=259) (actual time=379.188..379.188 rows=101880 loops=1)
                          Buckets: 16384 (originally 1024)  Batches: 8 (originally 1)  Memory Usage: 3969kB
                          Buffers: temp read=11542 written=14207
                          ->  Hash Join  (cost=398.09..508.10 rows=1 width=259) (actual time=318.096..359.595 rows=101880 loops=1)
                                Hash Cond: (((g.shard_id)::text = (sg.shard_id)::text) AND ((g.grp_id)::text = (sg.grp_id)::text))
                                Buffers: temp read=11542 written=11542
                                ->  Seq Scan on tableg g  (cost=0.00..102.50 rows=1000 width=97) (actual time=0.822..9.929 rows=69570 loops=1)
                                      Remote Filter: ((grp_status)::text = 'active'::text)
                                      Filter: ((flagd IS FALSE) AND (flagp IS TRUE))
                                      Storage Table Read Requests: 18
                                      Storage Table Read Execution Time: 1.725 ms
                                      Storage Table Rows Scanned: 69570
                                ->  Hash  (cost=398.07..398.07 rows=1 width=194) (actual time=317.188..317.188 rows=102240 loops=1)
                                      Buckets: 32768 (originally 1024)  Batches: 8 (originally 1)  Memory Usage: 3841kB
                                      Buffers: temp read=7941 written=9921
                                      ->  Hash Join  (cost=285.56..398.07 rows=1 width=194) (actual time=265.672..300.243 rows=102240 loops=1)
                                            Hash Cond: (((c.pid)::text = (sg.pid)::text) AND ((c.shard_id)::text = (sg.shard_id)::text))
                                            Buffers: temp read=7941 written=7941
                                            ->  Seq Scan on tablep c  (cost=0.00..102.50 rows=1000 width=66) (actual time=2.879..8.124 rows=44190 loops=1)
                                                  Remote Filter: ((pstatus)::text = 'active'::text)
                                                  Filter: ((flagd IS FALSE) AND (flagp IS TRUE))
                                                  Storage Table Read Requests: 12
                                                  Storage Table Read Execution Time: 3.455 ms
                                                  Storage Table Rows Scanned: 46080
                                            ->  Hash  (cost=285.19..285.19 rows=25 width=192) (actual time=262.675..262.675 rows=137700 loops=1)
                                                  Buckets: 32768 (originally 1024)  Batches: 8 (originally 1)  Memory Usage: 3841kB
                                                  Buffers: temp read=4061 written=6749
                                                  ->  Hash Join  (cost=115.00..285.19 rows=25 width=192) (actual time=89.609..235.752 rows=137700 loops=1)
                                                        Hash Cond: (((cv.shard_id)::text = (sg.shard_id)::text) AND ((cv.sgid)::text = (sg.sgid)::text))
                                                        Buffers: temp read=4061 written=4061
                                                        ->  Seq Scan on tablec cv  (cost=0.00..100.00 rows=1000 width=64) (actual time=1.978..80.364 rows=140220 loops=1)
                                                              Storage Table Read Requests: 140
                                                              Storage Table Read Execution Time: 53.664 ms
                                                              Storage Table Rows Scanned: 140220
                                                        ->  Hash  (cost=100.00..100.00 rows=1000 width=160) (actual time=87.490..87.490 rows=128160 loops=1)
                                                              Buckets: 32768 (originally 1024)  Batches: 8 (originally 1)  Memory Usage: 3841kB
                                                              Buffers: temp written=2098
                                                              ->  Seq Scan on tablesg sg  (cost=0.00..100.00 rows=1000 width=160) (actual time=1.390..54.860 rows=128160 loops=1)
                                                                    Filter: ((flagd IS FALSE) AND (flagp IS TRUE))
                                                                    Storage Table Read Requests: 128
                                                                    Storage Table Read Execution Time: 27.325 ms
                                                                    Storage Table Rows Scanned: 128160
  ->  Hash  (cost=100.00..100.00 rows=1000 width=64) (actual time=30.728..30.728 rows=65430 loops=1)
        Buckets: 65536 (originally 1024)  Batches: 2 (originally 1)  Memory Usage: 3585kB
        Buffers: temp written=311
        ->  Seq Scan on tablecl vc  (cost=0.00..100.00 rows=1000 width=64) (actual time=0.625..22.835 rows=65430 loops=1)
              Storage Table Read Requests: 66
              Storage Table Read Execution Time: 17.840 ms
              Storage Table Rows Scanned: 65430
Planning Time: 10.015 ms
Execution Time: 492.722 ms
Storage Read Requests: 384
Storage Read Execution Time: 106.427 ms
Storage Rows Scanned: 464684
Storage Write Requests: 0
Catalog Read Requests: 0
Catalog Write Requests: 0
Storage Flush Requests: 0
Storage Execution Time: 106.427 ms
Peak Memory Usage: 25975 kB

The DDL is:

CREATE TABLE IF NOT EXISTS public.tablesa (
	shard_id varchar NOT NULL,
	shard_name varchar NULL,
	mid varchar NOT NULL,
	il varchar NULL,
	zo INTERVAL NULL,
	aab numeric NULL,
	cl numeric NULL,
	ccode varchar NULL,
	ra numeric NULL,
	created_time timestamptz NULL DEFAULT now(),
	created_by varchar NULL,
	flagp bool NOT NULL DEFAULT 'false',
	tid varchar NULL,
	last_user_update_time timestamptz NULL DEFAULT now(),
	last_user_update_by varchar NULL,
	flagd bool NOT NULL DEFAULT 'false',
	CONSTRAINT tablesa_pk PRIMARY KEY (shard_id)
);

CREATE TABLE IF NOT EXISTS public.tablep (
	pid varchar NOT NULL,
	shard_id varchar NOT NULL,
	pname varchar NOT NULL,
	ptype varchar NOT NULL,
	psubtype varchar NULL,
	pstatus varchar NOT NULL DEFAULT 'pending',
	duration_type varchar NULL,
	lt int2 NOT NULL DEFAULT 16,
	start_date timestamptz NULL,
	end_date timestamptz NULL,
	aci int8 NOT NULL DEFAULT 0,
	tci int8 NOT NULL DEFAULT 0,
	created_time timestamptz NULL DEFAULT now(),
	created_by varchar NULL,
	flagp bool NOT NULL DEFAULT 'false',
	last_user_update_time timestamptz NULL DEFAULT now(),
	last_user_update_by varchar NULL,
	flagd bool NOT NULL DEFAULT 'false',
	CONSTRAINT tablep_pk PRIMARY KEY (shard_id, pid)
);
CREATE INDEX IF NOT EXISTS idx_tablep_pname ON public.tablep (pname, flagd);

CREATE TABLE IF NOT EXISTS public.tableg (
	grp_id varchar NOT NULL,
	grp_name varchar NOT NULL,
	grp_type varchar NULL DEFAULT 'Standard'::character varying,
	grp_status varchar NOT NULL DEFAULT 'pending',
	pid varchar NOT NULL,
	shard_id varchar NOT NULL,
	flagf bool NOT NULL DEFAULT 'true',
	created_time timestamptz NULL DEFAULT now(),
	created_by varchar NULL,
	flagp bool NOT NULL DEFAULT 'false',
	last_user_update_time timestamptz NULL DEFAULT now(),
	last_user_update_by varchar NULL,
	flagd bool NOT NULL DEFAULT 'false',
	CONSTRAINT tableg_pk PRIMARY KEY (shard_id, grp_id)
);
CREATE INDEX IF NOT EXISTS idx_tableg_grp_name ON public.tableg (grp_name, flagd);

CREATE TABLE IF NOT EXISTS public.tablesg (
	sgid varchar NOT NULL,
	sgname varchar NOT NULL,
	grp_id varchar NOT NULL,
	pid varchar NOT NULL,
	shard_id varchar NOT NULL,
	tsgi int8 NOT NULL DEFAULT 0,
	af_id varchar,
	stid varchar,
	created_time timestamptz NULL DEFAULT now(),
	created_by varchar NULL,
	flagp bool NOT NULL DEFAULT 'false',
	last_user_update_time timestamptz NULL DEFAULT now(),
	last_user_update_by varchar NULL,
	flagd bool NOT NULL DEFAULT 'false',
	tsgb numeric NOT NULL DEFAULT 0,
	asgs numeric NOT NULL DEFAULT 0,
	atype varchar NULL,
	asgi int8 NOT NULL DEFAULT 0,
	CONSTRAINT tablesg_pk PRIMARY KEY (shard_id, sgid)
);
CREATE INDEX IF NOT EXISTS idx_tablesg_grp_f ON public.tablesg (grp_id, af_id, stid, flagd);

CREATE TABLE IF NOT EXISTS public.tablec (
	cid varchar NOT NULL,
	cname varchar NOT NULL,
	sgid varchar NOT NULL,
	grp_id varchar NOT NULL,
	pid varchar NOT NULL,
	shard_id varchar NOT NULL,
	cstatus varchar NOT NULL DEFAULT 'pending',
	cdescription varchar NULL,
	created_time timestamptz NULL DEFAULT now(),
	created_by varchar NULL,
	flagp bool NOT NULL DEFAULT 'false',
	last_user_update_time timestamptz NULL DEFAULT now(),
	last_user_update_by varchar NULL,
	flagd bool NOT NULL DEFAULT 'false',
	ctype varchar NULL,
	CONSTRAINT tablec_pk PRIMARY KEY (shard_id, cid)
);
CREATE INDEX IF NOT EXISTS idx_tablec_cname ON public.tablec (cname, flagd);

CREATE TABLE IF NOT EXISTS public.tableti (
	grp_id varchar NOT NULL,
	pid varchar NOT NULL,
	shard_id varchar NOT NULL,
	day_of_week int2 NOT NULL,
	t_start_time varchar NOT NULL,
	t_end_time varchar NOT NULL,
	ntstart_time timestamptz NULL,
	ntend_time timestamptz NULL,
	created_time timestamptz NULL DEFAULT now(),
	created_by varchar NULL,
	flagp bool NOT NULL DEFAULT 'false',
	last_user_update_time timestamptz NULL DEFAULT now(),
	last_user_update_by varchar NULL,
	flagd bool NOT NULL DEFAULT 'false',
	CONSTRAINT tableti_pk PRIMARY KEY (shard_id, grp_id, day_of_week, t_start_time)
);

CREATE TABLE IF NOT EXISTS public.tablest (
	stid varchar NOT NULL,
	stname varchar NOT NULL,
	stdesc varchar NULL,
	created_time timestamptz NULL DEFAULT now(),
	created_by varchar NULL,
	flagp bool NOT NULL DEFAULT 'false',
	last_user_update_time timestamptz NULL DEFAULT now(),
	last_user_update_by varchar NULL,
	flagd bool NOT NULL DEFAULT 'false',
	CONSTRAINT tablest_pk PRIMARY KEY (stid)
);
CREATE INDEX IF NOT EXISTS idx_tablest_st_name ON public.tablest (stname, flagd);

CREATE TABLE IF NOT EXISTS public.tablesgh (
	sgid varchar NOT NULL,
	st_start_time timestamptz NOT NULL,
	st_end_time timestamptz NOT NULL,
	tsgi int8 NOT NULL DEFAULT 0,
	asgi int4 NOT NULL DEFAULT 0,
	shard_id varchar NOT NULL DEFAULT '',
	CONSTRAINT tablesgh_pk PRIMARY KEY (shard_id, sgid, st_start_time)
);

CREATE TABLE IF NOT EXISTS public.tablecl (
	sgid varchar NOT NULL,
	grp_id varchar NOT NULL,
	pid varchar NOT NULL,
	created_time timestamptz NOT NULL DEFAULT now(),
	shard_id varchar NOT NULL DEFAULT '',
	lt int2 NULL,
	flagf bool NULL,
	CONSTRAINT tablecl_pk PRIMARY KEY (shard_id, sgid)
);

CREATE INDEX IF NOT EXISTS idx_tablep_updtime ON public.tablep (last_user_update_time);
CREATE INDEX IF NOT EXISTS idx_tableg_updtime ON public.tableg (last_user_update_time);
CREATE INDEX IF NOT EXISTS idx_tablesg_updtime ON public.tablesg (last_user_update_time);
CREATE INDEX IF NOT EXISTS idx_tablec_updtime ON public.tablec (last_user_update_time);
CREATE INDEX IF NOT EXISTS idx_tablest_updtime ON public.tablest (last_user_update_time);

Appreciate your help FranckPachot.

Ok, 2.20 is quite old. The latest version would use a YB Batched Nested Loop Join instead of a Nested Loop Join. On 2.20, you can enable it with set yb_bnl_batch_size=1024.

Rather than setting it for the session, you can use hints. You can try one of them:

/*+ Set(enable_nestloop off)*/

or

/*+ Set(yb_bnl_batch_size 1024) */

Looking at the execution plan, the best would be Merge Joins, but with range sharding instead of hash sharding (changing PRIMARY KEY (shard_id, ... to PRIMARY KEY (shard_id ASC, ...) so that the key is stored in order and the merge join doesn’t have to do an additional sort.
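As an illustration, here is a sketch using tablesg from the DDL above (the other columns stay as before):

```sql
-- Range sharding: ASC on the primary key columns stores rows in key order,
-- so a merge join on (shard_id, sgid) needs no additional sort
CREATE TABLE public.tablesg (
	shard_id varchar NOT NULL,
	sgid varchar NOT NULL,
	-- ... other columns as in the original DDL ...
	CONSTRAINT tablesg_pk PRIMARY KEY (shard_id ASC, sgid ASC)
);
```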

But it would be better to upgrade to 2024.1.

For this query, if you are happy with the Hash Join’s response time, then adding one of the hints is the best option. If other queries have problems too, then a more general solution may be better (latest version, range sharding, analyzing the tables, enabling the cost-based optimizer).

We didn’t choose 2024.1 because it is STS, not LTS. Is it as stable as the LTS one? Thank you.

Yes. Please read Releases in YugabyteDB | YugabyteDB Docs and YugabyteDB releases | YugabyteDB Docs

We’re trying 2024.1 now; it is indeed much faster, thank you.

As we are trying to optimize our query, I tried querying just a single table with only a few fields involved, but ran into the following problems. Our DDL is as follows:

CREATE TABLE public.testc (
	c_id varchar NOT NULL,
	c_name varchar NOT NULL,
	s_id varchar NOT NULL,
	g_id varchar NOT NULL,
	p_id varchar NOT NULL,
	sa_id varchar NOT NULL,
	c_status varchar DEFAULT 'pending'::character varying NOT NULL,
	c_description varchar NULL,
	p_flag bool DEFAULT false NOT NULL,
	d_flag bool DEFAULT false NOT NULL,
	CONSTRAINT testc_pk PRIMARY KEY (c_id)
);
CREATE INDEX idx_testc_full ON public.testc USING lsm (sa_id HASH, c_name ASC, c_id ASC, p_flag ASC, d_flag ASC);
CREATE INDEX idx_testc_sa ON public.testc USING lsm (sa_id HASH);

And I’m trying the following query:

explain (analyze, buffers, dist) 
SELECT c_id
FROM testc 
WHERE sa_id in ('LA01', 'MA01', 'MA02')
ORDER BY c_name ASC
LIMIT 200
OFFSET 200

The execution plan is:

Limit  (cost=5.43..5.43 rows=1 width=64) (actual time=1499.243..1499.303 rows=200 loops=1)
  ->  Sort  (cost=5.40..5.43 rows=10 width=64) (actual time=1499.197..1499.232 rows=400 loops=1)
        Sort Key: c_name
        Sort Method: top-N heapsort  Memory: 118kB
        ->  Index Scan using idx_testc_sa on testc  (cost=0.00..5.24 rows=10 width=64) (actual time=17.174..1443.771 rows=200000 loops=1)
              Index Cond: ((sa_id)::text = ANY ('{LA01,MA01,MA02}'::text[]))
              Storage Table Read Requests: 196
              Storage Table Read Execution Time: 1253.300 ms
              Storage Table Rows Scanned: 200000
              Storage Index Read Requests: 196
              Storage Index Read Execution Time: 2.671 ms
              Storage Index Rows Scanned: 200000
Planning Time: 25.480 ms
Execution Time: 1499.607 ms
Storage Read Requests: 392
Storage Read Execution Time: 1255.971 ms
Storage Rows Scanned: 400000
Storage Write Requests: 0
Catalog Read Requests: 155
Catalog Read Execution Time: 125.498 ms
Catalog Write Requests: 0
Storage Flush Requests: 0
Storage Execution Time: 1381.469 ms
Peak Memory Usage: 986 kB

Our questions are:

  1. Why doesn’t it choose the index idx_testc_full, which was built to cover all fields used in this query?
  2. After adding a hint to force idx_testc_full, the execution plan becomes the following, which is faster, but still cannot avoid the costly table scan of 200000 rows.
Limit  (cost=19.95..19.95 rows=1 width=64) (actual time=523.930..523.943 rows=200 loops=1)
  ->  Sort  (cost=19.70..19.95 rows=100 width=64) (actual time=523.919..523.927 rows=400 loops=1)
        Sort Key: c_name
        Sort Method: top-N heapsort  Memory: 120kB
        ->  Index Scan using idx_testc_full on testc  (cost=0.00..16.38 rows=100 width=64) (actual time=3.213..508.276 rows=200000 loops=1)
              Index Cond: ((sa_id)::text = ANY ('{LA01,MA01,MA02}'::text[]))
              Storage Table Read Requests: 196
              Storage Table Read Execution Time: 448.714 ms
              Storage Table Rows Scanned: 200000
              Storage Index Read Requests: 196
              Storage Index Read Execution Time: 0.652 ms
              Storage Index Rows Scanned: 200000
Planning Time: 1.052 ms
Execution Time: 523.997 ms
Storage Read Requests: 392
Storage Read Execution Time: 449.365 ms
Storage Rows Scanned: 400000
Storage Write Requests: 0
Catalog Read Requests: 2
Catalog Read Execution Time: 0.707 ms
Catalog Write Requests: 0
Storage Flush Requests: 0
Storage Execution Time: 450.072 ms
Peak Memory Usage: 946 kB
  3. Any suggestions on how to optimize this further?

For comparison, we set up the same table and index on a single PostgreSQL server and ran the query there: it picks the right index without a hint and is much faster. We thought YugabyteDB uses PostgreSQL as its SQL engine, so how can we achieve the same efficiency as PostgreSQL? Any help would be much appreciated.

PostgreSQL execution plan for reference:

Limit  (cost=47184.09..47207.43 rows=200 width=71) (actual time=39.914..44.057 rows=200 loops=1)
  Buffers: shared hit=7508 read=9
  ->  Gather Merge  (cost=47160.76..66501.71 rows=165768 width=71) (actual time=39.871..44.041 rows=400 loops=1)
        Workers Planned: 2
        Workers Launched: 2
        Buffers: shared hit=7508 read=9
        ->  Sort  (cost=46160.73..46367.94 rows=82884 width=71) (actual time=23.285..23.291 rows=315 loops=3)
              Sort Key: c_name
              Sort Method: top-N heapsort  Memory: 78kB
              Buffers: shared hit=7508 read=9
              Worker 0:  Sort Method: top-N heapsort  Memory: 78kB
              Worker 1:  Sort Method: top-N heapsort  Memory: 78kB
              ->  Parallel Index Only Scan using idx_testc_full on testc  (cost=0.55..42164.13 rows=82884 width=71) (actual time=0.039..11.647 rows=66667 loops=3)
                    Index Cond: (sa_id = ANY ('{LA01,MA01,MA02}'::text[]))
                    Heap Fetches: 100019
                    Buffers: shared hit=7492 read=9
Planning:
  Buffers: shared hit=40
Planning Time: 0.766 ms
Execution Time: 44.118 ms

Hi Jane,

Because without the cost-based optimizer, the cost model doesn’t account for the best index.

Try to analyze the table:

ANALYZE public.testc;

and set the following before running the query:

set yb_enable_optimizer_statistics=on;
set yb_enable_base_scans_cost_model=on;

Then I think it will use idx_testc_full.

Yes, this plan is not scalable because it has to read all rows (rows=200000) and sort them (Sort Method: top-N heapsort) to get the first 400 rows, then skip 200 for the OFFSET and return the next 200.

Paginated queries should avoid sorting.

As you ORDER BY c_name ASC, you need an index that starts with c_name ASC:

CREATE INDEX idx_testc_name_asc ON public.testc USING lsm (c_name ASC, sa_id, c_id ASC, p_flag ASC, d_flag ASC);

The plan should be:

                                 QUERY PLAN
----------------------------------------------------------------------------
 Limit
   ->  Index Scan using idx_testc_name_asc on testc
         Storage Filter: ((sa_id)::text = ANY ('{LA01,MA01,MA02}'::text[]))

This should work if the first 400 rows matching ('LA01', 'MA01', 'MA02') are found quickly.

If it still reads too many rows before finding the first 400 that satisfy this condition, a dedicated partial index will be better:

CREATE INDEX idx_testc_name_partial ON public.testc USING lsm (c_name ASC, c_id ASC, p_flag ASC, d_flag ASC)
WHERE sa_id in ('LA01', 'MA01', 'MA02')
;

The plan should be:

                     QUERY PLAN
-----------------------------------------------------
 Limit
   ->  Index Only Scan using idx_testc_name_partial on testc (rows=400)

We tried ANALYZE on the table together with:
set yb_enable_optimizer_statistics=on;
set yb_enable_base_scans_cost_model=on;

It is indeed using the index idx_testc_full now, with an Index Only Scan. After I created idx_testc_name_asc as you suggested, though, the execution plan doesn’t use it.

Since these two settings, yb_enable_optimizer_statistics and yb_enable_base_scans_cost_model, are so effective, should we keep them on by default? Thank you.

It seems ANALYZE public.testc; is the one that made the difference. Does this mean that as we insert more data into this table, we need to run ANALYZE periodically to keep queries against it efficient? If so, at what frequency should we do it? Thanks.

What matters most is the tables’ relative sizes and the columns’ selectivity. So as long as the data stays similar, there’s no need to re-analyze.
But it is a good idea to re-analyze after a while. In future versions, we will improve ANALYZE and run it automatically, like Postgres does.

After further testing, it appears set yb_enable_base_scans_cost_model=on; is another key to an efficient execution plan, but this setting seems to turn itself off after a while. Is that expected? Should we leave this option on permanently, and how can we do that? Thank you.

This setting works like a PostgreSQL parameter: SET applies it to your session only. You can also set it for a database or for a user, or add it to the cluster server flags with ysql_pg_conf_csv.
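For example (the database and user names below are placeholders):

```sql
-- Applies to all new sessions on this database
ALTER DATABASE mydb SET yb_enable_base_scans_cost_model = on;

-- Applies to all new sessions of this user
ALTER ROLE app_user SET yb_enable_optimizer_statistics = on;
```

To set it cluster-wide, add it to the tserver flags, for example: --ysql_pg_conf_csv="yb_enable_base_scans_cost_model=on,yb_enable_optimizer_statistics=on"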