Files
openGauss-server/contrib/gms_sql/gms_sql.cpp
2024-10-25 04:00:51 -07:00

2296 lines
67 KiB
C++

#include "postgres.h"
#include "fmgr.h"
#include "funcapi.h"
#include "commands/extension.h"
#if PG_VERSION_NUM < 120000
#include "access/heapam.h"
#include "access/printtup.h"
#endif
#include "access/transam.h"
#include "access/tupconvert.h"
#include "lib/stringinfo.h"
#include "parser/scansup.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/datum.h"
#include "utils/elog.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
#include "utils/typcache.h"
#include "executor/spi_priv.h"
#include "libpq/libpq.h"
#include "gms_sql.h"
PG_MODULE_MAGIC;
static char *next_token(char *str, char **start, size_t *len, ofTokenType *typ, char **sep, size_t *seplen);
PG_FUNCTION_INFO_V1(gms_sql_is_open);
PG_FUNCTION_INFO_V1(gms_sql_open_cursor);
PG_FUNCTION_INFO_V1(gms_sql_close_cursor);
PG_FUNCTION_INFO_V1(gms_sql_parse);
PG_FUNCTION_INFO_V1(gms_sql_bind_variable);
PG_FUNCTION_INFO_V1(gms_sql_bind_variable_f);
PG_FUNCTION_INFO_V1(gms_sql_bind_array_3);
PG_FUNCTION_INFO_V1(gms_sql_bind_array_5);
PG_FUNCTION_INFO_V1(gms_sql_define_column);
PG_FUNCTION_INFO_V1(gms_sql_define_array);
PG_FUNCTION_INFO_V1(gms_sql_execute);
PG_FUNCTION_INFO_V1(gms_sql_fetch_rows);
PG_FUNCTION_INFO_V1(gms_sql_execute_and_fetch);
PG_FUNCTION_INFO_V1(gms_sql_column_value);
PG_FUNCTION_INFO_V1(gms_sql_column_value_f);
PG_FUNCTION_INFO_V1(gms_sql_last_row_count);
PG_FUNCTION_INFO_V1(gms_sql_describe_columns);
PG_FUNCTION_INFO_V1(gms_sql_describe_columns_f);
PG_FUNCTION_INFO_V1(gms_sql_debug_cursor);
PG_FUNCTION_INFO_V1(gms_sql_return_result);
PG_FUNCTION_INFO_V1(gms_sql_return_result_i);
static uint32 gms_sql_index;
static THR_LOCAL uint64 last_row_count = 0;
static THR_LOCAL TransactionId last_lxid = InvalidTransactionId;
static THR_LOCAL int result_no = 0;
/*
 * Record the slot assigned to this extension in the per-session
 * extension variable array.  Every later session-context lookup in
 * this file indexes the array with the value stored here.
 */
void set_extension_index(uint32 index)
{
    gms_sql_index = index;
}
void init_session_vars(void)
{
RepallocSessionVarsArrayIfNecessary();
GmssqlContext* psc =
(GmssqlContext*)MemoryContextAllocZero(u_sess->self_mem_cxt, sizeof(GmssqlContext));
u_sess->attr.attr_common.extension_session_vars_array[gms_sql_index] = psc;
psc->gms_sql_cxt = NULL;
psc->gms_sql_cursors = NULL;
}
/*
 * Return this session's GmssqlContext, creating it on first use.
 */
GmssqlContext* get_session_context()
{
    bool missing = (u_sess->attr.attr_common.extension_session_vars_array[gms_sql_index] == NULL);

    if (missing)
        init_session_vars();

    /* read the slot again: init_session_vars() may have reallocated the array */
    return (GmssqlContext*)u_sess->attr.attr_common.extension_session_vars_array[gms_sql_index];
}
static void
create_cursors()
{
MemoryContext persist_cxt = get_session_context()->gms_sql_cxt;
if (!persist_cxt) {
persist_cxt = AllocSetContextCreate(u_sess->top_mem_cxt,
"gms_sql persist context",
ALLOCSET_DEFAULT_SIZES);
get_session_context()->gms_sql_cxt = persist_cxt;
get_session_context()->gms_sql_cursors = (CursorData*)MemoryContextAllocZero(persist_cxt, u_sess->attr.attr_common.maxOpenCursorCount * sizeof(CursorData));
}
}
/*
 * Mark a cursor slot as in use: remember its id and give it a private
 * memory context that is a child of the session's persist context.
 */
static void
open_cursor(CursorData *cursor, int cid)
{
    MemoryContext parent_cxt = get_session_context()->gms_sql_cxt;

    cursor->cid = cid;
    cursor->cursor_cxt = AllocSetContextCreate(parent_cxt,
                                               "gms_sql cursor context",
                                               ALLOCSET_DEFAULT_SIZES);
    cursor->assigned = true;
}
/*
 * FUNCTION gms_sql.open_cursor() RETURNS int
 *
 * Claims the first unassigned cursor slot and returns its index as the
 * cursor id.  Raises ERRCODE_PROGRAM_LIMIT_EXCEEDED when every slot is
 * already assigned.
 */
Datum
gms_sql_open_cursor(PG_FUNCTION_ARGS)
{
    CursorData *slots;
    int slot = 0;

    (void) fcinfo;

    /* the slot array is created on first use */
    if (get_session_context()->gms_sql_cursors == NULL)
        create_cursors();
    slots = get_session_context()->gms_sql_cursors;

    /* find and initialize first free slot */
    while (slot < u_sess->attr.attr_common.maxOpenCursorCount) {
        if (!slots[slot].assigned) {
            open_cursor(&slots[slot], slot);
            PG_RETURN_INT32(slot);
        }
        slot++;
    }

    ereport(ERROR,
            (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
             errmsg("too many opened cursors"),
             errdetail("There is not free slot for new gms_sql's cursor."),
             errhint("You should to close unused cursors")));

    /* not reached; keeps compilers quiet */
    PG_RETURN_VOID();
}
/*
 * Resolve the cursor id passed as SQL argument 0 into its slot.
 *
 * Errors are raised, in this order, when the id is NULL, when it is
 * outside [0, maxOpenCursorCount), when the slot array has not been
 * created yet, and - only if should_be_assigned is true - when the slot
 * is not currently assigned.
 */
static CursorData *
get_cursor(FunctionCallInfo fcinfo, bool should_be_assigned)
{
    CursorData *slots;
    CursorData *found;
    int cid;

    if (PG_ARGISNULL(0))
        ereport(ERROR,
                (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
                 errmsg("cursor id is NULL")));

    cid = PG_GETARG_INT32(0);
    if (cid < 0 || cid >= u_sess->attr.attr_common.maxOpenCursorCount)
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("cursor %d value of cursor id is out of range", cid)));

    slots = get_session_context()->gms_sql_cursors;
    if (slots == NULL)
        ereport(ERROR,
                (errcode(ERRCODE_UNDEFINED_CURSOR),
                 errmsg("cursor is not open")));

    found = slots + cid;
    if (should_be_assigned && !found->assigned)
        ereport(ERROR,
                (errcode(ERRCODE_UNDEFINED_CURSOR),
                 errmsg("cursor is not valid")));

    return found;
}
/*
 * CREATE FUNCTION gms_sql.is_open(c int) RETURNS bool;
 *
 * Reports whether the slot behind the given cursor id is assigned.  The
 * id itself is still validated by get_cursor().
 */
Datum
gms_sql_is_open(PG_FUNCTION_ARGS)
{
    PG_RETURN_BOOL(get_cursor(fcinfo, false)->assigned);
}
/*
 * Release everything a cursor holds and return its slot to the
 * all-zero (unassigned) state.
 */
static void
close_cursor(CursorData *cursor)
{
    /* a portal only exists once the cursor has been executed */
    if (cursor->executed && cursor->portal)
        SPI_cursor_close(cursor->portal);

    /* release all assigned memory */
    if (cursor->cursor_cxt)
        MemoryContextDelete(cursor->cursor_cxt);

    if (cursor->cursor_xact_cxt)
        MemoryContextDelete(cursor->cursor_xact_cxt);

    if (cursor->plan)
        SPI_freeplan(cursor->plan);

    /* wipe the slot so that it reads as unassigned */
    memset_s(cursor, sizeof(CursorData), 0, sizeof(CursorData));
}
/*
 * PROCEDURE gms_sql.close_cursor(c int)
 *
 * Closes the cursor behind the given id.  The slot does not have to be
 * assigned; closing an unassigned slot just re-zeroes it.
 */
Datum
gms_sql_close_cursor(PG_FUNCTION_ARGS)
{
    close_cursor(get_cursor(fcinfo, false));
    PG_RETURN_VOID();
}
/*
 * Print state of cursor - just for debug purposes
 *
 * Emits NOTICE messages with the original and rewritten query text, the
 * state of every bind variable and the type of every defined column.
 */
Datum
gms_sql_debug_cursor(PG_FUNCTION_ARGS)
{
    CursorData *cursor;
    ListCell *lc;

    cursor = get_cursor(fcinfo, false);

    if (cursor->assigned) {
        if (cursor->original_query)
            elog(NOTICE, "orig query: \"%s\"", cursor->original_query);
        if (cursor->parsed_query)
            elog(NOTICE, "parsed query: \"%s\"", cursor->parsed_query);
    } else {
        elog(NOTICE, "cursor is not assigned");
    }

    foreach(lc, cursor->variables) {
        VariableData *var = (VariableData *) lfirst(lc);
        Oid typOutput;
        bool isVarlena;
        char *str;

        if (var->typoid == InvalidOid) {
            elog(NOTICE, "variable \"%s\" is not assigned", var->refname);
            continue;
        }

        /*
         * A variable bound to NULL has a type but no usable value
         * (var->value may be stale or already freed), so it must not be
         * handed to the type's output function.
         */
        if (var->isnull) {
            elog(NOTICE, "variable \"%s\" is assigned to NULL", var->refname);
            continue;
        }

        getTypeOutputInfo(var->typoid, &typOutput, &isVarlena);
        str = OidOutputFunctionCall(typOutput, var->value);
        elog(NOTICE, "variable \"%s\" is assigned to \"%s\"", var->refname, str);
    }

    foreach(lc, cursor->columns) {
        ColumnData *col = (ColumnData *) lfirst(lc);

        elog(NOTICE, "column definition for position %d is %s",
             col->position,
             format_type_with_typemod(col->typoid, col->typmod));
    }

    PG_RETURN_VOID();
}
/*
 * Look up a bind variable by (already down-cased) name in the cursor's
 * variable list.
 *
 * When the name is unknown and append is true, a new zero-initialised
 * entry is created in the cursor's memory context, given the next
 * parameter number and the supplied position, and returned.  When append
 * is false an ERRCODE_UNDEFINED_PARAMETER error is raised instead.
 */
