From 4814e0a03c36024d904bce7d0d2655e30b2b1dfc Mon Sep 17 00:00:00 2001 From: yigecheng Date: Wed, 6 Nov 2024 10:04:40 +0800 Subject: [PATCH] Add support for corr_s/corr_k --- src/common/backend/catalog/builtin_funcs.ini | 26 ++ src/common/backend/catalog/pg_aggregate.cpp | 4 +- src/common/backend/utils/adt/Makefile | 2 +- src/common/backend/utils/adt/corr_sk.cpp | 402 ++++++++++++++++++ src/common/backend/utils/init/globals.cpp | 2 +- src/include/catalog/pg_aggregate.h | 5 + .../rollback-post_catalog_maindb_93_020.sql | 8 + .../rollback-post_catalog_otherdb_93_020.sql | 8 + .../upgrade-post_catalog_maindb_93_020.sql | 31 ++ .../upgrade-post_catalog_otherdb_93_020.sql | 31 ++ src/include/utils/builtins.h | 5 + src/include/utils/corr_sk.h | 33 ++ src/test/regress/expected/agg.out | 114 +++++ src/test/regress/sql/agg.sql | 45 ++ 14 files changed, 713 insertions(+), 3 deletions(-) create mode 100644 src/common/backend/utils/adt/corr_sk.cpp create mode 100644 src/include/catalog/upgrade_sql/rollback_catalog_maindb/rollback-post_catalog_maindb_93_020.sql create mode 100644 src/include/catalog/upgrade_sql/rollback_catalog_otherdb/rollback-post_catalog_otherdb_93_020.sql create mode 100644 src/include/catalog/upgrade_sql/upgrade_catalog_maindb/upgrade-post_catalog_maindb_93_020.sql create mode 100644 src/include/catalog/upgrade_sql/upgrade_catalog_otherdb/upgrade-post_catalog_otherdb_93_020.sql create mode 100644 src/include/utils/corr_sk.h diff --git a/src/common/backend/catalog/builtin_funcs.ini b/src/common/backend/catalog/builtin_funcs.ini index 2edbfc0d7..16f5edd33 100644 --- a/src/common/backend/catalog/builtin_funcs.ini +++ b/src/common/backend/catalog/builtin_funcs.ini @@ -7077,6 +7077,32 @@ "median_transfn", 1, AddBuiltinFunc(_0(5559), _1("median_transfn"), _2(2), _3(false), _4(false), _5(median_transfn), _6(2281), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(0), _12(0), _13(0), _14(false), _15(false), _16(false), _17(false), _18('i'), _19(0), _20(2, 2281, 2276), _21(NULL), _22(NULL), _23(NULL), _24(NULL), _25("median_transfn"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(NULL), _32(false), _33(NULL), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0)), ), + AddFuncGroup( + "corr_s", 2, + AddBuiltinFunc(_0(5561), _1("corr_s"), _2(3), _3(false), _4(false), _5(aggregate_dummy), _6(701), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(0), _12(0), _13(0), _14(true), _15(false), _16(false), _17(false), _18('i'), _19(0), _20(3, 701, 701, 25), _21(NULL), _22(NULL), _23(NULL), _24(NULL), _25("aggregate_dummy"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(NULL), _32(false), _33("Calculates Spearman's rank correlation with text parameter"), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0)), + AddBuiltinFunc(_0(5566), _1("corr_s"), _2(2), _3(false), _4(false), _5(aggregate_dummy), _6(701), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(0), _12(0), _13(0), _14(true), _15(false), _16(false), _17(false), _18('i'), _19(0), _20(2, 701, 701), _21(NULL), _22(NULL), _23(NULL), _24(NULL), _25("aggregate_dummy"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(NULL), _32(false), _33("Calculates Spearman's rank correlation without text parameter"), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0)) + ), + AddFuncGroup( + "corr_k", 2, + AddBuiltinFunc(_0(5562), _1("corr_k"), _2(3), _3(false), _4(false), _5(aggregate_dummy), _6(701), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(0), _12(0), _13(0), _14(true), _15(false), _16(false), _17(false), _18('i'), _19(0), _20(3, 701, 701, 25), _21(NULL), _22(NULL), _23(NULL), _24(NULL), _25("aggregate_dummy"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(NULL), _32(false), _33("Calculates Kendall's tau correlation with text parameter"), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0)), + AddBuiltinFunc(_0(5567), _1("corr_k"), _2(2), _3(false), _4(false), _5(aggregate_dummy), _6(701), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(0), _12(0), _13(0), _14(true), _15(false), _16(false), _17(false), _18('i'), _19(0), _20(2, 701, 701), _21(NULL), _22(NULL), _23(NULL), _24(NULL), _25("aggregate_dummy"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(NULL), _32(false), _33("Calculates Kendall's tau correlation without text parameter"), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0)) + ), + AddFuncGroup( + "corr_s_final_fn", 1, + AddBuiltinFunc(_0(5563), _1("corr_s_final_fn"), _2(1), _3(false), _4(false), _5(corr_s_final_fn), _6(701), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(0), _12(0), _13(0), _14(false), _15(false), _16(false), _17(false), _18('i'), _19(0), _20(1, 2281), _21(NULL), _22(NULL), _23(NULL), _24(NULL), _25("corr_s_final_fn"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(NULL), _32(false), _33("Final function for corr_s to return double precision"), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0)) + ), + AddFuncGroup( + "corr_k_final_fn", 1, + AddBuiltinFunc(_0(5564), _1("corr_k_final_fn"), _2(1), _3(false), _4(false), _5(corr_k_final_fn), _6(701), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(0), _12(0), _13(0), _14(false), _15(false), _16(false), _17(false), _18('i'), _19(0), _20(1, 2281), _21(NULL), _22(NULL), _23(NULL), _24(NULL), _25("corr_k_final_fn"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(NULL), _32(false), _33("Final function for corr_k to return double precision"), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0)) + ), + AddFuncGroup( + "corr_sk_trans_fn", 1, + AddBuiltinFunc(_0(5565), _1("corr_sk_trans_fn"), _2(4), _3(false), _4(false), _5(corr_sk_trans_fn), _6(2281), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(0), _12(0), _13(0), _14(false), _15(false), _16(false), _17(false), _18('i'), _19(0), _20(4, 2281, 701, 701, 25), _21(NULL), _22(NULL), _23(NULL), _24(NULL), _25("corr_sk_trans_fn"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(NULL), _32(false), _33("Transition function for corr_s/corr_k with 3rd arg"), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0)) + ), + AddFuncGroup( + "corr_sk_trans_fn_no3", 1, + AddBuiltinFunc(_0(5568), _1("corr_sk_trans_fn_no3"), _2(3), _3(false), _4(false), _5(corr_sk_trans_fn_no3), _6(2281), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(0), _12(0), _13(0), _14(false), _15(false), _16(false), _17(false), _18('i'), _19(0), _20(3, 2281, 701, 701), _21(NULL), _22(NULL), _23(NULL), _24(NULL), _25("corr_sk_trans_fn_no3"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(NULL), _32(false), _33("Transition function for corr_s/corr_k without 3rd arg"), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0)) + ), AddFuncGroup( "min", 22, AddBuiltinFunc(_0(2051), _1("min"), _2(1), _3(false), _4(false), _5(aggregate_dummy), _6(2277), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(0), _12(0), _13(0), _14(true), _15(false), _16(false), _17(false), _18('i'), _19(0), _20(1, 2277), _21(NULL), _22(NULL), _23(NULL), _24(NULL), _25("aggregate_dummy"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(NULL), _32(false), _33("the average (arithmetic mean) as numeric of all bigint values"), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0)), diff --git a/src/common/backend/catalog/pg_aggregate.cpp b/src/common/backend/catalog/pg_aggregate.cpp index 3f92b0c6f..2256805eb 100644 --- a/src/common/backend/catalog/pg_aggregate.cpp +++ b/src/common/backend/catalog/pg_aggregate.cpp @@ -68,7 +68,9 @@ static void InternalAggIsSupported(const char *aggName) , "age_collect", "age_percentilecont", - "age_percentiledisc" + "age_percentiledisc", + "corr_s", + "corr_k" }; uint len = lengthof(supportList); diff --git a/src/common/backend/utils/adt/Makefile b/src/common/backend/utils/adt/Makefile index 194b7a967..6ef958546 100644 --- a/src/common/backend/utils/adt/Makefile +++ b/src/common/backend/utils/adt/Makefile @@ -40,7 +40,7 @@ OBJS = acl.o arrayfuncs.o array_selfuncs.o array_typanalyze.o \ tsvector.o tsvector_op.o tsvector_parser.o \ txid.o uuid.o windowfuncs.o xml.o extended_statistics.o clientlogic_bytea.o clientlogicsettings.o \ median_aggs.o expr_distinct.o nlssort.o memory_func.o first_last_agg.o encrypt_decrypt.o expandeddatum.o \ - subtype.o bitvec.o f2s.o halfutils.o halfvec.o sparsevec.o vector.o + subtype.o bitvec.o f2s.o halfutils.o halfvec.o sparsevec.o vector.o corr_sk.o VECTOR_OPT = -march=native diff --git a/src/common/backend/utils/adt/corr_sk.cpp b/src/common/backend/utils/adt/corr_sk.cpp new file mode 100644 index 000000000..8d33281e8 --- /dev/null +++ b/src/common/backend/utils/adt/corr_sk.cpp @@ -0,0 +1,402 @@ +#include +#include +#include "postgres.h" +#include "fmgr.h" +#include "utils/array.h" +#include "utils/builtins.h" +#include "utils/lsyscache.h" +#include "utils/elog.h" +#include "catalog/pg_type.h" +#include "utils/corr_sk.h" + +#undef gettext +#undef dgettext +#undef ngettext +#undef dngettext +#include + +constexpr int INIT_CORR_ARRAY_LENGTH = 64; +constexpr int MAX_CORR_ARRAY_LENGTH = 524288; + +enum class ModeType { + COEFFICIENT, + ONE_SIDED_SIG, + ONE_SIDED_SIG_POS, + ONE_SIDED_SIG_NEG, + TWO_SIDED_SIG, + ILLEGAL +}; + +typedef struct CorrBuildState { + MemoryContext mcontext; /* where all the temp stuff is kept */ + Datum* x_data_array; /* array of accumulated Datums x */ + Datum* y_data_array; /* array of accumulated Datums y */ + uint32 maxlen; /* allocated length of above arrays */ + uint32 count; /* number of valid entries in above arrays */ + Oid dtype; /* data type of the Datums */ + int16 typlen; /* needed info about datatype */ + bool typbyval; + char typalign; + ModeType mode; /* result mode */ +} CorrBuildState; + + +// convert text to cstring, then to ModeType +ModeType parse_mode_type(text* mode_text) +{ + char *mode = text_to_cstring(mode_text); + if (strcmp(mode, "COEFFICIENT") == 0) { + return ModeType::COEFFICIENT; + } else if (strcmp(mode, "ONE_SIDED_SIG") == 0) { + return ModeType::ONE_SIDED_SIG; + } else if (strcmp(mode, "ONE_SIDED_SIG_POS") == 0) { + return ModeType::ONE_SIDED_SIG_POS; + } else if (strcmp(mode, "ONE_SIDED_SIG_NEG") == 0) { + return ModeType::ONE_SIDED_SIG_NEG; + } else if (strcmp(mode, "TWO_SIDED_SIG") == 0) { + return ModeType::TWO_SIDED_SIG; + } else { + return ModeType::ILLEGAL; + } +} + +static CorrBuildState* CreateCorrBuildState(Oid elemType, MemoryContext aggCtx) +{ + MemoryContext corrCtx, oldCtx; + + /* Make a temporary context to hold all the junk */ + corrCtx = AllocSetContextCreate(aggCtx, "AccumCorrSet", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); + oldCtx = MemoryContextSwitchTo(corrCtx); + + CorrBuildState* cstate = (CorrBuildState*)palloc(sizeof(CorrBuildState)); + cstate->mcontext = corrCtx; + cstate->maxlen = INIT_CORR_ARRAY_LENGTH; /* starting size */ + cstate->x_data_array = (Datum*)palloc(cstate->maxlen * sizeof(Datum)); + cstate->y_data_array = (Datum*)palloc(cstate->maxlen * sizeof(Datum)); + cstate->count = 0; + cstate->dtype = elemType; + cstate->mode = ModeType::COEFFICIENT; + get_typlenbyvalalign(elemType, &cstate->typlen, &cstate->typbyval, &cstate->typalign); + + (void)MemoryContextSwitchTo(oldCtx); + + return cstate; +} + +static void CorrPutDatum(CorrBuildState* cstate, Datum xvalue, Datum yvalue) +{ + MemoryContext oldCtx = MemoryContextSwitchTo(cstate->mcontext); + + /* enlarge data_array[] if needed */ + if (cstate->count >= cstate->maxlen) { + if (cstate->maxlen < MAX_CORR_ARRAY_LENGTH) { + cstate->maxlen *= 2; + if (cstate->maxlen > MAX_CORR_ARRAY_LENGTH) { + cstate->maxlen = MAX_CORR_ARRAY_LENGTH; + } + cstate->x_data_array = (Datum*)repalloc(cstate->x_data_array, cstate->maxlen * sizeof(Datum)); + cstate->y_data_array = (Datum*)repalloc(cstate->y_data_array, cstate->maxlen * sizeof(Datum)); + + } else { + ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg("correlation array length limit exceeded"))); + } + } + + /* Ensure pass-by-ref stuff is copied into mcontext; and detoast it too if it's varlena. */ + if (!cstate->typbyval) { + if (cstate->typlen == -1) { + xvalue = PointerGetDatum(PG_DETOAST_DATUM_COPY(xvalue)); + yvalue = PointerGetDatum(PG_DETOAST_DATUM_COPY(yvalue)); + } else { + xvalue = datumCopy(xvalue, cstate->typbyval, cstate->typlen); + yvalue = datumCopy(yvalue, cstate->typbyval, cstate->typlen); + } + } + + cstate->x_data_array[cstate->count] = xvalue; + cstate->y_data_array[cstate->count] = yvalue; + cstate->count++; + + (void)MemoryContextSwitchTo(oldCtx); +} + +// General initialization and state handling for transition function +static CorrBuildState* init_corr_sk_trans_fn(PG_FUNCTION_ARGS, bool check_mode) +{ + Oid arg1Typeid = get_fn_expr_argtype(fcinfo->flinfo, 1); + if (arg1Typeid == InvalidOid) { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("could not determine input data type"))); + } + + /* Get the MemoryContext to keep the working state */ + MemoryContext aggCtx; + if (!AggCheckCallContext(fcinfo, &aggCtx)) { + ereport(ERROR, + (errcode(ERRCODE_SQL_ROUTINE_EXCEPTION), errmsg("function called in non-aggregate context"))); + } + + CorrBuildState *cstate = NULL; + if (PG_ARGISNULL(0)) { + /* Create the transition state workspace */ + cstate = CreateCorrBuildState(arg1Typeid, aggCtx); + } else { + cstate = (CorrBuildState*)PG_GETARG_POINTER(0); + } + + /* Skip null values */ + if (PG_ARGISNULL(1) || PG_ARGISNULL(2)) { + return cstate; + } + + /* Check the datatype consistency */ + Assert(cstate->dtype == arg1Typeid); + + float8 xvalue = PG_GETARG_FLOAT8(1); + float8 yvalue = PG_GETARG_FLOAT8(2); + + CorrPutDatum(cstate, xvalue, yvalue); + + if (check_mode) { + if (PG_ARGISNULL(3)) { + ereport(ERROR, (errmsg("illegal argument for function"))); + } + + text *mode_text = PG_GETARG_TEXT_P(3); + // text -> ModeType + ModeType mode = parse_mode_type(mode_text); + cstate->mode = mode; + } + + return cstate; +} + +// Handle no 3rd arg case +Datum corr_sk_trans_fn_no3(PG_FUNCTION_ARGS) +{ + CorrBuildState *cstate = init_corr_sk_trans_fn(fcinfo, false); + PG_RETURN_POINTER(cstate); +} + +// Handle with 3rd arg case +Datum corr_sk_trans_fn(PG_FUNCTION_ARGS) +{ + CorrBuildState *cstate = init_corr_sk_trans_fn(fcinfo, true); + PG_RETURN_POINTER(cstate); +} + + +struct DataWithIndex { + Datum value; + uint32 index; +}; + +/* The comparison function for sorting DataWithIndex */ +int compare(const void *a, const void *b) { + DataWithIndex *dp1 = (DataWithIndex *)a; + DataWithIndex *dp2 = (DataWithIndex *)b; + + if (dp1->value < dp2->value) return -1; + if (dp1->value > dp2->value) return 1; + return 0; +} + +void calculate_ranks(Datum data[], float8 ranks[], uint32 n) +{ + struct DataWithIndex *sortedData = (struct DataWithIndex *)malloc(n * sizeof(struct DataWithIndex)); + + for (uint32 i = 0; i < n; i++) { + sortedData[i].value = data[i]; + sortedData[i].index = i; + } + + qsort(sortedData, n, sizeof(struct DataWithIndex), compare); + + for (uint32 i = 0; i < n; i++) { + ranks[sortedData[i].index] = (float8)(i + 1); + } + + for (uint32 i = 0; i < n;) { + uint32 j = i + 1; + while (j < n && sortedData[j].value == sortedData[i].value) { + j++; + } + if (j > i + 1) { + float8 avg_rank = 0.5 * (ranks[sortedData[i].index] + ranks[sortedData[j - 1].index]); + for (uint32 k = i; k < j; k++) { + ranks[sortedData[k].index] = avg_rank; + } + } + i = j; + } + + free(sortedData); +} + +float8 calculate_t_statistic(float8 correlation, uint32 n) +{ + return correlation * sqrt((n - 2) / (1 - correlation * correlation)); +} + +float8 calculate_z_statistic(float8 tau_b, uint32 n) +{ + float8 denominator = sqrt((float8)(2 * (2 * n + 5)) / (float8)(9 * n * (n - 1))); + return tau_b / denominator; +} + +float8 calculate_standard_deviation(const float8 data[], uint32 size, float8 mean) +{ + float8 sumOfSquares = 0.0; + for (uint32 i = 0; i < size; i++) { + sumOfSquares += (data[i] - mean) * (data[i] - mean); + } + return sqrt(sumOfSquares / (size - 1)); +} + +Datum corr_s_final_fn(PG_FUNCTION_ARGS) +{ + float8 result; + + if (PG_ARGISNULL(0)) { + PG_RETURN_NULL(); /* returns null if no input values */ + } + + CorrBuildState *state = (CorrBuildState *) PG_GETARG_POINTER(0); + + if (state->count == 0) { + PG_RETURN_NULL(); + } + + uint32 n = state->count; + Datum* x_data = state->x_data_array; + Datum* y_data = state->y_data_array; + + float8* x_ranks = (float8*)palloc(n * sizeof(float8)); + float8* y_ranks = (float8*)palloc(n * sizeof(float8)); + float8* x_rank_m = (float8*)palloc(n * sizeof(float8)); + float8* y_rank_m = (float8*)palloc(n * sizeof(float8)); + float8* diff = (float8*)palloc(n * sizeof(float8)); + + calculate_ranks(x_data, x_ranks, n); + calculate_ranks(y_data, y_ranks, n); + + // Spearman's correlation coefficient + float8 mean_rank = ((float8)((1 + n) * n) / 2.0) / n; + float8 sum_diff = 0.0; + + for (uint32 i = 0; i < n; i++) { + x_rank_m[i] = x_ranks[i] - mean_rank; + y_rank_m[i] = y_ranks[i] - mean_rank; + diff[i] = x_rank_m[i] * y_rank_m[i]; + sum_diff += diff[i]; + } + float8 covariance = sum_diff / (float8)(n - 1); + float8 xr_st_dev = calculate_standard_deviation(x_ranks, n, mean_rank); + float8 yr_st_dev = calculate_standard_deviation(y_ranks, n, mean_rank); + float8 spearman_rho_corr = covariance / (xr_st_dev * yr_st_dev); + + // T-statistic + float8 t_stat = calculate_t_statistic(spearman_rho_corr, n); + + // T-distribution + float8 df = n - 2; + boost::math::students_t_distribution t_dist(df); + + float8 one_sided_p_value_pos = 1 - boost::math::cdf(t_dist, t_stat); + float8 one_sided_p_value_neg = 1 - one_sided_p_value_pos; + float8 two_sided_p_value = 2 * one_sided_p_value_pos; + + pfree(x_ranks); + pfree(y_ranks); + pfree(x_rank_m); + pfree(y_rank_m); + pfree(diff); + + ModeType mode = state->mode; + if (mode == ModeType::COEFFICIENT) { + result = spearman_rho_corr; + } else if (mode == ModeType::ONE_SIDED_SIG || mode == ModeType::ONE_SIDED_SIG_POS) { + result = one_sided_p_value_pos; + } else if (mode == ModeType::ONE_SIDED_SIG_NEG) { + result = one_sided_p_value_neg; + } else if (mode == ModeType::TWO_SIDED_SIG) { + result = two_sided_p_value; + } else { // mode == ModeType::ILLEGAL + ereport(ERROR, (errmsg("illegal argument for function"))); + } + + PG_RETURN_DATUM(Float8GetDatum(result)); +} + + +Datum corr_k_final_fn(PG_FUNCTION_ARGS) +{ + float8 result; + + if (PG_ARGISNULL(0)) { + PG_RETURN_NULL(); /* returns null if no input values */ + } + + CorrBuildState *state = (CorrBuildState *) PG_GETARG_POINTER(0); + + if (state->count == 0) { + PG_RETURN_NULL(); + } + + uint32 n = state->count; + Datum* x_data = state->x_data_array; + Datum* y_data = state->y_data_array; + + float8* x_ranks = (float8*)palloc(n * sizeof(float8)); + float8* y_ranks = (float8*)palloc(n * sizeof(float8)); + + calculate_ranks(x_data, x_ranks, n); + calculate_ranks(y_data, y_ranks, n); + + // Kendall's tau-b correlation coefficient + int concordant = 0; + int discordant = 0; + int tiedX = 0; + int tiedY = 0; + + for (uint32 i = 0; i < n - 1; i++) { + for (uint32 j = i + 1; j < n; j++) { + if ((x_ranks[i] > x_ranks[j] && y_ranks[i] > y_ranks[j]) || (x_ranks[i] < x_ranks[j] && y_ranks[i] < y_ranks[j])) { + concordant++; + } else if ((x_ranks[i] > x_ranks[j] && y_ranks[i] < y_ranks[j]) || (x_ranks[i] < x_ranks[j] && y_ranks[i] > y_ranks[j])) { + discordant++; + } + if (x_ranks[i] == x_ranks[j]) tiedX++; + if (y_ranks[i] == y_ranks[j]) tiedY++; + } + } + + uint32 n0 = n * (n - 1) / 2; + float8 tau_b = (float8)(concordant - discordant) / sqrt((n0 - tiedX) * (n0 - tiedY)); + + // z-statistic + float8 z_stat = calculate_z_statistic(tau_b, n); + + // Normal distribution + boost::math::normal_distribution<> normal_dist(0.0, 1.0); + float8 one_sided_p_value_pos = 1.0 - boost::math::cdf(normal_dist, z_stat); + float8 one_sided_p_value_neg = boost::math::cdf(normal_dist, z_stat); + float8 two_sided_p_value = 2 * one_sided_p_value_pos; + + pfree(x_ranks); + pfree(y_ranks); + + ModeType mode = state->mode; + if (mode == ModeType::COEFFICIENT) { + result = tau_b; + } else if (mode == ModeType::ONE_SIDED_SIG || mode == ModeType::ONE_SIDED_SIG_POS) { + result = one_sided_p_value_pos; + } else if (mode == ModeType::ONE_SIDED_SIG_NEG) { + result = one_sided_p_value_neg; + } else if (mode == ModeType::TWO_SIDED_SIG) { + result = two_sided_p_value; + } else { // mode == ModeType::ILLEGAL + ereport(ERROR, (errmsg("illegal argument for function"))); + } + + PG_RETURN_DATUM(Float8GetDatum(result)); +} \ No newline at end of file diff --git a/src/common/backend/utils/init/globals.cpp b/src/common/backend/utils/init/globals.cpp index c45a163b7..47e6b3e4b 100644 --- a/src/common/backend/utils/init/globals.cpp +++ b/src/common/backend/utils/init/globals.cpp @@ -77,7 +77,7 @@ bool will_shutdown = false; * ********************************************/ -const uint32 GRAND_VERSION_NUM = 93019; +const uint32 GRAND_VERSION_NUM = 93020; /******************************************** * 2.VERSION NUM FOR EACH FEATURE diff --git a/src/include/catalog/pg_aggregate.h b/src/include/catalog/pg_aggregate.h index c362b9641..bb12c2c93 100644 --- a/src/include/catalog/pg_aggregate.h +++ b/src/include/catalog/pg_aggregate.h @@ -479,6 +479,11 @@ DATA(insert (4461 ordered_set_transition - mode_final 0 2281 _null_ _null_ o 0)) DATA(insert (5555 median_transfn - median_float8_finalfn 0 2281 _null_ _null_ n 0)); DATA(insert (5556 median_transfn - median_interval_finalfn 0 2281 _null_ _null_ n 0)); +DATA(insert (5561 corr_sk_trans_fn - corr_s_final_fn 0 2281 _null_ _null_ n 0)); +DATA(insert (5562 corr_sk_trans_fn - corr_k_final_fn 0 2281 _null_ _null_ n 0)); +DATA(insert (5566 corr_sk_trans_fn_no3 - corr_s_final_fn 0 2281 _null_ _null_ n 0)); +DATA(insert (5567 corr_sk_trans_fn_no3 - corr_k_final_fn 0 2281 _null_ _null_ n 0)); + /* percentile */ DATA(insert ( 9990 tdigest_merge tdigest_merge_to_one calculate_quantile_of 0 4406 _null_ _null_ n 0)); #define ADDTDIGESTMERGEOID 9990 diff --git a/src/include/catalog/upgrade_sql/rollback_catalog_maindb/rollback-post_catalog_maindb_93_020.sql b/src/include/catalog/upgrade_sql/rollback_catalog_maindb/rollback-post_catalog_maindb_93_020.sql new file mode 100644 index 000000000..6dc6674c0 --- /dev/null +++ b/src/include/catalog/upgrade_sql/rollback_catalog_maindb/rollback-post_catalog_maindb_93_020.sql @@ -0,0 +1,8 @@ +DROP AGGREGATE IF EXISTS pg_catalog.corr_s(float8, float8, text); +DROP AGGREGATE IF EXISTS pg_catalog.corr_s(float8, float8); +DROP AGGREGATE IF EXISTS pg_catalog.corr_k(float8, float8, text); +DROP AGGREGATE IF EXISTS pg_catalog.corr_k(float8, float8); +DROP FUNCTION IF EXISTS pg_catalog.corr_sk_trans_fn(internal, float8, float8, text) CASCADE; +DROP FUNCTION IF EXISTS pg_catalog.corr_sk_trans_fn_no3(internal, float8, float8) CASCADE; +DROP FUNCTION IF EXISTS pg_catalog.corr_s_final_fn(internal) CASCADE; +DROP FUNCTION IF EXISTS pg_catalog.corr_k_final_fn(internal) CASCADE; \ No newline at end of file diff --git a/src/include/catalog/upgrade_sql/rollback_catalog_otherdb/rollback-post_catalog_otherdb_93_020.sql b/src/include/catalog/upgrade_sql/rollback_catalog_otherdb/rollback-post_catalog_otherdb_93_020.sql new file mode 100644 index 000000000..6dc6674c0 --- /dev/null +++ b/src/include/catalog/upgrade_sql/rollback_catalog_otherdb/rollback-post_catalog_otherdb_93_020.sql @@ -0,0 +1,8 @@ +DROP AGGREGATE IF EXISTS pg_catalog.corr_s(float8, float8, text); +DROP AGGREGATE IF EXISTS pg_catalog.corr_s(float8, float8); +DROP AGGREGATE IF EXISTS pg_catalog.corr_k(float8, float8, text); +DROP AGGREGATE IF EXISTS pg_catalog.corr_k(float8, float8); +DROP FUNCTION IF EXISTS pg_catalog.corr_sk_trans_fn(internal, float8, float8, text) CASCADE; +DROP FUNCTION IF EXISTS pg_catalog.corr_sk_trans_fn_no3(internal, float8, float8) CASCADE; +DROP FUNCTION IF EXISTS pg_catalog.corr_s_final_fn(internal) CASCADE; +DROP FUNCTION IF EXISTS pg_catalog.corr_k_final_fn(internal) CASCADE; \ No newline at end of file diff --git a/src/include/catalog/upgrade_sql/upgrade_catalog_maindb/upgrade-post_catalog_maindb_93_020.sql b/src/include/catalog/upgrade_sql/upgrade_catalog_maindb/upgrade-post_catalog_maindb_93_020.sql new file mode 100644 index 000000000..ed5c05ecd --- /dev/null +++ b/src/include/catalog/upgrade_sql/upgrade_catalog_maindb/upgrade-post_catalog_maindb_93_020.sql @@ -0,0 +1,31 @@ +SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 5563; +CREATE OR REPLACE FUNCTION pg_catalog.corr_s_final_fn(internal) +RETURNS float8 LANGUAGE INTERNAL IMMUTABLE NOT SHIPPABLE AS 'corr_s_final_fn'; +COMMENT ON FUNCTION pg_catalog.corr_s_final_fn(internal) IS 'Final function for corr_s to return double precision'; + +SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 5564; +CREATE OR REPLACE FUNCTION pg_catalog.corr_k_final_fn(internal) +RETURNS float8 LANGUAGE INTERNAL IMMUTABLE NOT SHIPPABLE AS 'corr_k_final_fn'; +COMMENT ON FUNCTION pg_catalog.corr_k_final_fn(internal) IS 'Final function for corr_k to return double precision'; + +SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 5565; +CREATE OR REPLACE FUNCTION pg_catalog.corr_sk_trans_fn(internal, float8, float8, text) +RETURNS internal LANGUAGE INTERNAL IMMUTABLE NOT SHIPPABLE AS 'corr_sk_trans_fn'; +COMMENT ON FUNCTION pg_catalog.corr_sk_trans_fn(internal, float8, float8, text) IS 'Transition function for corr_s/corr_k with 3rd arg'; + +SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 5568; +CREATE OR REPLACE FUNCTION pg_catalog.corr_sk_trans_fn_no3(internal, float8, float8) +RETURNS internal LANGUAGE INTERNAL IMMUTABLE NOT SHIPPABLE AS 'corr_sk_trans_fn_no3'; +COMMENT ON FUNCTION pg_catalog.corr_sk_trans_fn_no3(internal, float8, float8) IS 'Transition function for corr_s/corr_k without 3rd arg'; + +SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 5561; +CREATE AGGREGATE pg_catalog.corr_s(float8, float8, text) (SFUNC=corr_sk_trans_fn, STYPE= internal, finalfunc = corr_s_final_fn); + +SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 5566; +CREATE AGGREGATE pg_catalog.corr_s(float8, float8) (SFUNC=corr_sk_trans_fn_no3, STYPE= internal, finalfunc = corr_s_final_fn); + +SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 5562; +CREATE AGGREGATE pg_catalog.corr_k(float8, float8, text) (SFUNC=corr_sk_trans_fn, STYPE= internal, finalfunc = corr_k_final_fn); + +SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 5567; +CREATE AGGREGATE pg_catalog.corr_k(float8, float8) (SFUNC=corr_sk_trans_fn_no3, STYPE= internal, finalfunc = corr_k_final_fn); \ No newline at end of file diff --git a/src/include/catalog/upgrade_sql/upgrade_catalog_otherdb/upgrade-post_catalog_otherdb_93_020.sql b/src/include/catalog/upgrade_sql/upgrade_catalog_otherdb/upgrade-post_catalog_otherdb_93_020.sql new file mode 100644 index 000000000..ed5c05ecd --- /dev/null +++ b/src/include/catalog/upgrade_sql/upgrade_catalog_otherdb/upgrade-post_catalog_otherdb_93_020.sql @@ -0,0 +1,31 @@ +SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 5563; +CREATE OR REPLACE FUNCTION pg_catalog.corr_s_final_fn(internal) +RETURNS float8 LANGUAGE INTERNAL IMMUTABLE NOT SHIPPABLE AS 'corr_s_final_fn'; +COMMENT ON FUNCTION pg_catalog.corr_s_final_fn(internal) IS 'Final function for corr_s to return double precision'; + +SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 5564; +CREATE OR REPLACE FUNCTION pg_catalog.corr_k_final_fn(internal) +RETURNS float8 LANGUAGE INTERNAL IMMUTABLE NOT SHIPPABLE AS 'corr_k_final_fn'; +COMMENT ON FUNCTION pg_catalog.corr_k_final_fn(internal) IS 'Final function for corr_k to return double precision'; + +SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 5565; +CREATE OR REPLACE FUNCTION pg_catalog.corr_sk_trans_fn(internal, float8, float8, text) +RETURNS internal LANGUAGE INTERNAL IMMUTABLE NOT SHIPPABLE AS 'corr_sk_trans_fn'; +COMMENT ON FUNCTION pg_catalog.corr_sk_trans_fn(internal, float8, float8, text) IS 'Transition function for corr_s/corr_k with 3rd arg'; + +SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 5568; +CREATE OR REPLACE FUNCTION pg_catalog.corr_sk_trans_fn_no3(internal, float8, float8) +RETURNS internal LANGUAGE INTERNAL IMMUTABLE NOT SHIPPABLE AS 'corr_sk_trans_fn_no3'; +COMMENT ON FUNCTION pg_catalog.corr_sk_trans_fn_no3(internal, float8, float8) IS 'Transition function for corr_s/corr_k without 3rd arg'; + +SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 5561; +CREATE AGGREGATE pg_catalog.corr_s(float8, float8, text) (SFUNC=corr_sk_trans_fn, STYPE= internal, finalfunc = corr_s_final_fn); + +SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 5566; +CREATE AGGREGATE pg_catalog.corr_s(float8, float8) (SFUNC=corr_sk_trans_fn_no3, STYPE= internal, finalfunc = corr_s_final_fn); + +SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 5562; +CREATE AGGREGATE pg_catalog.corr_k(float8, float8, text) (SFUNC=corr_sk_trans_fn, STYPE= internal, finalfunc = corr_k_final_fn); + +SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 5567; +CREATE AGGREGATE pg_catalog.corr_k(float8, float8) (SFUNC=corr_sk_trans_fn_no3, STYPE= internal, finalfunc = corr_k_final_fn); \ No newline at end of file diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h index c7574f60f..b2150b30d 100644 --- a/src/include/utils/builtins.h +++ b/src/include/utils/builtins.h @@ -1152,6 +1152,11 @@ extern Datum timestamptz_list_agg_noarg2_transfn(PG_FUNCTION_ARGS); extern Datum interval_list_agg_transfn(PG_FUNCTION_ARGS); extern Datum interval_list_agg_noarg2_transfn(PG_FUNCTION_ARGS); +extern Datum corr_sk_trans_fn(PG_FUNCTION_ARGS); +extern Datum corr_sk_trans_fn_no3(PG_FUNCTION_ARGS); +extern Datum corr_s_final_fn(PG_FUNCTION_ARGS); +extern Datum corr_k_final_fn(PG_FUNCTION_ARGS); + extern Datum text_concat(PG_FUNCTION_ARGS); extern Datum text_concat_ws(PG_FUNCTION_ARGS); extern Datum text_left(PG_FUNCTION_ARGS); diff --git a/src/include/utils/corr_sk.h b/src/include/utils/corr_sk.h new file mode 100644 index 000000000..9dd2039f2 --- /dev/null +++ b/src/include/utils/corr_sk.h @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2020 Huawei Technologies Co.,Ltd. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * ------------------------------------------------------------------------- + * corr_sk.h + * Aggregate for computing the statistical correlation + * + * IDENTIFICATION + * src/include/utils/corr_sk.h + * ------------------------------------------------------------------------- + */ + +#ifndef CORR_SK_H +#define CORR_SK_H + +#include "fmgr.h" + +extern Datum corr_sk_trans_fn(PG_FUNCTION_ARGS); +extern Datum corr_sk_trans_fn_no3(PG_FUNCTION_ARGS); +extern Datum corr_s_final_fn(PG_FUNCTION_ARGS); +extern Datum corr_k_final_fn(PG_FUNCTION_ARGS); + +#endif /* CORR_SK_H */ \ No newline at end of file diff --git a/src/test/regress/expected/agg.out b/src/test/regress/expected/agg.out index 5160b9ef2..dd7a6cd8f 100644 --- a/src/test/regress/expected/agg.out +++ b/src/test/regress/expected/agg.out @@ -189,6 +189,120 @@ select sum(a)+sum(b) , d ,1 from test_agg_false where 0=1 group by d; ----------+---+---------- (0 rows) +CREATE TABLE test_table (column_x double precision , column_y double precision); +INSERT INTO test_table (column_x, column_y) VALUES (55, 38); +INSERT INTO test_table (column_x, column_y) VALUES (46, 29); +INSERT INTO test_table (column_x, column_y) VALUES (41, 24); +INSERT INTO test_table (column_x, column_y) VALUES (48, 33); +INSERT INTO test_table (column_x, column_y) VALUES (51, 39); +INSERT INTO test_table (column_x, column_y) VALUES (49, 32); +SELECT corr_s(column_x, column_y) FROM test_table; + corr_s +------------------ + .885714285714286 +(1 row) + +SELECT corr_s(column_x, column_y, 'COEFFICIENT') FROM test_table; + corr_s +------------------ + .885714285714286 +(1 row) + +SELECT corr_s(column_x, column_y, 'ONE_SIDED_SIG') FROM test_table; + corr_s +-------------------- + .00942274052478131 +(1 row) + +SELECT corr_s(column_x, column_y, 'ONE_SIDED_SIG_POS') FROM test_table; + corr_s +-------------------- + .00942274052478131 +(1 row) + +SELECT corr_s(column_x, column_y, 'ONE_SIDED_SIG_NEG') FROM test_table; + corr_s +------------------ + .990577259475219 +(1 row) + +SELECT corr_s(column_x, column_y, 'TWO_SIDED_SIG') FROM test_table; + corr_s +------------------- + .0188454810495626 +(1 row) + +SELECT corr_k(column_x, column_y) FROM test_table; + corr_k +------------------ + .733333333333333 +(1 row) + +SELECT corr_k(column_x, column_y, 'COEFFICIENT') FROM test_table; + corr_k +------------------ + .733333333333333 +(1 row) + +SELECT corr_k(column_x, column_y, 'ONE_SIDED_SIG') FROM test_table; + corr_k +------------------- + .0193887521961533 +(1 row) + +SELECT corr_k(column_x, column_y, 'ONE_SIDED_SIG_POS') FROM test_table; + corr_k +------------------- + .0193887521961533 +(1 row) + +SELECT corr_k(column_x, column_y, 'ONE_SIDED_SIG_NEG') FROM test_table; + corr_k +------------------ + .980611247803847 +(1 row) + +SELECT corr_k(column_x, column_y, 'TWO_SIDED_SIG') FROM test_table; + corr_k +------------------- + .0387775043923066 +(1 row) + +CREATE TABLE null_table1 (column_x double precision, column_y double precision); +INSERT INTO null_table1 (column_x, column_y) VALUES (null, null); +INSERT INTO null_table1 (column_x, column_y) VALUES (null, null); +SELECT corr_s(column_x, column_y, 'COEFFICIENT') FROM null_table1; + corr_s +-------- + +(1 row) + +CREATE TABLE null_table2 (column_x double precision, column_y double precision); +INSERT INTO null_table2(column_x, column_y) VALUES (null, 38); +INSERT INTO null_table2(column_x, column_y) VALUES (null, 29); +SELECT corr_s(column_x, column_y, 'COEFFICIENT') FROM null_table2; + corr_s +-------- + +(1 row) + +CREATE TABLE null_table3 (column_x double precision, column_y double precision); +INSERT INTO null_table3(column_x, column_y) VALUES (55, 38); +INSERT INTO null_table3(column_x, column_y) VALUES (null, 29); +INSERT INTO null_table3(column_x, column_y) VALUES (41, 24); +INSERT INTO null_table3(column_x, column_y) VALUES (48, 33); +INSERT INTO null_table3(column_x, column_y) VALUES (51, 39); +INSERT INTO null_table3(column_x, column_y) VALUES (49, 32); +SELECT corr_s(column_x, column_y, 'COEFFICIENT') FROM null_table3; + corr_s +-------- + .8 +(1 row) + +drop table test_table; +drop table null_table1; +drop table null_table2; +drop table null_table3; drop table t1; drop schema aggregate CASCADE; NOTICE: drop cascades to 3 other objects diff --git a/src/test/regress/sql/agg.sql b/src/test/regress/sql/agg.sql index f0551756f..5f1d3d442 100644 --- a/src/test/regress/sql/agg.sql +++ b/src/test/regress/sql/agg.sql @@ -91,5 +91,50 @@ select sin(sum(a)+sum(b)) , d from test_agg_false where 0=1 group by d; explain (verbose ,costs off) select sum(a)+sum(b) , d , 1 from test_agg_false where 0=1 group by d; select sum(a)+sum(b) , d ,1 from test_agg_false where 0=1 group by d; +CREATE TABLE test_table (column_x double precision , column_y double precision); +INSERT INTO test_table (column_x, column_y) VALUES (55, 38); +INSERT INTO test_table (column_x, column_y) VALUES (46, 29); +INSERT INTO test_table (column_x, column_y) VALUES (41, 24); +INSERT INTO test_table (column_x, column_y) VALUES (48, 33); +INSERT INTO test_table (column_x, column_y) VALUES (51, 39); +INSERT INTO test_table (column_x, column_y) VALUES (49, 32); + +SELECT corr_s(column_x, column_y) FROM test_table; +SELECT corr_s(column_x, column_y, 'COEFFICIENT') FROM test_table; +SELECT corr_s(column_x, column_y, 'ONE_SIDED_SIG') FROM test_table; +SELECT corr_s(column_x, column_y, 'ONE_SIDED_SIG_POS') FROM test_table; +SELECT corr_s(column_x, column_y, 'ONE_SIDED_SIG_NEG') FROM test_table; +SELECT corr_s(column_x, column_y, 'TWO_SIDED_SIG') FROM test_table; + +SELECT corr_k(column_x, column_y) FROM test_table; +SELECT corr_k(column_x, column_y, 'COEFFICIENT') FROM test_table; +SELECT corr_k(column_x, column_y, 'ONE_SIDED_SIG') FROM test_table; +SELECT corr_k(column_x, column_y, 'ONE_SIDED_SIG_POS') FROM test_table; +SELECT corr_k(column_x, column_y, 'ONE_SIDED_SIG_NEG') FROM test_table; +SELECT corr_k(column_x, column_y, 'TWO_SIDED_SIG') FROM test_table; + +CREATE TABLE null_table1 (column_x double precision, column_y double precision); +INSERT INTO null_table1 (column_x, column_y) VALUES (null, null); +INSERT INTO null_table1 (column_x, column_y) VALUES (null, null); +SELECT corr_s(column_x, column_y, 'COEFFICIENT') FROM null_table1; + +CREATE TABLE null_table2 (column_x double precision, column_y double precision); +INSERT INTO null_table2(column_x, column_y) VALUES (null, 38); +INSERT INTO null_table2(column_x, column_y) VALUES (null, 29); +SELECT corr_s(column_x, column_y, 'COEFFICIENT') FROM null_table2; + +CREATE TABLE null_table3 (column_x double precision, column_y double precision); +INSERT INTO null_table3(column_x, column_y) VALUES (55, 38); +INSERT INTO null_table3(column_x, column_y) VALUES (null, 29); +INSERT INTO null_table3(column_x, column_y) VALUES (41, 24); +INSERT INTO null_table3(column_x, column_y) VALUES (48, 33); +INSERT INTO null_table3(column_x, column_y) VALUES (51, 39); +INSERT INTO null_table3(column_x, column_y) VALUES (49, 32); +SELECT corr_s(column_x, column_y, 'COEFFICIENT') FROM null_table3; + +drop table test_table; +drop table null_table1; +drop table null_table2; +drop table null_table3; drop table t1; drop schema aggregate CASCADE;