From c3b238a95acbcd3b49d9b3aeed760405a7f07a39 Mon Sep 17 00:00:00 2001 From: Mehmet Yilmaz Date: Tue, 23 Dec 2025 12:23:39 +0000 Subject: [PATCH 1/2] PG18: Implement MIN/MAX aggregate OID resolution for ANYARRAY and RECORD types in distributed tables --- .../planner/multi_logical_optimizer.c | 74 +++++++++++++++++-- src/test/regress/sql/pg18.sql | 66 +++++++++++++++++ 2 files changed, 132 insertions(+), 8 deletions(-) diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index 3c5b44a6120..1e4eb0b75a8 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -2193,11 +2193,10 @@ MasterAggregateExpression(Aggref *originalAggregate, newMasterAggregate->aggfilter = NULL; /* - * If return type aggregate is anyelement, its actual return type is - * determined on the type of its argument. So we replace it with the - * argument type in that case. + * Polymorphic aggregates determine their actual return type based on + * their argument type, so replace it with the worker return type. */ - if (masterReturnType == ANYELEMENTOID) + if (IsPolymorphicType(masterReturnType)) { newMasterAggregate->aggtype = workerReturnType; @@ -3561,10 +3560,62 @@ AggregateEnabledCustom(Aggref *aggregateExpression) * and returns the corresponding aggregate function oid for the given function * name and input type. */ +typedef enum AggregateArgMatchLevel +{ + AGG_MATCH_NONE = 0, + AGG_MATCH_RECORD = 1, + AGG_MATCH_GENERAL_POLY = 2, + AGG_MATCH_ARRAY_POLY = 3, + AGG_MATCH_EXACT = 4 +} AggregateArgMatchLevel; + +static AggregateArgMatchLevel +AggregateArgumentMatchLevel(Oid declaredArgType, Oid inputType) +{ + bool inputIsArray = type_is_array(inputType) || + inputType == ANYARRAYOID || + inputType == ANYCOMPATIBLEARRAYOID; + bool inputIsEnum = type_is_enum(inputType) || + inputType == ANYENUMOID; + + if (declaredArgType == inputType) + { + return AGG_MATCH_EXACT; + } + + if ((declaredArgType == ANYARRAYOID || + declaredArgType == ANYCOMPATIBLEARRAYOID) && + inputIsArray) + { + return AGG_MATCH_ARRAY_POLY; + } + + if (declaredArgType == ANYELEMENTOID || + declaredArgType == ANYCOMPATIBLEOID) + { + return AGG_MATCH_GENERAL_POLY; + } + + if (declaredArgType == ANYENUMOID && + inputIsEnum) + { + return AGG_MATCH_GENERAL_POLY; + } + + if (declaredArgType == RECORDOID && + type_is_rowtype(inputType)) + { + return AGG_MATCH_RECORD; + } + + return AGG_MATCH_NONE; +} + static Oid AggregateFunctionOid(const char *functionName, Oid inputType) { Oid functionOid = InvalidOid; + AggregateArgMatchLevel bestMatch = AGG_MATCH_NONE; ScanKeyData scanKey[1]; int scanKeyCount = 1; @@ -3586,12 +3637,19 @@ AggregateFunctionOid(const char *functionName, Oid inputType) if (argumentCount == 1) { - /* check if input type and found value type match */ - if (procForm->proargtypes.values[0] == inputType || - procForm->proargtypes.values[0] == ANYELEMENTOID) + Oid declaredArgType = procForm->proargtypes.values[0]; + AggregateArgMatchLevel matchLevel = + AggregateArgumentMatchLevel(declaredArgType, inputType); + + if (matchLevel > bestMatch) { + bestMatch = matchLevel; functionOid = procForm->oid; - break; + + if (bestMatch == AGG_MATCH_EXACT) + { + break; + } } } Assert(argumentCount <= 1); diff --git a/src/test/regress/sql/pg18.sql b/src/test/regress/sql/pg18.sql index 01b81bce249..467e6c83f13 100644 --- a/src/test/regress/sql/pg18.sql +++ b/src/test/regress/sql/pg18.sql @@ -1891,6 +1891,72 @@ FROM wal_explain_plan; DROP TABLE wal_explain_plan; SET citus.explain_all_tasks TO default; +-- ============================================================ +-- PG18: MIN/MAX aggregate OID resolution for ANYARRAY and RECORD +-- - ANYARRAY: min/max over int[] should work on distributed tables +-- - RECORD : PG18 min/max over composites (record) should work; +-- validate via cast + field extraction to avoid record I/O issues +-- ============================================================ + +CREATE SCHEMA pg18_minmax; +SET search_path TO pg18_minmax; + +-- ------------------------------------------------------------ +-- Case A: ANYARRAY (int[]) +-- ------------------------------------------------------------ +CREATE TABLE sales_data ( + product_id int, + product text, + monthly_sales int[] +); + +SELECT create_distributed_table('sales_data', 'product_id'); + +INSERT INTO sales_data VALUES + (1, 'Laptop', ARRAY[45, 52, 38]), + (2, 'Mouse', ARRAY[67, 71, 58]), + (3, 'Keyboard', ARRAY[23, 28, 15]); + +SELECT + MIN(monthly_sales) AS min_sales_pattern, + MAX(monthly_sales) AS max_sales_pattern +FROM sales_data; + +-- ------------------------------------------------------------ +-- Case B: RECORD (PG18 composite min/max) +-- ------------------------------------------------------------ +CREATE TYPE product_rating AS ( + average_score DECIMAL(3,2), + review_count INTEGER +); + +CREATE TABLE product_ratings ( + id int, + rating product_rating +); + +SELECT create_distributed_table('product_ratings', 'id'); + +INSERT INTO product_ratings VALUES + (1, ROW(4.5, 120)::product_rating), + (2, ROW(4.2, 89)::product_rating), + (3, ROW(4.8, 156)::product_rating); + +SET citus.explain_all_tasks TO on; +EXPLAIN (COSTS FALSE) +SELECT min(rating), max(rating) FROM product_ratings; + +SELECT + ((MIN(rating))::product_rating).average_score AS lowest_avg, + ((MIN(rating))::product_rating).review_count AS lowest_count, + ((MAX(rating))::product_rating).average_score AS highest_avg, + ((MAX(rating))::product_rating).review_count AS highest_count +FROM product_ratings; + +DROP SCHEMA pg18_minmax CASCADE; +-- END: PG18: MIN/MAX aggregate OID resolution for ANYARRAY and RECORD + + -- cleanup with minimum verbosity SET client_min_messages TO ERROR; RESET search_path; From 3806b4b9ff23933674771cd204506ac6ab5f93cc Mon Sep 17 00:00:00 2001 From: Mehmet Yilmaz Date: Tue, 23 Dec 2025 14:13:17 +0000 Subject: [PATCH 2/2] PG18: Add tests for MIN/MAX aggregate OID resolution on ANYARRAY and RECORD types --- src/test/regress/expected/pg18.out | 73 ++++++++++++++++++++++++++++++ src/test/regress/sql/pg18.sql | 4 -- 2 files changed, 73 insertions(+), 4 deletions(-) diff --git a/src/test/regress/expected/pg18.out b/src/test/regress/expected/pg18.out index 94c4b198c66..8dcf23051cc 100644 --- a/src/test/regress/expected/pg18.out +++ b/src/test/regress/expected/pg18.out @@ -3010,6 +3010,79 @@ FROM wal_explain_plan; DROP TABLE wal_explain_plan; SET citus.explain_all_tasks TO default; +-- ============================================================ +-- PG18: MIN/MAX aggregate OID resolution for ANYARRAY and RECORD +-- - ANYARRAY: min/max over int[] should work on distributed tables +-- - RECORD : PG18 min/max over composites (record) should work; +-- validate via cast + field extraction to avoid record I/O issues +-- ============================================================ +CREATE SCHEMA pg18_minmax; +SET search_path TO pg18_minmax; +-- ------------------------------------------------------------ +-- Case A: ANYARRAY (int[]) +-- ------------------------------------------------------------ +CREATE TABLE sales_data ( + product_id int, + product text, + monthly_sales int[] +); +SELECT create_distributed_table('sales_data', 'product_id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO sales_data VALUES + (1, 'Laptop', ARRAY[45, 52, 38]), + (2, 'Mouse', ARRAY[67, 71, 58]), + (3, 'Keyboard', ARRAY[23, 28, 15]); +SELECT + MIN(monthly_sales) AS min_sales_pattern, + MAX(monthly_sales) AS max_sales_pattern +FROM sales_data; + min_sales_pattern | max_sales_pattern +--------------------------------------------------------------------- + {23,28,15} | {67,71,58} +(1 row) + +-- ------------------------------------------------------------ +-- Case B: RECORD (PG18 composite min/max) +-- ------------------------------------------------------------ +CREATE TYPE product_rating AS ( + average_score DECIMAL(3,2), + review_count INTEGER +); +CREATE TABLE product_ratings ( + id int, + rating product_rating +); +SELECT create_distributed_table('product_ratings', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO product_ratings VALUES + (1, ROW(4.5, 120)::product_rating), + (2, ROW(4.2, 89)::product_rating), + (3, ROW(4.8, 156)::product_rating); + +SELECT + ((MIN(rating))::product_rating).average_score AS lowest_avg, + ((MIN(rating))::product_rating).review_count AS lowest_count, + ((MAX(rating))::product_rating).average_score AS highest_avg, + ((MAX(rating))::product_rating).review_count AS highest_count +FROM product_ratings; + lowest_avg | lowest_count | highest_avg | highest_count +--------------------------------------------------------------------- + 4.20 | 89 | 4.80 | 156 +(1 row) +DROP SCHEMA pg18_minmax CASCADE; +NOTICE: drop cascades to 3 other objects +DETAIL: drop cascades to table sales_data +drop cascades to type product_rating +drop cascades to table product_ratings +-- END: PG18: MIN/MAX aggregate OID resolution for ANYARRAY and RECORD -- cleanup with minimum verbosity SET client_min_messages TO ERROR; RESET search_path; diff --git a/src/test/regress/sql/pg18.sql b/src/test/regress/sql/pg18.sql index 467e6c83f13..5e6a89c6352 100644 --- a/src/test/regress/sql/pg18.sql +++ b/src/test/regress/sql/pg18.sql @@ -1942,10 +1942,6 @@ INSERT INTO product_ratings VALUES (2, ROW(4.2, 89)::product_rating), (3, ROW(4.8, 156)::product_rating); -SET citus.explain_all_tasks TO on; -EXPLAIN (COSTS FALSE) -SELECT min(rating), max(rating) FROM product_ratings; - SELECT ((MIN(rating))::product_rating).average_score AS lowest_avg, ((MIN(rating))::product_rating).review_count AS lowest_count,