static VariableData *
get_var(CursorData *cursor, char *refname, int position, bool append)
{
    ListCell *cell;
    VariableData *fresh;
    MemoryContext saved_cxt;

    foreach(cell, cursor->variables) {
        VariableData *known = (VariableData *) lfirst(cell);

        if (strcmp(known->refname, refname) == 0)
            return known;
    }

    if (!append) {
        ereport(ERROR,
                (errcode(ERRCODE_UNDEFINED_PARAMETER),
                 errmsg("variable \"%s\" doesn't exists", refname)));
        /* not reached; keeps compilers quiet */
        return NULL;
    }

    /* the entry must live as long as the cursor does */
    saved_cxt = MemoryContextSwitchTo(cursor->cursor_cxt);
    fresh = (VariableData*)palloc0(sizeof(VariableData));
    fresh->refname = pstrdup(refname);
    fresh->varno = cursor->nvariables + 1;
    fresh->position = position;
    cursor->variables = lappend(cursor->variables, fresh);
    cursor->nvariables += 1;
    MemoryContextSwitchTo(saved_cxt);

    return fresh;
}
/*
 * PROCEDURE gms_sql.parse(c int, stmt varchar)
 *
 * Tokenizes the statement with next_token() and rebuilds it with every
 * bind variable replaced by a positional parameter ($n).  Each distinct
 * variable name is registered on the cursor.  Both the original and the
 * rewritten text are stored in the cursor's memory context.  A cursor
 * that already holds a parsed query is closed and reopened under the
 * same id first.
 */
Datum
gms_sql_parse(PG_FUNCTION_ARGS)
{
    CursorData *cursor;
    StringInfoData rewritten;
    MemoryContext saved_cxt;
    char *query;
    char *scan;

    cursor = get_cursor(fcinfo, true);

    if (PG_ARGISNULL(1))
        ereport(ERROR,
                (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
                 errmsg("parsed query string is NULL")));

    /* re-parsing starts from a clean slot that keeps its cursor id */
    if (cursor->parsed_query) {
        int cid = cursor->cid;

        close_cursor(cursor);
        open_cursor(cursor, cid);
    }

    query = text_to_cstring(PG_GETARG_TEXT_P(1));
    initStringInfo(&rewritten);

    for (scan = query; scan != NULL;) {
        char *tok;
        size_t toklen;
        ofTokenType toktype;
        char *sep;
        size_t seplen;
        char *rest;

        rest = next_token(scan, &tok, &toklen, &toktype, &sep, &seplen);
        if (rest) {
            switch (toktype) {
                case TOKEN_DOLAR_STR:
                    /* separator, body, separator */
                    appendStringInfo(&rewritten, "%.*s", (int) seplen, sep);
                    appendStringInfo(&rewritten, "%.*s", (int) toklen, tok);
                    appendStringInfo(&rewritten, "%.*s", (int) seplen, sep);
                    break;
                case TOKEN_BIND_VAR: {
                    char *name = downcase_truncate_identifier(tok, (int) toklen, false);
                    VariableData *var = get_var(cursor, name, (int) (scan - query), true);

                    appendStringInfo(&rewritten, "$%d", var->varno);
                    pfree(name);
                    break;
                }
                case TOKEN_EXT_STR:
                    appendStringInfo(&rewritten, "e\'%.*s\'", (int) toklen, tok);
                    break;
                case TOKEN_STR:
                    appendStringInfo(&rewritten, "\'%.*s\'", (int) toklen, tok);
                    break;
                case TOKEN_QIDENTIF:
                    appendStringInfo(&rewritten, "\"%.*s\"", (int) toklen, tok);
                    break;
                case TOKEN_NONE:
                    break;
                default:
                    /* every other token is copied through verbatim */
                    appendStringInfo(&rewritten, "%.*s", (int) toklen, tok);
                    break;
            }
        }
        scan = rest;
    }

    /* save result to persist context */
    saved_cxt = MemoryContextSwitchTo(cursor->cursor_cxt);
    cursor->original_query = pstrdup(query);
    cursor->parsed_query = pstrdup(rewritten.data);
    MemoryContextSwitchTo(saved_cxt);

    pfree(query);
    pfree(rewritten.data);

    PG_RETURN_VOID();
}
/*
 * Shared implementation of gms_sql.bind_variable() and
 * gms_sql.bind_variable_f().  (Calling a procedure can be slow, so a
 * function alternative exists.)
 *
 * Arguments: 0 = cursor id, 1 = variable name (a leading ':' is
 * ignored), 2 = value of any type.  The value is copied into the
 * cursor's memory context; an SQL NULL is recorded as a typed NULL.
 * Values of unknown type are stored as text.  Binding a record value or
 * an unknown variable name raises an error.
 */
static Datum
bind_variable(PG_FUNCTION_ARGS)
{
    CursorData *cursor;
    VariableData *var;
    char *varname, *varname_downcase;
    Oid valtype;
    bool is_unknown = false;

    cursor = get_cursor(fcinfo, true);

    if (PG_ARGISNULL(1))
        ereport(ERROR,
                (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
                 errmsg("name of bind variable is NULL")));

    varname = text_to_cstring(PG_GETARG_TEXT_P(1));
    if (*varname == ':')
        varname += 1;
    varname_downcase = downcase_truncate_identifier(varname, (int) strlen(varname), false);
    var = get_var(cursor, varname_downcase, -1, false);

    valtype = get_fn_expr_argtype(fcinfo->flinfo, 2);
    if (valtype == RECORDOID)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("cannot to bind a value of record type")));

    valtype = getBaseType(valtype);
    if (valtype == UNKNOWNOID) {
        is_unknown = true;
        valtype = TEXTOID;
    }

    /*
     * Drop the previously bound value.  Only a non-NULL pass-by-reference
     * value owns memory; a variable that was last bound to NULL has no
     * live pointer, and freeing var->value in that case would be a free
     * of NULL or a double free.
     */
    if (var->typoid != InvalidOid && !var->isnull && !var->typbyval)
        pfree(DatumGetPointer(var->value));
    var->value = (Datum) 0;
    var->isnull = true;

    /* keep the type description in step with typoid, for NULLs as well */
    var->typoid = valtype;
    get_typlenbyval(var->typoid, &var->typlen, &var->typbyval);

    if (!PG_ARGISNULL(2)) {
        MemoryContext oldcxt;

        /* the copy has to survive until the cursor is closed or rebound */
        oldcxt = MemoryContextSwitchTo(cursor->cursor_cxt);
        if (is_unknown)
            var->value = CStringGetTextDatum(DatumGetPointer(PG_GETARG_DATUM(2)));
        else
            var->value = datumCopy(PG_GETARG_DATUM(2), var->typbyval, var->typlen);
        var->isnull = false;
        MemoryContextSwitchTo(oldcxt);
    }

    PG_RETURN_VOID();
}
/*
 * CREATE PROCEDURE gms_sql.bind_variable(c int, name varchar2, value "any");
 *
 * Procedure entry point; all work is done by bind_variable().
 */
Datum
gms_sql_bind_variable(PG_FUNCTION_ARGS)
{
    Datum result = bind_variable(fcinfo);

    return result;
}
/*
 * CREATE FUNCTION gms_sql.bind_variable_f(c int, name varchar2, value "any") RETURNS void;
 *
 * Function entry point; all work is done by bind_variable().
 */
Datum
gms_sql_bind_variable_f(PG_FUNCTION_ARGS)
{
    Datum result = bind_variable(fcinfo);

    return result;
}
/*
 * Shared implementation of both gms_sql.bind_array() variants.
 *
 * Arguments: 0 = cursor id, 1 = variable name (a leading ':' is
 * ignored), 2 = array value.  index1/index2 are stored on the variable
 * as given; the three-argument SQL variant passes -1 for both.
 *
 * The array is copied into the cursor's memory context.  Binding a
 * record value, a non-array value or an unknown variable name raises an
 * error.
 */
static void
bind_array(FunctionCallInfo fcinfo, int index1, int index2)
{
    CursorData *cursor;
    VariableData *var;
    char *varname, *varname_downcase;
    Oid valtype;
    Oid elementtype;

    cursor = get_cursor(fcinfo, true);

    if (PG_ARGISNULL(1))
        ereport(ERROR,
                (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
                 errmsg("name of bind variable is NULL")));

    varname = text_to_cstring(PG_GETARG_TEXT_P(1));
    if (*varname == ':')
        varname += 1;
    varname_downcase = downcase_truncate_identifier(varname, (int) strlen(varname), false);
    var = get_var(cursor, varname_downcase, -1, false);

    valtype = get_fn_expr_argtype(fcinfo->flinfo, 2);
    if (valtype == RECORDOID)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("cannot to bind a value of record type")));

    valtype = getBaseType(valtype);
    elementtype = get_element_type(valtype);
    if (!OidIsValid(elementtype))
        ereport(ERROR,
                (errcode(ERRCODE_DATATYPE_MISMATCH),
                 errmsg("value is not a array")));

    var->is_array = true;
    var->typoid = valtype;
    var->typelemid = elementtype;
    get_typlenbyval(elementtype, &var->typelemlen, &var->typelembyval);

    if (!PG_ARGISNULL(2)) {
        MemoryContext oldcxt;

        get_typlenbyval(var->typoid, &var->typlen, &var->typbyval);

        /* the copy has to survive until the cursor is closed */
        oldcxt = MemoryContextSwitchTo(cursor->cursor_cxt);
        var->value = datumCopy(PG_GETARG_DATUM(2), var->typbyval, var->typlen);
        var->isnull = false;
        MemoryContextSwitchTo(oldcxt);
    } else {
        var->isnull = true;
    }

    var->index1 = index1;
    var->index2 = index2;
}
/*
 * CREATE PROCEDURE gms_sql.bind_array(c int, name varchar2, value anyarray);
 *
 * Binds the whole array: both range indexes are passed as -1.
 */
Datum
gms_sql_bind_array_3(PG_FUNCTION_ARGS)
{
    const int no_index = -1;

    bind_array(fcinfo, no_index, no_index);
    PG_RETURN_VOID();
}
/*
 * CREATE PROCEDURE gms_sql.bind_array(c int, name varchar2, value anyarray, index1 int, index2 int);
 *
 * Binds an array together with an index range.  Both indexes must be
 * non-NULL and non-negative, and index1 must not exceed index2.
 */
Datum
gms_sql_bind_array_5(PG_FUNCTION_ARGS)
{
    int lower;
    int upper;

    if (PG_ARGISNULL(3) || PG_ARGISNULL(4))
        ereport(ERROR,
                (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
                 errmsg("index is NULL")));

    lower = PG_GETARG_INT32(3);
    upper = PG_GETARG_INT32(4);

    if (lower < 0 || upper < 0)
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("index is below zero")));

    if (lower > upper)
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("index1 is greater than index2")));

    bind_array(fcinfo, lower, upper);
    PG_RETURN_VOID();
}
/*
 * Look up a column definition by position in the cursor's column list.
 *
 * When the position is unknown and append is true, a new
 * zero-initialised entry is created in the cursor's memory context (and
 * max_colpos is raised if needed) and returned.  When append is false an
 * ERRCODE_UNDEFINED_COLUMN error is raised instead.
 */
static ColumnData *
get_col(CursorData *cursor, int position, bool append)
{
    ListCell *cell;
    ColumnData *fresh;
    MemoryContext saved_cxt;

    foreach(cell, cursor->columns) {
        ColumnData *known = (ColumnData *) lfirst(cell);

        if (known->position == position)
            return known;
    }

    if (!append) {
        ereport(ERROR,
                (errcode(ERRCODE_UNDEFINED_COLUMN),
                 errmsg("column no %d is not defined", position)));
        /* not reached; keeps compilers quiet */
        return NULL;
    }

    /* the entry must live as long as the cursor does */
    saved_cxt = MemoryContextSwitchTo(cursor->cursor_cxt);
    fresh = (ColumnData*)palloc0(sizeof(ColumnData));
    fresh->position = position;
    if (cursor->max_colpos < position)
        cursor->max_colpos = position;
    cursor->columns = lappend(cursor->columns, fresh);
    MemoryContextSwitchTo(saved_cxt);

    return fresh;
}
/*
 * CREATE PROCEDURE gms_sql.define_column(c int, col int, value "any", column_size int DEFAULT -1);
 *
 * Declares the type of the result column at the given position, taken
 * from the type of the "value" argument (unknown is treated as text).
 * For string-category types a column_size other than -1 becomes the
 * type modifier (column_size + 4).
 *
 * All arguments are validated and all catalog lookups are done before
 * the column entry is touched, so a failing call never leaves a
 * half-defined column behind.
 */
Datum
gms_sql_define_column(PG_FUNCTION_ARGS)
{
    CursorData *cursor;
    ColumnData *col;
    Oid valtype;
    Oid basetype;
    int position;
    int colsize;
    int16 typlen;
    bool typbyval;
    TYPCATEGORY category;
    bool ispreferred;

    cursor = get_cursor(fcinfo, true);

    if (PG_ARGISNULL(1))
        ereport(ERROR,
                (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
                 errmsg("column position (number) is NULL")));
    position = PG_GETARG_INT32(1);

    valtype = get_fn_expr_argtype(fcinfo->flinfo, 2);
    if (valtype == RECORDOID)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("cannot to define a column of record type")));
    if (valtype == UNKNOWNOID)
        valtype = TEXTOID;

    if (PG_ARGISNULL(3))
        ereport(ERROR,
                (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
                 errmsg("column_size is NULL")));
    colsize = PG_GETARG_INT32(3);

    /* look everything up first; nothing below this point can fail on bad input */
    basetype = getBaseType(valtype);
    get_type_category_preferred(basetype, &category, &ispreferred);
    get_typlenbyval(basetype, &typlen, &typbyval);

    col = get_col(cursor, position, true);
    if (col->typoid != InvalidOid)
        ereport(ERROR,
                (errcode(ERRCODE_DUPLICATE_COLUMN),
                 errmsg("column is defined already")));

    col->typoid = valtype;
    col->typisstr = category == TYPCATEGORY_STRING;
    col->typmod = (col->typisstr && colsize != -1) ? colsize + 4 : -1;
    col->typlen = typlen;
    col->typbyval = typbyval;
    col->rowcount = 1;

    PG_RETURN_VOID();
}
/*
 * CREATE PROCEDURE gms_sql.define_array(c int, col int, value "anyarray", rowcount int, index1 int);
 *
 * Declares the result column at the given position as a bulk (array)
 * column: the column type is the element type of the "value" argument,
 * rowcount is the number of rows gathered per fetch, and index1 is the
 * lower bound, which currently has to be 1.
 *
 * All arguments are validated before the column entry is touched.  In
 * particular a rejected redefinition ("column is defined already") no
 * longer overwrites typarrayoid of the existing column.
 */
Datum
gms_sql_define_array(PG_FUNCTION_ARGS)
{
    CursorData *cursor;
    ColumnData *col;
    Oid valtype;
    Oid basetype;
    int position;
    int rowcount;
    int index1;
    Oid elementtype;
    int16 typlen;
    bool typbyval;
    TYPCATEGORY category;
    bool ispreferred;

    cursor = get_cursor(fcinfo, true);

    if (PG_ARGISNULL(1))
        ereport(ERROR,
                (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
                 errmsg("column position (number) is NULL")));
    position = PG_GETARG_INT32(1);

    valtype = get_fn_expr_argtype(fcinfo->flinfo, 2);
    if (valtype == RECORDOID)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("cannot to define a column of record type")));

    get_type_category_preferred(valtype, &category, &ispreferred);
    if (category != TYPCATEGORY_ARRAY)
        elog(ERROR, "defined value is not array");

    basetype = getBaseType(valtype);
    elementtype = get_element_type(basetype);
    if (!OidIsValid(elementtype))
        ereport(ERROR,
                (errcode(ERRCODE_DATATYPE_MISMATCH),
                 errmsg("column is not a array")));

    if (PG_ARGISNULL(3))
        ereport(ERROR,
                (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
                 errmsg("cnt is NULL")));
    rowcount = PG_GETARG_INT32(3);
    if (rowcount <= 0)
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("cnt is less or equal to zero")));

    if (PG_ARGISNULL(4))
        ereport(ERROR,
                (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
                 errmsg("lower_bnd is NULL")));
    index1 = PG_GETARG_INT32(4);
    if (index1 < 1)
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("lower_bnd is less than one")));
    if (index1 != 1)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("lower_bnd can be only only \"1\"")));

    get_typlenbyval(elementtype, &typlen, &typbyval);

    /* every input is valid: now create or look up the entry and fill it in */
    col = get_col(cursor, position, true);
    if (col->typoid != InvalidOid)
        ereport(ERROR,
                (errcode(ERRCODE_DUPLICATE_COLUMN),
                 errmsg("column is defined already")));

    col->typarrayoid = valtype;
    col->typoid = elementtype;
    col->rowcount = (uint64) rowcount;
    col->index1 = index1;
    col->typlen = typlen;
    col->typbyval = typbyval;

    PG_RETURN_VOID();
}
/*
 * Memory context callback registered on a cursor's transaction-lifetime
 * context.  It forgets every cursor field that pointed into that context
 * and puts the cursor back into the "not executed" state.
 */
static void
cursor_xact_cxt_deletion_callback(std::shared_ptr<void> arg)
{
    CursorData *cursor = static_cast<CursorData *>(arg.get());

    /* execution state */
    cursor->executed = false;
    cursor->processed = 0;
    cursor->nread = 0;

    /* pointers into the transaction context */
    cursor->cursor_xact_cxt = NULL;
    cursor->tuples_cxt = NULL;
    cursor->tupdesc = NULL;
    cursor->coltupdesc = NULL;
    cursor->casts = NULL;
    cursor->array_columns = NULL;
}
/*
 * Open an SPI cursor (portal) for a query that has defined columns.
 *
 * Builds the parameter arrays from the bound variables, builds the
 * tuple descriptor of the defined columns (coltupdesc), works out how
 * many rows a single fetch may hand out (batch_rows), allocates the
 * cast cache and opens the portal.  Everything that has to outlive this
 * call is allocated in the cursor's transaction context.
 *
 * Raises an error for array (bulk) variables, for variables that were
 * never bound, for gaps in the defined column positions (via get_col)
 * and when the number of defined columns differs from the number of
 * live columns the query returns.
 */
static void prepare_and_execute_cursor(CursorData *cursor)
{
    Datum *values;
    Oid *types;
    char *nulls;
    ListCell *lc;
    int i;
    MemoryContext oldcxt;
    uint64 batch_rows = 0;

    oldcxt = MemoryContextSwitchTo(cursor->cursor_xact_cxt);

    /* prepare query arguments */
    values = (Datum*)palloc(sizeof(Datum) * cursor->nvariables);
    types = (Oid*)palloc(sizeof(Oid) * cursor->nvariables);
    nulls = (char*)palloc(sizeof(char) * cursor->nvariables);

    i = 0;
    foreach(lc, cursor->variables) {
        VariableData *var = (VariableData *) lfirst(lc);

        if (var->is_array)
            ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("a array (bulk) variable can be used only when no column is defined")));

        /* reject unbound variables before looking at their value */
        if (var->typoid == InvalidOid)
            ereport(ERROR,
                    (errcode(ERRCODE_UNDEFINED_PARAMETER),
                     errmsg("variable \"%s\" has not a value", var->refname)));

        if (!var->isnull) {
            /* copy a value to xact memory context, to be independent on a outside */
            values[i] = datumCopy(var->value, var->typbyval, var->typlen);
            nulls[i] = ' ';
        } else {
            values[i] = (Datum) 0;
            nulls[i] = 'n';
        }

        types[i] = var->typoid;
        i += 1;
    }

    /* prepare or refresh target tuple descriptor, used for final tupconversion */
    if (cursor->tupdesc)
        FreeTupleDesc(cursor->tupdesc);

#if PG_VERSION_NUM >= 120000
    cursor->coltupdesc = CreateTemplateTupleDesc(cursor->max_colpos);
#else
    cursor->coltupdesc = CreateTemplateTupleDesc(cursor->max_colpos, false);
#endif

    /* prepare current result column tupdesc */
    for (i = 1; i <= cursor->max_colpos; i++) {
        ColumnData *col = get_col(cursor, i, false);
        char genname[32];

        snprintf_s(genname, 32, 31, "col%d", i);
        Assert(col->rowcount > 0);

        if (col->typarrayoid) {
            /* bulk column: the smallest requested row count wins */
            if (batch_rows != 0)
                batch_rows = col->rowcount < batch_rows ? col->rowcount : batch_rows;
            else
                batch_rows = col->rowcount;
            cursor->array_columns = bms_add_member(cursor->array_columns, i);
        } else {
            /* in this case we cannot do batch of rows */
            batch_rows = 1;
        }

        TupleDescInitEntry(cursor->coltupdesc, (AttrNumber) i, genname, col->typoid, col->typmod, 0);
    }
    cursor->batch_rows = batch_rows;

    Assert(cursor->coltupdesc->natts >= 0);
    cursor->casts = (CastCacheData*)palloc0(sizeof(CastCacheData) * ((unsigned int) cursor->coltupdesc->natts));

    MemoryContextSwitchTo(oldcxt);

    /* the portal name is derived from the cursor id */
    snprintf_s(cursor->cursorname, sizeof(cursor->cursorname), sizeof(cursor->cursorname) - 1, "__orafce_gms_sql_cursor_%d", cursor->cid);

    if (SPI_connect() != SPI_OK_CONNECT)
        elog(ERROR, "SPI_connact failed");

    cursor->portal = SPI_cursor_open_with_args(cursor->cursorname,
                                               cursor->parsed_query,
                                               (int) cursor->nvariables,
                                               types,
                                               values,
                                               nulls,
                                               false,
                                               0);

    /* internal error */
    if (cursor->portal == NULL)
        elog(ERROR,
             "could not open cursor for query \"%s\": %s",
             cursor->parsed_query,
             SPI_result_code_string(SPI_result));

    SPI_finish();

    /* Describe portal and prepare cast cache */
    if (cursor->portal->tupDesc) {
        int natts = 0;
        TupleDesc tupdesc = cursor->portal->tupDesc;

        /* dropped attributes do not count as result columns */
        for (i = 0; i < tupdesc->natts; i++) {
            Form_pg_attribute att = TupleDescAttr(tupdesc, i);

            if (att->attisdropped)
                continue;
            natts += 1;
        }

        if (natts != cursor->coltupdesc->natts)
            ereport(ERROR,
                    (errcode(ERRCODE_DATA_EXCEPTION),
                     errmsg("number of defined columns is different than number of query's columns")));
    }

    cursor->executed = true;
}
/*
 * Execute a statement that has no defined columns (typically DML)
 * through a cached SPI plan and return the number of processed rows.
 *
 * The plan is prepared on first use and kept on the cursor.  When at
 * least one variable is bound as an array the statement runs once per
 * array element (bulk mode): all arrays are walked in lock step, the
 * index range is narrowed to the intersection of the ranges given to
 * bind_array(), and the run stops when the shortest array is exhausted.
 * A NULL array yields zero executions.  Without array variables the
 * statement runs exactly once.
 */
static uint64 execute_spi_plan(CursorData *cursor)
{
    MemoryContext oldcxt;
    Datum *values;
    char *nulls;
    ArrayIterator *iterators;
    bool has_iterator = false;
    bool has_value = true;
    int max_index1 = -1;
    int min_index2 = -1;
    int max_rows = -1;
    uint64 result = 0;
    ListCell *lc;
    int i;

    if (SPI_connect() != SPI_OK_CONNECT)
        elog(ERROR, "SPI_connact failed");

    /* prepare, or reuse cached plan */
    if (!cursor->plan) {
        Oid *types = NULL;
        SPIPlanPtr plan;

        types = (Oid*)palloc(sizeof(Oid) * cursor->nvariables);
        i = 0;
        foreach(lc, cursor->variables) {
            VariableData *var = (VariableData *) lfirst(lc);

            if (var->typoid == InvalidOid)
                ereport(ERROR,
                        (errcode(ERRCODE_UNDEFINED_PARAMETER),
                         errmsg("variable \"%s\" has not a value", var->refname)));

            /* a bulk variable is passed to the plan one element at a time */
            types[i++] = var->is_array ? var->typelemid : var->typoid;
        }

        plan = SPI_prepare(cursor->parsed_query, (int) cursor->nvariables, types);
        if (!plan)
            /* internal error */
            elog(ERROR, "cannot to prepare plan");

        if (types)
            pfree(types);

        SPI_keepplan(plan);
        cursor->plan = plan;
    }

    oldcxt = MemoryContextSwitchTo(cursor->result_cxt);

    /* prepare query arguments */
    values = (Datum*)palloc(sizeof(Datum) * cursor->nvariables);
    nulls = (char*)palloc(sizeof(char) * cursor->nvariables);
    iterators = (ArrayIterator*)palloc0(sizeof(ArrayIterator) * cursor->nvariables);

    i = 0;
    foreach(lc, cursor->variables) {
        VariableData *var = (VariableData *) lfirst(lc);

        if (var->is_array) {
            /*
             * Any array variable selects bulk mode, a NULL one included.
             * Otherwise a NULL array seen before any other array would
             * fall through to the single-shot path below with
             * uninitialised values[] / nulls[].
             */
            has_iterator = true;

            if (var->isnull) {
                /* cannot to read data from NULL array */
                has_value = false;
                break;
            }

            iterators[i] = array_create_iterator(DatumGetArrayTypeP(var->value), 0);

            /* search do lowest common denominator */
            if (var->index1 != -1) {
                if (max_index1 != -1) {
                    max_index1 = max_index1 < var->index1 ? var->index1 : max_index1;
                    min_index2 = min_index2 > var->index2 ? var->index2 : min_index2;
                } else {
                    max_index1 = var->index1;
                    min_index2 = var->index2;
                }
            }
        } else {
            values[i] = var->value;
            nulls[i] = var->isnull ? 'n' : ' ';
        }
        i += 1;
    }

    if (has_iterator) {
        if (has_value && max_index1 != -1) {
            max_rows = min_index2 - max_index1 + 1;
            has_value = max_rows > 0;

            /* skip the elements in front of the common lower index */
            if (has_value && max_index1 > 1) {
                i = 0;
                foreach(lc, cursor->variables) {
                    VariableData *var = (VariableData *) lfirst(lc);

                    if (var->is_array) {
                        int j;

                        Assert(iterators[i]);
                        for (j = 1; j < max_index1; j++) {
                            Datum value;
                            bool isnull;

                            has_value = array_iterate(iterators[i], &value, &isnull);
                            if (!has_value)
                                break;
                        }
                        if (!has_value)
                            break;
                    }
                    i += 1;
                }
            }
        }

        /* one execution per array element */
        while (has_value && (max_rows == -1 || max_rows > 0)) {
            int rc;

            i = 0;
            foreach(lc, cursor->variables) {
                VariableData *var = (VariableData *) lfirst(lc);

                if (var->is_array) {
                    Datum value;
                    bool isnull;

                    has_value = array_iterate(iterators[i], &value, &isnull);
                    if (!has_value)
                        break;
                    values[i] = value;
                    nulls[i] = isnull ? 'n' : ' ';
                }
                i += 1;
            }
            if (!has_value)
                break;

            rc = SPI_execute_plan(cursor->plan, values, nulls, false, 0);
            if (rc < 0)
                /* internal error */
                elog(ERROR, "cannot to execute a query");

            result += SPI_processed;
            if (max_rows > 0)
                max_rows -= 1;
        }

        MemoryContextReset(cursor->result_cxt);
    } else {
        int rc = SPI_execute_plan(cursor->plan, values, nulls, false, 0);

        if (rc < 0)
            /* internal error */
            elog(ERROR, "cannot to execute a query");
        result = SPI_processed;
    }

    /*
     * Leave result_cxt before SPI_finish().  oldcxt is the context
     * SPI_connect() made current; SPI_finish() destroys it and restores
     * the caller's context, so switching to oldcxt afterwards would leave
     * a destroyed context current.
     */
    MemoryContextSwitchTo(oldcxt);
    SPI_finish();

    return result;
}
/*
 * Execute the cursor's statement.
 *
 * Creates, or resets, the cursor's transaction-lifetime memory context
 * and a short-lived result context below it.  A statement with defined
 * columns is opened as a portal for later fetching and 0 is returned;
 * any other statement is run through the cached plan and the number of
 * processed rows is returned.
 */
static uint64
execute_query(CursorData *cursor)
{
    MemoryContextCallback *mcb;
    MemoryContext oldcxt;

    last_row_count = 0;

    /* clean space with saved result */
    if (!cursor->cursor_xact_cxt) {
        cursor->cursor_xact_cxt = AllocSetContextCreate(u_sess->top_transaction_mem_cxt,
                                                        "gms_sql transaction context",
                                                        ALLOCSET_DEFAULT_SIZES);
    } else {
        MemoryContext save_cxt = cursor->cursor_xact_cxt;

        /*
         * Free allocated memory in cursor_xact_cxt.  The reset runs the
         * registered callback, which clears cursor_xact_cxt, so the
         * pointer is restored afterwards.
         */
        MemoryContextReset(cursor->cursor_xact_cxt);
        cursor->cursor_xact_cxt = save_cxt;
        cursor->casts = NULL;
        cursor->tupdesc = NULL;
        cursor->tuples_cxt = NULL;
    }

    /*
     * (Re)register the cleanup callback on every execute.  A reset
     * consumes the callback and frees the callback struct (it lives in
     * the context being reset), so without this a re-executed cursor
     * would keep a dangling cursor_xact_cxt once the transaction ends.
     *
     * The cursor is an element of the session's palloc'd cursor array,
     * so the shared_ptr gets a no-op deleter: it must never try to
     * delete the slot.
     */
    oldcxt = MemoryContextSwitchTo(cursor->cursor_xact_cxt);
    mcb = (MemoryContextCallback*)palloc0(sizeof(MemoryContextCallback));
    mcb->func = cursor_xact_cxt_deletion_callback;
    mcb->arg = std::shared_ptr<void>(cursor, [](void *) {});
    MemoryContextRegisterResetCallback(cursor->cursor_xact_cxt, mcb);
    MemoryContextSwitchTo(oldcxt);

    cursor->result_cxt = AllocSetContextCreate(cursor->cursor_xact_cxt,
                                               "gms_sql short life context",
                                               ALLOCSET_DEFAULT_SIZES);

    /*
     * When column definitions are available, build final query
     * and open cursor for fetching. When column definitions are
     * missing, then the statement can be called with high frequency
     * etc INSERT, UPDATE, so use cached plan.
     */
    if (cursor->columns) {
        prepare_and_execute_cursor(cursor);
        return 0L;
    }

    return execute_spi_plan(cursor);
}
/*
 * CREATE FUNCTION gms_sql.execute(c int) RETURNS bigint;
 *
 * Executes the statement of an assigned cursor and returns the row
 * count reported by execute_query().
 */
Datum
gms_sql_execute(PG_FUNCTION_ARGS)
{
    CursorData *cursor = get_cursor(fcinfo, true);
    uint64 nrows = execute_query(cursor);

    PG_RETURN_INT64((int64) nrows);
}
/*
 * Make the next batch of rows available to column_value() and return
 * how many rows it holds (0 at end of data).
 *
 * Rows are pulled from the portal in chunks and copied into
 * cursor->tuples.  A new chunk is fetched only when the previous one
 * has been handed out completely.  In exact mode two rows are requested
 * so that "no data found" and "too many rows" can both be detected.
 *
 * The result is also remembered for gms_sql.last_row_count().
 */
static uint64
fetch_rows(CursorData *cursor, bool exact)
{
    /*
     * Rows requested from the portal per chunk.
     * NOTE(review): presumably matches the capacity of cursor->tuples,
     * which is declared outside this file - confirm.
     */
    const int chunk_rows = 1000;
    uint64 can_read_rows;

    if (!cursor->executed)
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_CURSOR_STATE),
                 errmsg("cursor is not executed")));

    if (!cursor->portal)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("cursor has not portal")));

    if (cursor->nread == cursor->processed) {
        MemoryContext oldcxt;
        uint64 i;
        int batch_rows;

        if (exact) {
            batch_rows = 2;
        } else if (cursor->array_columns &&
                   cursor->batch_rows > 0 &&
                   cursor->batch_rows < (uint64) chunk_rows) {
            /* largest multiple of the bulk size that fits into one chunk */
            batch_rows = (int) (((uint64) chunk_rows / cursor->batch_rows) * cursor->batch_rows);
        } else {
            /*
             * No bulk columns, or a bulk size of at least one chunk.  In
             * the latter case the rounding above would give 0, and a
             * cursor fetch of 0 rows never returns data.
             */
            batch_rows = chunk_rows;
        }

        /* create or reset context for tuples */
        if (!cursor->tuples_cxt)
            cursor->tuples_cxt = AllocSetContextCreate(cursor->cursor_xact_cxt,
                                                       "gms_sql tuples context",
                                                       ALLOCSET_DEFAULT_SIZES);
        else
            MemoryContextReset(cursor->tuples_cxt);

        if (SPI_connect() != SPI_OK_CONNECT)
            elog(ERROR, "SPI_connact failed");

        /* try to fetch data from cursor */
        SPI_cursor_fetch(cursor->portal, true, batch_rows);

        if (SPI_tuptable == NULL)
            elog(ERROR, "cannot fetch data");

        if (exact && SPI_processed > 1)
            ereport(ERROR,
                    (errcode(ERRCODE_TOO_MANY_ROWS),
                     errmsg("too many rows"),
                     errdetail("In exact mode only one row is expected")));

        if (exact && SPI_processed == 0)
            ereport(ERROR,
                    (errcode(ERRCODE_NO_DATA_FOUND),
                     errmsg("no data found"),
                     errdetail("In exact mode only one row is expected")));

        /* the SPI result goes away at SPI_finish(), so keep private copies */
        oldcxt = MemoryContextSwitchTo(cursor->tuples_cxt);
        cursor->tupdesc = CreateTupleDescCopy(SPI_tuptable->tupdesc);
        for (i = 0; i < SPI_processed; i++)
            cursor->tuples[i] = heap_copytuple(SPI_tuptable->vals[i]);
        MemoryContextSwitchTo(oldcxt);

        cursor->processed = SPI_processed;
        cursor->nread = 0;

        SPI_finish();
    }

    /* hand out at most one bulk batch per call */
    if (cursor->processed - cursor->nread >= cursor->batch_rows)
        can_read_rows = cursor->batch_rows;
    else
        can_read_rows = cursor->processed - cursor->nread;

    cursor->start_read = cursor->nread;
    cursor->nread += can_read_rows;
    last_row_count = can_read_rows;

    return can_read_rows;
}
/*
 * CREATE FUNCTION gms_sql.fetch_rows(c int) RETURNS int;
 *
 * Fetches the next batch of rows in non-exact mode and returns the
 * number of rows made available.
 */
Datum
gms_sql_fetch_rows(PG_FUNCTION_ARGS)
{
    CursorData *cursor = get_cursor(fcinfo, true);

    PG_RETURN_INT32(fetch_rows(cursor, false));
}
/*
 * CREATE FUNCTION gms_sql.execute_and_fetch(c int, exact bool DEFAULT false) RETURNS int;
 *
 * Executes the cursor and immediately fetches the first batch of rows.
 * The row count returned by the execute step is discarded; the result
 * is the number of rows fetched.
 */
Datum
gms_sql_execute_and_fetch(PG_FUNCTION_ARGS)
{
    CursorData *cursor = get_cursor(fcinfo, true);
    bool exact;

    if (PG_ARGISNULL(1))
        ereport(ERROR,
                (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
                 errmsg("exact option is NULL")));
    exact = PG_GETARG_BOOL(1);

    (void) execute_query(cursor);

    PG_RETURN_INT32(fetch_rows(cursor, exact));
}
/*
 * CREATE FUNCTION gms_sql.last_row_count() RETURNS int;
 *
 * Returns the row count recorded by the most recent fetch; executing a
 * cursor resets it to 0.
 */
Datum
gms_sql_last_row_count(PG_FUNCTION_ARGS)
{
    /* the function takes no SQL arguments */
    (void) fcinfo;

    PG_RETURN_INT32(last_row_count);
}
/*
 * Initialize a cast cache entry.
 *
 * Works out once how a value of sourcetypid is turned into
 * targettypid/targettypmod, so that cast_value() only has to replay the
 * decision.  targettypid is remembered in the entry only when it is a
 * domain (differs from its base type), which makes cast_value() run the
 * domain check.  An error is raised when no assignment coercion exists.
 */
static void
init_cast_cache_entry(CastCacheData *ccast,
                      Oid targettypid,
                      int32 targettypmod,
                      Oid sourcetypid)
{
    Oid funcoid;
    Oid basetypid = getBaseType(targettypid);

    ccast->targettypid = (targettypid != basetypid) ? targettypid : InvalidOid;
    ccast->targettypmod = targettypmod;

    /* identical type and no type modifier: the value is used as it is */
    ccast->without_cast = (sourcetypid == targettypid) && (targettypmod == -1);

    if (ccast->without_cast) {
        ccast->isvalid = true;
        return;
    }

    ccast->path = find_coercion_pathway(basetypid,
                                        sourcetypid,
                                        COERCION_ASSIGNMENT,
                                        &funcoid);

    if (ccast->path == COERCION_PATH_NONE)
        ereport(ERROR,
                (errcode(ERRCODE_CANNOT_COERCE),
                 errmsg("cannot to find cast from source type \"%s\" to target type \"%s\"",
                        format_type_be(sourcetypid),
                        format_type_be(basetypid))));

    if (ccast->path == COERCION_PATH_FUNC) {
        fmgr_info(funcoid, &ccast->finfo);
    } else if (ccast->path == COERCION_PATH_COERCEVIAIO) {
        bool typisvarlena;

        /* source output function followed by target input function */
        getTypeOutputInfo(sourcetypid, &funcoid, &typisvarlena);
        fmgr_info(funcoid, &ccast->finfo_out);

        getTypeInputInfo(targettypid, &funcoid, &ccast->typIOParam);
        fmgr_info(funcoid, &ccast->finfo_in);
    }

    /* a type modifier may need a length-coercion function of its own */
    if (targettypmod != -1) {
        ccast->path_typmod = find_typmod_coercion_function(targettypid,
                                                           &funcoid);
        if (ccast->path_typmod == COERCION_PATH_FUNC)
            fmgr_info(funcoid, &ccast->finfo_typmod);
    }

    ccast->isvalid = true;
}
/*
 * Apply the cast described by a cache entry to a value.
 *
 * NULLs and entries flagged without_cast pass through unchanged.  When
 * the entry remembers a domain target type, the domain constraints are
 * checked on the result (for NULLs as well).
 */
static Datum
cast_value(CastCacheData *ccast, Datum value, bool isnull)
{
    bool needs_cast = !isnull && !ccast->without_cast;

    if (needs_cast) {
        switch (ccast->path) {
            case COERCION_PATH_FUNC:
                value = FunctionCall1(&ccast->finfo, value);
                break;
            case COERCION_PATH_RELABELTYPE:
                /* binary compatible: the datum is used unchanged */
                break;
            case COERCION_PATH_COERCEVIAIO: {
                char *str = OutputFunctionCall(&ccast->finfo_out, value);

                value = InputFunctionCall(&ccast->finfo_in,
                                          str,
                                          ccast->typIOParam,
                                          ccast->targettypmod);
                break;
            }
            default:
                ereport(ERROR,
                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                         errmsg("unsupported cast path yet %d", ccast->path)));
                break;
        }

        /* enforce the type modifier through the length-coercion function */
        if (ccast->targettypmod != -1 && ccast->path_typmod == COERCION_PATH_FUNC)
            value = FunctionCall3(&ccast->finfo_typmod,
                                  value,
                                  Int32GetDatum(ccast->targettypmod),
                                  BoolGetDatum(true));
    }

    if (ccast->targettypid != InvalidOid)
        domain_check(value, isnull, ccast->targettypid, NULL, NULL);

    return value;
}
/*
 * Return the value of defined column "pos" for the current read position
 * of the cursor.
 *
 * cursor       - executed and fetched cursor
 * pos          - 1-based column position
 * targetTypeId - type of the variable that receives the value
 * isnull       - output: true when the returned value is SQL NULL
 * spi_transfer - when true, the result is passed through SPI_datumTransfer()
 *                before it is returned
 *
 * For a column registered in cursor->array_columns, up to cursor->batch_rows
 * rows starting at cursor->start_read are collected into one array; the array
 * itself is never NULL, so *isnull is reported as false in that case.
 *
 * Raises an ERROR when the cursor was not executed or fetched, when no column
 * is defined, when pos is outside [1, natts], or when the target type does
 * not match the defined column type.
 */
static Datum
column_value(CursorData *cursor, int pos, Oid targetTypeId, bool *isnull, bool spi_transfer)
{
    Datum value;
    int32 columnTypeMode;
    Oid columnTypeId;
    CastCacheData *ccast;

    if (!cursor->executed)
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_CURSOR_STATE),
                 errmsg("cursor is not executed")));
    if (!cursor->tupdesc)
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_CURSOR_STATE),
                 errmsg("cursor is not fetched")));
    if (!cursor->coltupdesc)
        ereport(ERROR,
                (errcode(ERRCODE_UNDEFINED_COLUMN),
                 errmsg("no column is defined")));

    /*
     * Either bound being violated is an error.  (The previous "&&" could
     * never be true, so invalid positions went on to index the descriptor.)
     */
    if (pos < 1 || pos > cursor->coltupdesc->natts)
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("column position is out of range [1, %d]",
                        cursor->coltupdesc->natts)));

    columnTypeId = (TupleDescAttr(cursor->coltupdesc, pos - 1))->atttypid;
    columnTypeMode = (TupleDescAttr(cursor->coltupdesc, pos - 1))->atttypmod;
    Assert(cursor->casts);
    ccast = &cursor->casts[pos - 1];

    /* the cast description is built on first use and then reused */
    if (!ccast->isvalid) {
        Oid basetype = getBaseType(targetTypeId);

        init_cast_cache_entry(ccast,
                              columnTypeId,
                              columnTypeMode,
                              SPI_gettypeid(cursor->tupdesc, pos));
        ccast->is_array = bms_is_member(pos, cursor->array_columns);
        if (ccast->is_array) {
            /* remember the target type only when it differs from its base type */
            ccast->array_targettypid = basetype != targetTypeId ? targetTypeId : InvalidOid;
            if (get_array_type(getBaseType(columnTypeId)) != basetype)
                ereport(ERROR,
                        (errcode(ERRCODE_DATATYPE_MISMATCH),
                         errmsg("unexpected target type \"%s\" (expected type \"%s\")",
                                format_type_be(basetype),
                                format_type_be(get_array_type(getBaseType(columnTypeId))))));
        } else
            ccast->array_targettypid = InvalidOid;
        get_typlenbyval(basetype, &ccast->typlen, &ccast->typbyval);
    }

    if (ccast->is_array) {
        ArrayBuildState *abs = NULL;
        uint64 idx;
        uint64 i;

        idx = cursor->start_read;
        for (i = 0; i < cursor->batch_rows; i++) {
            if (idx < cursor->processed) {
                value = SPI_getbinval(cursor->tuples[idx], cursor->tupdesc, pos, isnull);
                value = cast_value(ccast, value, *isnull);
                abs = accumArrayResult(abs,
                                       value,
                                       *isnull,
                                       columnTypeId,
                                       CurrentMemoryContext);
                idx += 1;
            }
        }

        /*
         * NOTE(review): abs is still NULL here when no row was accumulated;
         * confirm that makeArrayResult() accepts that.
         */
        value = makeArrayResult(abs, CurrentMemoryContext);

        /*
         * The array is a non-NULL value whatever the null flag of its last
         * element was, so report it that way to the caller and to the domain
         * check (which previously received the pointer itself as its bool
         * argument).
         */
        *isnull = false;
        if (ccast->array_targettypid != InvalidOid)
            domain_check(value, false, ccast->array_targettypid, NULL, NULL);
    } else {
        /* Maybe it can be solved by uncached slower cast */
        if (targetTypeId != columnTypeId)
            ereport(ERROR,
                    (errcode(ERRCODE_DATATYPE_MISMATCH),
                     errmsg("unexpected target type \"%s\" (expected type \"%s\")",
                            format_type_be(targetTypeId),
                            format_type_be(columnTypeId))));
        value = SPI_getbinval(cursor->tuples[cursor->start_read], cursor->tupdesc, pos, isnull);
        value = cast_value(ccast, value, *isnull);
    }

    if (spi_transfer)
        value = SPI_datumTransfer(value, ccast->typbyval, ccast->typlen);

    return value;
}

/*
 * CREATE PROCEDURE gms_sql.column_value(c int, pos int, INOUT value "any");
 *
 * Note - the CALL statement is slow from a PL/pgSQL block (compared with a
 * function call).  This is the reason why this routine exists in function
 * form too.
 *
 * Argument 1 (the column position) must not be NULL.  The type of argument 2
 * selects the expected target type.  Returns the column value, or SQL NULL
 * when the column value is NULL.
 */
Datum
gms_sql_column_value(PG_FUNCTION_ARGS)
{
    CursorData *cursor;
    Datum value;
    int pos;
    bool isnull;
    Oid targetTypeId;

    if (SPI_connect() != SPI_OK_CONNECT)
        elog(ERROR, "SPI_connact failed");

    cursor = get_cursor(fcinfo, true);
    if (PG_ARGISNULL(1))
        ereport(ERROR,
                (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
                 errmsg("column position (number) is NULL")));
    pos = PG_GETARG_INT32(1);

    /* intermediate results go to the cursor's result context, reset below */
    (void) MemoryContextSwitchTo(cursor->result_cxt);
    targetTypeId = get_fn_expr_argtype(fcinfo->flinfo, 2);
    value = column_value(cursor, pos, targetTypeId, &isnull, true);

    /*
     * SPI_finish() restores the memory context that was current before
     * SPI_connect() and destroys the SPI procedure context.  The context that
     * was current after SPI_connect() must therefore not be switched back to
     * here: it no longer exists.
     */
    SPI_finish();
    MemoryContextReset(cursor->result_cxt);

    /* a NULL column value must be reported as NULL, not as a zero datum */
    if (isnull)
        PG_RETURN_NULL();

    PG_RETURN_DATUM(value);
}
/*
 * CREATE FUNCTION gms_sql.column_value(c int, pos int, value anyelement) RETURNS anyelement;
 *
 * Function form of gms_sql.column_value.  It exists because the CALL
 * statement is slow from a PL/pgSQL block (compared with a function call);
 * all work is delegated to gms_sql_column_value().
 */
Datum
gms_sql_column_value_f(PG_FUNCTION_ARGS)
{
    Datum result = gms_sql_column_value(fcinfo);

    return result;
}
/******************************************************************
* Simple parser - just for replacement of bind variables by
* PostgreSQL $ param placeholders.
*
******************************************************************
*/
/*
 * Tell whether byte c can start an identifier: an ASCII letter, or any byte
 * with the high bit set (>= 0200).
 *
 * Multibyte encodings are not really understood here - every high-bit byte
 * is simply accepted - but Postgres uses the same implementation.
 */
static bool
is_identif(unsigned char c)
{
    const bool lower_letter = (c >= 'a' && c <= 'z');
    const bool upper_letter = (c >= 'A' && c <= 'Z');
    const bool high_bit_byte = (c >= 0200);

    return lower_letter || upper_letter || high_bit_byte;
}
/*
 * Simple tokenizer used to detect :identif bind variables in a query.
 *
 * str    - current position in the NUL-terminated query text
 * start  - output: first character of the token text
 * len    - output: length of the token text in bytes
 * typ    - output: kind of the token
 * sep    - output, dollar-quoted strings only: start of the opening separator
 * seplen - output, dollar-quoted strings only: length of that separator
 *
 * Returns the position at which the next call should continue, or NULL
 * (with *typ = TOKEN_NONE and nothing else set) at the end of the input.
 *
 * What *start / *len describe depends on the token:
 *   - a run of spaces is reported as one space (*len = 1);
 *   - string literals, extended string literals, quoted identifiers and
 *     dollar-quoted strings report their content without the delimiters;
 *   - a bind variable reports its name without the leading colon;
 *   - a comment reports the whole text including its delimiters.
 * An unterminated string, quoted identifier, dollar-quoted string or
 * comment extends to the end of the input.
 *
 * Every byte handed to isdigit() is cast to unsigned char first: passing a
 * negative plain char (any high-bit byte) is undefined behaviour.
 */
static char *
next_token(char *str, char **start, size_t *len, ofTokenType *typ, char **sep, size_t *seplen)
{
    if (*str == '\0') {
        *typ = TOKEN_NONE;
        return NULL;
    }

    /* reduce spaces */
    if (*str == ' ') {
        *start = str++;
        while (*str == ' ')
            str++;
        *typ = TOKEN_SPACES;
        *len = 1;
        return str;
    }

    /* Postgres's dollar strings */
    if (*str == '$' && (str[1] == '$' ||
        is_identif((unsigned char) str[1]) || str[1] == '_')) {
        char *aux = str + 1;
        char *endstr = NULL;
        char *scan;
        bool is_valid = false;

        /* try to find end of separator */
        while (*aux) {
            if (*aux == '$') {
                is_valid = true;
                aux++;
                break;
            } else if (is_identif((unsigned char) *aux) ||
                       isdigit((unsigned char) *aux) ||
                       *aux == '_') {
                aux++;
            } else
                break;
        }

        if (!is_valid) {
            /* a lone '$' is just another character */
            *typ = TOKEN_OTHER;
            *len = 1;
            *start = str;
            return str + 1;
        }

        /* now it looks like correct $ separator */
        *start = aux;
        *sep = str;
        Assert(aux >= str);
        *seplen = (size_t) (aux - str);
        *typ = TOKEN_DOLAR_STR;

        /*
         * Try to find the second instance of the separator.  The opening
         * separator is compared in place, so no temporary copy has to be
         * allocated (the copy used before was never released).
         */
        for (scan = aux; *scan != '\0'; scan++) {
            if (strncmp(scan, str, *seplen) == 0) {
                endstr = scan;
                break;
            }
        }

        if (endstr) {
            Assert(endstr >= *start);
            *len = (size_t) (endstr - *start);
            return endstr + *seplen;
        }

        /* no closing separator: scan stopped at the terminating NUL */
        Assert(scan >= *start);
        *len = (size_t) (scan - *start);
        return scan;
    }

    /* Pair comments */
    if (*str == '/' && str[1] == '*') {
        *start = str;
        str += 2;
        while (*str) {
            if (*str == '*' && str[1] == '/') {
                str += 2;
                break;
            }
            str++;
        }
        *typ = TOKEN_COMMENT;
        Assert(str >= *start);
        *len = (size_t) (str - *start);
        return str;
    }

    /* Number */
    if (isdigit((unsigned char) *str) ||
        (*str == '.' && isdigit((unsigned char) str[1]))) {
        bool point = *str == '.';

        *start = str++;
        while (*str) {
            if (isdigit((unsigned char) *str))
                str++;
            else if (*str == '.' && !point) {
                /* at most one decimal point belongs to the number */
                str++;
                point = true;
            } else
                break;
        }
        *typ = TOKEN_NUMBER;
        Assert(str >= *start);
        *len = (size_t) (str - *start);
        return str;
    }

    /* Double colon :: */
    if (*str == ':' && str[1] == ':') {
        *start = str;
        *typ = TOKEN_DOUBLE_COLON;
        *len = 2;
        return str + 2;
    }

    /* Bind variable placeholder */
    if (*str == ':' &&
        (is_identif((unsigned char) str[1]) || str[1] == '_')) {
        *start = &str[1];
        str += 2;
        while (*str) {
            if (is_identif((unsigned char) *str) ||
                isdigit((unsigned char) *str) ||
                *str == '_')
                str++;
            else
                break;
        }
        *typ = TOKEN_BIND_VAR;
        Assert(str >= *start);
        *len = (size_t) (str - *start);
        return str;
    }

    /* Extended string literal */
    if ((*str == 'e' || *str == 'E') && str[1] == '\'') {
        *start = &str[2];
        str += 2;
        while (*str) {
            if (*str == '\'') {
                *typ = TOKEN_EXT_STR;
                Assert(str >= *start);
                *len = (size_t) (str - *start);
                return str + 1;
            }
            /* skip an escaped quote or an escaped backslash as one unit */
            if (*str == '\\' && str[1] == '\'')
                str += 2;
            else if (*str == '\\' && str[1] == '\\')
                str += 2;
            else
                str += 1;
        }
        *typ = TOKEN_EXT_STR;
        Assert(str >= *start);
        *len = (size_t) (str - *start);
        return str;
    }

    /* String literal */
    if (*str == '\'') {
        *start = &str[1];
        str += 1;
        while (*str) {
            if (*str == '\'') {
                if (str[1] != '\'') {
                    *typ = TOKEN_STR;
                    Assert(str >= *start);
                    *len = (size_t) (str - *start);
                    return str + 1;
                }
                /* a doubled quote stays inside the literal */
                str += 2;
            } else
                str += 1;
        }
        *typ = TOKEN_STR;
        Assert(str >= *start);
        *len = (size_t) (str - *start);
        return str;
    }

    /* Quoted identifier */
    if (*str == '"') {
        *start = &str[1];
        str += 1;
        while (*str) {
            if (*str == '"') {
                if (str[1] != '"') {
                    *typ = TOKEN_QIDENTIF;
                    Assert(str >= *start);
                    *len = (size_t) (str - *start);
                    return str + 1;
                }
                /* a doubled double-quote stays inside the identifier */
                str += 2;
            } else
                str += 1;
        }
        *typ = TOKEN_QIDENTIF;
        Assert(str >= *start);
        *len = (size_t) (str - *start);
        return str;
    }

    /* Identifiers */
    if (is_identif((unsigned char) *str) || *str == '_') {
        *start = str++;
        while (*str) {
            if (is_identif((unsigned char) *str) ||
                isdigit((unsigned char) *str) ||
                *str == '_')
                str++;
            else
                break;
        }
        *typ = TOKEN_IDENTIF;
        Assert(str >= *start);
        *len = (size_t) (str - *start);
        return str;
    }

    /* Others */
    *typ = TOKEN_OTHER;
    *start = str;
    *len = 1;
    return str + 1;
}
/*
 * Description of one composite type that describe_columns can fill.
 */
typedef struct {
    int type_id;        /* number of the variant, 1..4 in the table below */
    int type_col_num;   /* number of fields prepared for one record */
    char* type_name;    /* name of the type inside the gms_sql schema */
} gms_sql_desc_rec_type;

/*
 * Known record variants.  Variants 3 and 4 carry two fields more than
 * variants 1 and 2.
 */
static gms_sql_desc_rec_type desc_rec_type_table[] = {
    {1, 11, "desc_rec"},
    {2, 11, "desc_rec2"},
    {3, 13, "desc_rec3"},
    {4, 13, "desc_rec4"}
};
/*
 * Find the desc_rec_type_table entry whose gms_sql type has OID typid.
 *
 * Each table entry is resolved by name in the gms_sql schema, in table
 * order.  Raises an ERROR when an entry examined before a match does not
 * exist as a type, or when typid matches no entry; the trailing return only
 * satisfies the compiler.
 *
 * The number of entries is derived from the table itself, so adding a
 * variant to the table does not require touching this loop.
 */
static
gms_sql_desc_rec_type* gms_sql_search_desc_rec_type(Oid typid)
{
    const int ntypes = (int) (sizeof(desc_rec_type_table) / sizeof(desc_rec_type_table[0]));
    Oid nspOid;
    Oid typOid;
    int i;

    nspOid = get_namespace_oid("gms_sql", false);
    for (i = 0; i < ntypes; i++) {
        typOid = GetSysCacheOid2(TYPENAMENSP,
                                 CStringGetDatum(desc_rec_type_table[i].type_name),
                                 ObjectIdGetDatum(nspOid));
        if (!OidIsValid(typOid)) {
            ereport(ERROR,
                    (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                     errmsg("gms_sql.%s type does not exist.", desc_rec_type_table[i].type_name)));
        }
        if (typid == typOid)
            return &desc_rec_type_table[i];
    }

    ereport(ERROR,
            (errcode(ERRCODE_WRONG_OBJECT_TYPE),
             errmsg("typeid %u is not gms_sql.desc_rec.", typid)));
    return NULL;
}
/*
 * Translate a type OID into the numeric code stored in col_type of a
 * describe_columns record.  Every type not listed here gets code 109.
 *
 * NOTE(review): the numbers look like Oracle DBMS_SQL type codes - confirm
 * against the Oracle documentation before relying on that.
 */
static int
map_type_code(Oid type_id)
{
    switch (type_id) {
        case VARCHAROID:
        case NVARCHAR2OID:
            return 1;

        case NUMERICOID:
        case INT1OID:
        case INT2OID:
        case INT4OID:
        case INT8OID:
        case INT16OID:
        case FLOAT4OID:
            return 2;

        case DATEOID:
            return 12;

        case RAWOID:
            return 23;

        case CHAROID:
            return 96;

        case FLOAT8OID:
            return 101;

        case CLOBOID:
            return 112;

        case BLOBOID:
            return 113;

        case JSONOID:
            return 119;

        case TIMESTAMPOID:
            return 180;

        case TIMESTAMPTZOID:
            return 181;

        case INTERVALOID:
            return 183;

        default:
            /* code used for every other type */
            return 109;
    }
}
/*
 * CREATE PROCEDURE gms_sql.describe_columns(c int, OUT col_cnt int, OUT desc_t gms_sql.desc_rec[])
 *
 * Returns the number of result columns of the cursor's statement and an
 * array with one description record per column.  Attention, the typid are
 * related to PostgreSQL type system.
 *
 * The statement is prepared through SPI with the types of the bound
 * variables, and the result descriptor of the single resulting plan source
 * is walked column by column.  The element type of the second output field
 * selects which gms_sql.desc_rec* variant is produced.
 *
 * Raises an ERROR when the result type is not a row type whose second field
 * is an array of a known desc_rec type, when a bind variable has no value,
 * when the prepared plan is not a single valid plan, when the statement does
 * not return tuples, or when desc_rec.col_name would exceed 32 characters.
 */
Datum
gms_sql_describe_columns(PG_FUNCTION_ARGS)
{
    CursorData *cursor;
    Datum *values;
    bool *nulls;
    TupleDesc tupdesc;
    TupleDesc desc_rec_tupdesc;
    TupleDesc cursor_tupdesc;
    HeapTuple tuple;
    Oid arraytypid;
    Oid desc_rec_typid;
    Oid *types = NULL;
    ArrayBuildState *abuilder = NULL;
    SPIPlanPtr plan;
    CachedPlanSource *plansource = NULL;
    int ncolumns = 0;
    int rc;
    int i = 0;
    bool nonatomic;
    gms_sql_desc_rec_type* rec_type;
    MemoryContext callercxt = CurrentMemoryContext;

    if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
        elog(ERROR, "return type must be a row type");
    arraytypid = TupleDescAttr(tupdesc, 1)->atttypid;
    desc_rec_typid = get_element_type(arraytypid);
    if (!OidIsValid(desc_rec_typid))
        elog(ERROR, "second output field must be an array");
    rec_type = gms_sql_search_desc_rec_type(desc_rec_typid);
    desc_rec_tupdesc = lookup_rowtype_tupdesc_copy(desc_rec_typid, -1);

    /* work arrays sized for one description record; reused for the result row */
    values = (Datum*)palloc0(rec_type->type_col_num * sizeof(Datum));
    nulls = (bool*)palloc0(rec_type->type_col_num * sizeof(bool));

    cursor = get_cursor(fcinfo, true);
    if (cursor->variables) {
        ListCell *lc;

        /* collect the parameter types; every variable must already be bound */
        types = (Oid*)palloc(sizeof(Oid) * cursor->nvariables);
        i = 0;
        foreach(lc, cursor->variables) {
            VariableData *var = (VariableData *) lfirst(lc);

            if (var->typoid == InvalidOid)
                ereport(ERROR,
                        (errcode(ERRCODE_UNDEFINED_PARAMETER),
                         errmsg("variable \"%s\" has not a value", var->refname)));
            types[i++] = var->is_array ? var->typelemid : var->typoid;
        }
    }

    /*
     * Connect to SPI manager
     */
    nonatomic = fcinfo->context && IsA(fcinfo->context, FunctionScanState) &&
        !castNode(FunctionScanState, fcinfo->context)->atomic;
    if ((rc = SPI_connect_ext(DestSPI, NULL, NULL, nonatomic ? SPI_OPT_NONATOMIC : 0)) != SPI_OK_CONNECT)
        elog(ERROR, "SPI_connect failed: %s", SPI_result_code_string(rc));

    plan = SPI_prepare(cursor->parsed_query, (int) cursor->nvariables, types);
    if (!plan || plan->magic != _SPI_PLAN_MAGIC)
        elog(ERROR, "plan is not valid");
    if (list_length(plan->plancache_list) != 1)
        elog(ERROR, "plan is not single execution plany");
    plansource = (CachedPlanSource *) linitial(plan->plancache_list);
    cursor_tupdesc = plansource->resultDesc;

    /*
     * A plan source has no result descriptor when its statement does not
     * return tuples.  Report that instead of dereferencing a NULL pointer.
     */
    if (cursor_tupdesc == NULL)
        elog(ERROR, "statement does not return any columns");

    ncolumns = cursor_tupdesc->natts;
    for (i = 0; i < ncolumns; i++) {
        HeapTuple tp;
        Form_pg_type typtup;
        int type_code;
        text *attname = NULL;
        text *schname = NULL;
        text *typname = NULL;
        Form_pg_attribute attr = TupleDescAttr(cursor_tupdesc, i);

        /*
         * 0. col_type BINARY_INTEGER := 0,
         * 1. col_max_len BINARY_INTEGER := 0,
         * 2. col_name VARCHAR2(32) := '',
         * 3. col_name_len BINARY_INTEGER := 0,
         * 4. col_schema_name VARCHAR2(32) := '',
         * 5. col_schema_name_len BINARY_INTEGER := 0,
         * 6. col_precision BINARY_INTEGER := 0,
         * 7. col_scale BINARY_INTEGER := 0,
         * 8. col_charsetid BINARY_INTEGER := 0,
         * 9. col_charsetform BINARY_INTEGER := 0,
         * 10. col_null_ok BOOLEAN := TRUE
         * 11. col_type_name varchar2 := '',
         * 12. col_type_name_len BINARY_INTEGER := 0 );
         */
        memset_s(values, rec_type->type_col_num * sizeof(Datum), 0, rec_type->type_col_num * sizeof(Datum));
        memset_s(nulls, rec_type->type_col_num * sizeof(bool), 0, rec_type->type_col_num * sizeof(bool));

        type_code = map_type_code(attr->atttypid);
        values[0] = Int32GetDatum(type_code);

        tp = SearchSysCache1(TYPEOID, ObjectIdGetDatum(attr->atttypid));
        if (!HeapTupleIsValid(tp))
            elog(ERROR, "cache lookup failed for type %u", attr->atttypid);
        typtup = (Form_pg_type) GETSTRUCT(tp);

        /* length, precision and scale default to zero */
        values[1] = Int32GetDatum(0);
        values[6] = Int32GetDatum(0);
        values[7] = Int32GetDatum(0);
        if (attr->attlen != -1)
            values[1] = Int32GetDatum(attr->attlen);
        else if (typtup->typcategory == 'S' && attr->atttypmod > VARHDRSZ)
            values[1] = Int32GetDatum(attr->atttypmod - VARHDRSZ);
        else if (attr->atttypid == NUMERICOID && attr->atttypmod > VARHDRSZ) {
            /* precision is kept in the high 16 bits, scale in the low 11 bits (sign-extended) */
            values[6] = Int32GetDatum(((attr->atttypmod - VARHDRSZ) >> 16) & 0xffff);
            values[7] = Int32GetDatum((((attr->atttypmod - VARHDRSZ) & 0x7ff) ^ 1024) - 1024);
        }

        attname = cstring_to_text(NameStr(attr->attname));
        values[2] = PointerGetDatum(attname);
        values[3] = DirectFunctionCall1(textlen, PointerGetDatum(attname));
        /* only the first record variant limits the column name length */
        if (rec_type->type_id == 1 && DatumGetInt32(values[3]) > 32)
            elog(ERROR, "desc_rec.col_name(%d) is more than 32", DatumGetInt32(values[3]));

        schname = cstring_to_text(get_namespace_name(typtup->typnamespace));
        values[4] = PointerGetDatum(schname);
        values[5] = DirectFunctionCall1(textlen, PointerGetDatum(schname));

        values[8] = Int32GetDatum(0);
        values[9] = Int32GetDatum(0);

        /* NULLs are allowed unless the attribute or its type is NOT NULL */
        values[10] = BoolGetDatum(true);
        if (attr->attnotnull)
            values[10] = BoolGetDatum(false);
        else if (typtup->typnotnull)
            values[10] = BoolGetDatum(false);

        if (rec_type->type_id > 2) {
            /* the type name is filled only for types mapped to code 109 */
            if (type_code == 109) {
                typname = cstring_to_text(NameStr(typtup->typname));
                values[11] = PointerGetDatum(typname);
                values[12] = DirectFunctionCall1(textlen, PointerGetDatum(typname));
            } else {
                nulls[11] = true;
                nulls[12] = true;
            }
        }

        tuple = heap_form_tuple(desc_rec_tupdesc, values, nulls);
        abuilder = accumArrayResult(abuilder,
                                    HeapTupleGetDatum(tuple),
                                    false,
                                    desc_rec_typid,
                                    CurrentMemoryContext);
        ReleaseSysCache(tp);
        pfree_ext(attname);
        pfree_ext(schname);
        pfree_ext(typname);
    }

    /* build the (col_cnt, desc_t) result row; the array goes to the caller's context */
    memset_s(values, rec_type->type_col_num * sizeof(Datum), 0, rec_type->type_col_num * sizeof(Datum));
    memset_s(nulls, rec_type->type_col_num * sizeof(bool), 0, rec_type->type_col_num * sizeof(bool));
    values[0] = Int32GetDatum(ncolumns);
    nulls[0] = false;
    values[1] = makeArrayResult(abuilder, callercxt);
    nulls[1] = false;

    SPI_freeplan(plan);
    if ((rc = SPI_finish()) != SPI_OK_FINISH)
        elog(ERROR, "SPI_finish failed: %s", SPI_result_code_string(rc));

    MemoryContextSwitchTo(callercxt);
    tuple = heap_form_tuple(tupdesc, values, nulls);
    pfree(values);
    pfree(nulls);
    PG_RETURN_DATUM(HeapTupleGetDatum(tuple));
}
/*
 * Second SQL-callable entry point for describe_columns; all work is
 * delegated to gms_sql_describe_columns().
 */
Datum
gms_sql_describe_columns_f(PG_FUNCTION_ARGS)
{
    Datum result = gms_sql_describe_columns(fcinfo);

    return result;
}
/*
 * Tell whether values of the given type belong to the "right aligned" group
 * when a result set is printed: integer, floating point and numeric types,
 * the oid/xid/cid identifier types and money.
 */
static bool
type_is_right_align(Oid type)
{
    const bool is_integer = (type == INT2OID || type == INT4OID ||
                             type == INT8OID || type == INT16OID);
    const bool is_fractional = (type == FLOAT4OID || type == FLOAT8OID ||
                                type == NUMERICOID);
    const bool is_identifier = (type == OIDOID || type == XIDOID || type == CIDOID);
    const bool is_money = (type == CASHOID);

    return is_integer || is_fractional || is_identifier || is_money;
}
static int *
set_maxwidth(int nrows, TupleDesc desc, HeapTuple* tuples)
{
int natts = desc->natts;
int *max_width;
char *value;
int i, j;
/* get max_width from header */
max_width = (int*) palloc0(natts * sizeof(int));
for (i = 0; i < natts; i++) {
char *attrname = SPI_fname(desc, i + 1);
text *attrtext= cstring_to_text(attrname);
max_width[i] = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(attrtext)));
pfree(attrname);
pfree(attrtext);
}
/* get max_width from value */
for (i = 0; i < nrows; i++) {
int len;
for (j = 0; j < natts; j++) {
value = SPI_getvalue(tuples[i], desc, j + 1);
if (value) {
text *attrtext= cstring_to_text(value);
len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(attrtext)));
if (len > max_width[j])
max_width[j] = len;
pfree(value);
pfree(attrtext);
}
}
}
return max_width;
}
/*
 * Send one text line to the client as a message of type 'N' and flush it.
 *
 * buf     - scratch buffer used to assemble the message; it is reset before
 *           and after use
 * message - NUL-terminated text to send
 *
 * For protocol version 3 or later the text is tagged as the primary message
 * field and the field list is closed by a zero byte; older protocols get the
 * bare string.
 */
static void
print_buf_to_client(StringInfo buf, char *message)
{
    const bool has_fields = PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3;

    resetStringInfo(buf);
    pq_beginmessage(buf, 'N');

    if (has_fields)
        pq_sendbyte(buf, PG_DIAG_MESSAGE_PRIMARY);

    pq_sendstring(buf, message);

    /* terminator of the field list */
    if (has_fields)
        pq_sendbyte(buf, '\0');

    pq_endmessage_reuse(buf);
    pq_flush();
    resetStringInfo(buf);
}
static void
print_header_to_client(int *max_width, TupleDesc desc, StringInfo buf, StringInfo outbuf)
{
int i, j;
TransactionId curlxid;
/* result number increase in transaction */
curlxid = SPI_get_top_transaction_id();
if (curlxid == last_lxid)
result_no++;
else
result_no = 1;
last_lxid = curlxid;
/* print ResultSet */
appendStringInfo(buf, "ResultSet #%d\n", result_no);
print_buf_to_client(outbuf, buf->data);
/* print from header */
resetStringInfo(buf);
for (i = 0; i < desc->natts; i++) {
char *attname = SPI_fname(desc, i + 1);
int nspace = max_width[i] - pg_mbstrlen(attname) + 2;
appendStringInfo(buf, "%-*s%s%-*s", nspace/2, "", attname, (nspace + 1) / 2, "");
if (i < desc->natts - 1)
appendStringInfo(buf, "%s", "|");
}
print_buf_to_client(outbuf, buf->data);
/* print horizontal line */
resetStringInfo(buf);
for (i = 0; i < desc->natts; i++) {
for (j = 0; j < (max_width[i] + 2); j++)
appendStringInfo(buf, "%s", "-");
if (i < desc->natts - 1)
appendStringInfo(buf, "%s", "+");
}
print_buf_to_client(outbuf, buf->data);
resetStringInfo(buf);
}
/*
 * Print one data row of a result set.
 *
 * max_width - print width of every column
 * desc      - descriptor of the tuple
 * tuple     - the row to print
 * buf       - buffer in which the line is composed (left empty on return)
 * outbuf    - buffer handed to print_buf_to_client() for the transfer
 *
 * Every cell is padded with spaces to the width of its column and wrapped in
 * one space on each side; cells are separated by '|'.  Values of the types
 * accepted by type_is_right_align() are padded on the left (lpad), so they
 * end up right aligned; all other values are padded on the right.  A NULL
 * value is printed as an empty cell and still gets its column separator, so
 * the columns stay aligned with the header.
 */
static void
print_row_to_client(int *max_width, TupleDesc desc, HeapTuple tuple, StringInfo buf, StringInfo outbuf)
{
    char *value;
    Datum datum;
    int i;
    int natts = desc->natts;
    text *spacetext = cstring_to_text(" ");

    for (i = 0; i < natts; i++) {
        value = SPI_getvalue(tuple, desc, i + 1);
        if (value) {
            Oid typid = SPI_gettypeid(desc, i + 1);
            text *attrtext= cstring_to_text(value);
            char *line;

            if (type_is_right_align(typid)) {
                /* fill on the left side => the value is right aligned */
                datum = DirectFunctionCall3(lpad, PointerGetDatum(attrtext), max_width[i], PointerGetDatum(spacetext));
            } else {
                /* fill on the right side => the value is left aligned */
                datum = DirectFunctionCall3(rpad, PointerGetDatum(attrtext), max_width[i], PointerGetDatum(spacetext));
            }
            line = DatumGetCString(DirectFunctionCall1(textout, datum));
            appendStringInfo(buf, "%s%s%s", " ", line, ((i < natts - 1) ? " |" : " "));
            pfree(value);
            pfree(attrtext);
            pfree(DatumGetPointer(datum));
            pfree(line);
        } else {
            /* empty cell: width + 2 spaces, followed by the separator */
            appendStringInfo(buf, "%*s", max_width[i] + 2, " ");
            if (i < natts - 1)
                appendStringInfo(buf, "%s", "|");
        }
    }

    pfree(spacetext);
    print_buf_to_client(outbuf, buf->data);
    resetStringInfo(buf);
}
/*
 * Print the "(N rows)" footer of a result set.
 *
 * rows   - number of rows that were printed
 * buf    - buffer in which the line is composed (left empty on return)
 * outbuf - buffer handed to print_buf_to_client() for the transfer
 *
 * Only a count of exactly one uses the singular form, so an empty result
 * reads "(0 rows)".
 */
static void
print_footer_to_client(unsigned int rows, StringInfo buf, StringInfo outbuf)
{
    appendStringInfo(buf, "(%u row%s)\n", rows, (rows != 1 ? "s" : ""));
    print_buf_to_client(outbuf, buf->data);
    resetStringInfo(buf);
}
static void
return_result_to_client(int nrows, TupleDesc desc, HeapTuple* tuples)
{
StringInfo buf;
StringInfo outbuf;
int *max_width;
int i;
buf = makeStringInfo();
outbuf = makeStringInfo();
/* compute col width */
max_width = set_maxwidth(nrows, desc, tuples);
print_header_to_client(max_width, desc, buf, outbuf);
/* print rows */
for (i = 0; i < nrows; i++) {
print_row_to_client(max_width, desc, tuples[i], buf, outbuf);
}
/* print footer */
print_footer_to_client(nrows, buf, outbuf);
pfree_ext(max_width);
FreeStringInfo(buf);
FreeStringInfo(outbuf);
}
/*
 * SQL-callable: fetch all remaining rows of the cursor (portal) whose name
 * is given by argument 0 and print them to the client as a result set.
 *
 * Raises an ERROR when the SPI connection fails or when no cursor with that
 * name exists.
 */
Datum
gms_sql_return_result(PG_FUNCTION_ARGS)
{
    char* name = text_to_cstring(PG_GETARG_TEXT_PP(0));
    Portal portal;
    int rc = 0;

    /*
     * Connect to SPI manager
     */
    SPI_STACK_LOG("connect", NULL, NULL);

    /*
     * The assignment is parenthesized on its own so that rc holds the SPI
     * return code rather than the result of the comparison; the error
     * message below reports that code.
     */
    if ((rc = SPI_connect()) != SPI_OK_CONNECT)
        ereport(ERROR,
                (errmodule(MOD_OPT),
                 (errcode(ERRCODE_SPI_CONNECTION_FAILURE),
                  errmsg("SPI_connect failed: %s", SPI_result_code_string(rc)))));

    portal = SPI_cursor_find(name);
    if (portal == NULL)
        ereport(ERROR, (errcode(ERRCODE_UNDEFINED_CURSOR), errmsg("cursor \"%s\" does not exist", name)));

    SPI_cursor_fetch(portal, true, FETCH_ALL);
    SPI_STACK_LOG("finish", portal->sourceText, NULL);
    return_result_to_client(SPI_processed, SPI_tuptable->tupdesc, SPI_tuptable->vals);

    SPI_finish();
    PG_RETURN_VOID();
}
/*
 * SQL-callable: execute the statement of a gms_sql cursor and print its
 * rows to the client as a result set.
 *
 * The prepared plan of the cursor is used when there is one, otherwise the
 * parsed query text is executed.  Raises an ERROR when the SPI connection
 * fails, when the cursor has neither a plan nor a parsed query, or when the
 * execution produced no tuple table (the statement returns no rows).
 */
Datum
gms_sql_return_result_i(PG_FUNCTION_ARGS)
{
    CursorData *cursor;
    int rc = 0;

    /*
     * Parameter slot handed to SPI_execute_plan(): a single value marked as
     * NULL ('n'), so no indeterminate memory is passed.
     * NOTE(review): the values bound to the cursor's variables are not passed
     * here, and a plan with more than one parameter would read past this
     * single slot - confirm how plans with parameters are meant to be run.
     */
    Datum Values = (Datum) 0;
    char Nulls = 'n';

    cursor = get_cursor(fcinfo, true);

    /*
     * Connect to SPI manager.  The assignment is parenthesized on its own so
     * that rc holds the SPI return code rather than the result of the
     * comparison.
     */
    if ((rc = SPI_connect()) != SPI_OK_CONNECT)
        ereport(ERROR,
                (errmodule(MOD_OPT),
                 (errcode(ERRCODE_SPI_CONNECTION_FAILURE),
                  errmsg("SPI_connect failed: %s", SPI_result_code_string(rc)))));

    if (cursor->plan)
        SPI_execute_plan(cursor->plan, &Values, &Nulls, false, 0);
    else if (cursor->parsed_query)
        SPI_execute(cursor->parsed_query, false, 0);
    else
        ereport(ERROR,
                (errmodule(MOD_OPT),
                 (errcode(ERRCODE_SPI_EXECUTE_FAILURE),
                  errmsg("sql is not parsed: %s", SPI_result_code_string(rc)))));

    /* without a tuple table there is nothing to print (and nothing to dereference) */
    if (SPI_tuptable == NULL)
        ereport(ERROR,
                (errmodule(MOD_OPT),
                 (errcode(ERRCODE_SPI_EXECUTE_FAILURE),
                  errmsg("statement did not return a result set"))));

    return_result_to_client(SPI_processed, SPI_tuptable->tupdesc, SPI_tuptable->vals);

    SPI_finish();
    PG_RETURN_VOID();
}