417 lines
12 KiB
C++
417 lines
12 KiB
C++
#include "postgres.h"
|
|
#include "funcapi.h"
|
|
#include "commands/extension.h"
|
|
#include "libpq/libpq.h"
|
|
#include "libpq/pqformat.h"
|
|
#include "utils/builtins.h"
|
|
#include "utils/lsyscache.h"
|
|
|
|
#include "gms_output.h"
|
|
|
|
PG_MODULE_MAGIC;
|
|
|
|
/*
|
|
 * TODO: BUFSIZE_UNLIMITED to be truly unlimited (or INT_MAX),
|
|
* and allocate buffers on-demand.
|
|
*/
|
|
/* Output buffer size limits, in bytes. */
#define BUFSIZE_DEFAULT     20000       /* used when no size is requested */
#define BUFSIZE_MIN         2000        /* smaller requests are raised to this */
#define BUFSIZE_MAX         1000000     /* larger requests are lowered to this */
/* A NULL size request currently maps to the maximum, not to "no limit". */
#define BUFSIZE_UNLIMITED   BUFSIZE_MAX
|
|
|
|
PG_FUNCTION_INFO_V1(gms_output_enable_default);
|
|
PG_FUNCTION_INFO_V1(gms_output_enable);
|
|
PG_FUNCTION_INFO_V1(gms_output_disable);
|
|
PG_FUNCTION_INFO_V1(gms_output_serveroutput);
|
|
PG_FUNCTION_INFO_V1(gms_output_put);
|
|
PG_FUNCTION_INFO_V1(gms_output_put_line);
|
|
PG_FUNCTION_INFO_V1(gms_output_new_line);
|
|
PG_FUNCTION_INFO_V1(gms_output_get_line);
|
|
PG_FUNCTION_INFO_V1(gms_output_get_lines);
|
|
|
|
static uint32 output_index;
|
|
|
|
static void add_str(const char *str, int len);
|
|
static void add_text(text *str);
|
|
static void add_newline(void);
|
|
static void send_buffer(void);
|
|
|
|
/*
 * Remember which entry of the session's extension variable array belongs to
 * this extension.
 */
void
set_extension_index(uint32 index)
{
    output_index = index;
}
|
|
|
|
void init_session_vars(void) {
|
|
RepallocSessionVarsArrayIfNecessary();
|
|
OutputContext* psc =
|
|
(OutputContext*)MemoryContextAllocZero(u_sess->self_mem_cxt, sizeof(OutputContext));
|
|
u_sess->attr.attr_common.extension_session_vars_array[output_index] = psc;
|
|
|
|
psc->is_server_output = false;
|
|
psc->buffer = NULL;
|
|
psc->buffer_size = 0;
|
|
psc->buffer_len = 0;
|
|
psc->buffer_get = 0;
|
|
}
|
|
|
|
/*
 * Return this session's OutputContext, creating it on first use.
 */
OutputContext *
get_session_context()
{
    if (u_sess->attr.attr_common.extension_session_vars_array[output_index] != NULL) {
        return (OutputContext *) u_sess->attr.attr_common.extension_session_vars_array[output_index];
    }

    /* First access in this session: build the context, then hand it out. */
    init_session_vars();
    return (OutputContext *) u_sess->attr.attr_common.extension_session_vars_array[output_index];
}
|
|
|
|
/*
|
|
* Aux. buffer functionality
|
|
*/
|
|
static void
|
|
add_str(const char *str, int len)
|
|
{
|
|
errno_t sret = EOK;
|
|
bool is_change = false;
|
|
if (strlen(str) != 0 && GetDatabaseEncoding() != pg_get_client_encoding()) {
|
|
char *p = pg_server_to_client(str, len);
|
|
if (p != str) {
|
|
unsigned int mb_len = 0;
|
|
const char *mb_str = p;
|
|
str = p;
|
|
is_change = true;
|
|
while (*mb_str++)
|
|
mb_len++;
|
|
len = mb_len;
|
|
}
|
|
}
|
|
|
|
/* Discard all buffers if get_line was called. */
|
|
if (get_session_context()->buffer_get > 0)
|
|
{
|
|
get_session_context()->buffer_get = 0;
|
|
get_session_context()->buffer_len = 0;
|
|
get_session_context()->gms_getline = false;
|
|
get_session_context()->gms_valid_num = 0;
|
|
}
|
|
|
|
if (get_session_context()->buffer_len + len > get_session_context()->buffer_size) {
|
|
get_session_context()->buffer_len = 0;
|
|
get_session_context()->buffer_get = 0;
|
|
get_session_context()->gms_getline = false;
|
|
get_session_context()->gms_valid_num = 0;
|
|
get_session_context()->gms_serveroutput = true;
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
|
|
errmsg("buffer overflow"),
|
|
errdetail("Buffer overflow, limit of %d bytes", get_session_context()->buffer_size),
|
|
errhint("Increase buffer size in gms_output.enable() next time")));
|
|
}
|
|
|
|
if ( len != 0 ) {
|
|
sret = memcpy_s(get_session_context()->buffer + get_session_context()->buffer_len,
|
|
get_session_context()->buffer_size - get_session_context()->buffer_len, str, len);
|
|
securec_check(sret, "\0", "\0");
|
|
|
|
int tmplen = (len < (OUTPUTBUF_LEN-1)) ? len : (OUTPUTBUF_LEN-1);
|
|
sret = memcpy_s(get_session_context()->output_buf, OUTPUTBUF_LEN, get_session_context()->buffer + get_session_context()->buffer_len, tmplen);
|
|
securec_check(sret, "\0", "\0");
|
|
get_session_context()->output_buf[tmplen] = '\0';
|
|
if (pg_strcasecmp(str, "") != 0 && get_session_context()->gms_serveroutput) {
|
|
ereport(NOTICE,
|
|
(errmsg("%s", get_session_context()->output_buf), onlyfrontmsg(true)));
|
|
}
|
|
}
|
|
if (is_change) {
|
|
pfree_ext(str);
|
|
}
|
|
get_session_context()->buffer_len += len;
|
|
get_session_context()->buffer[get_session_context()->buffer_len] = '\0';
|
|
}
|
|
|
|
static void
|
|
add_text(text *str)
|
|
{
|
|
add_str(VARDATA_ANY(str), VARSIZE_ANY_EXHDR(str));
|
|
}
|
|
|
|
static void
|
|
add_newline(void)
|
|
{
|
|
add_str("", 1); /* add \0 */
|
|
if (get_session_context()->is_server_output)
|
|
send_buffer();
|
|
}
|
|
|
|
|
|
static void
|
|
send_buffer()
|
|
{
|
|
if (get_session_context()->buffer_len > 0)
|
|
{
|
|
StringInfoData msgbuf;
|
|
char *cursor = get_session_context()->buffer;
|
|
|
|
while (--get_session_context()->buffer_len > 0)
|
|
{
|
|
if (*cursor == '\0')
|
|
*cursor = '\n';
|
|
cursor++;
|
|
}
|
|
|
|
if (*cursor != '\0')
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_INTERNAL_ERROR),
|
|
errmsg("internal error"),
|
|
errdetail("Wrong message format detected")));
|
|
|
|
pq_beginmessage(&msgbuf, 'N');
|
|
/*
|
|
* FrontendProtocol is not avalilable in MSVC because it is not
|
|
* PGDLLEXPORT'ed. So, we assume always the protocol >= 3.
|
|
*/
|
|
#ifndef _MSC_VER
|
|
if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
|
|
{
|
|
#endif
|
|
pq_sendbyte(&msgbuf, PG_DIAG_MESSAGE_PRIMARY);
|
|
pq_sendstring(&msgbuf, get_session_context()->buffer);
|
|
pq_sendbyte(&msgbuf, '\0');
|
|
#ifndef _MSC_VER
|
|
}
|
|
else
|
|
{
|
|
*cursor++ = '\n';
|
|
*cursor = '\0';
|
|
pq_sendstring(&msgbuf, get_session_context()->buffer);
|
|
}
|
|
#endif
|
|
pq_endmessage(&msgbuf);
|
|
pq_flush();
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Aux db functions
|
|
*
|
|
*/
|
|
|
|
static void
|
|
gms_output_enable_internal(int32 n_buf_size)
|
|
{
|
|
/* We allocate +2 bytes for an end-of-line and a string terminator. */
|
|
if (get_session_context()->buffer == NULL)
|
|
{
|
|
get_session_context()->buffer = (char*)MemoryContextAlloc(u_sess->self_mem_cxt, n_buf_size + 2);
|
|
get_session_context()->buffer_size = n_buf_size;
|
|
get_session_context()->buffer_len = 0;
|
|
get_session_context()->buffer_get = 0;
|
|
get_session_context()->gms_getline = false;
|
|
get_session_context()->gms_valid_num = 0;
|
|
get_session_context()->gms_serveroutput = true;
|
|
}
|
|
else if (n_buf_size > get_session_context()->buffer_len)
|
|
{
|
|
/* We cannot shrink buffer less than current length. */
|
|
get_session_context()->buffer = (char*)repalloc(get_session_context()->buffer, n_buf_size + 2);
|
|
get_session_context()->buffer_size = n_buf_size;
|
|
}
|
|
else
|
|
{
|
|
ereport(NOTICE, (errmsg("The buf size(%d) cannot be smaller than the used buf size(%d).", n_buf_size, get_session_context()->buffer_len)));
|
|
}
|
|
}
|
|
|
|
/*
 * SQL entry point: enable output buffering with the default buffer size.
 */
Datum
gms_output_enable_default(PG_FUNCTION_ARGS)
{
    const int32 default_size = BUFSIZE_DEFAULT;

    gms_output_enable_internal(default_size);
    PG_RETURN_VOID();
}
|
|
|
|
/*
 * SQL entry point: enable output buffering with a caller-supplied size.
 *
 * A NULL argument selects BUFSIZE_UNLIMITED.  Any other value is clamped to
 * the range [BUFSIZE_MIN, BUFSIZE_MAX], with a WARNING when it is adjusted.
 */
Datum
gms_output_enable(PG_FUNCTION_ARGS)
{
    int32       requested = BUFSIZE_UNLIMITED;

    if (!PG_ARGISNULL(0)) {
        requested = PG_GETARG_INT32(0);

        if (requested > BUFSIZE_MAX) {
            requested = BUFSIZE_MAX;
            elog(WARNING, "Limit decreased to %d bytes.", BUFSIZE_MAX);
        } else if (requested < BUFSIZE_MIN) {
            requested = BUFSIZE_MIN;
            elog(WARNING, "Limit increased to %d bytes.", BUFSIZE_MIN);
        }
    }

    gms_output_enable_internal(requested);
    PG_RETURN_VOID();
}
|
|
|
|
/*
 * SQL entry point: release the output buffer and reset the buffer state.
 */
Datum
gms_output_disable(PG_FUNCTION_ARGS)
{
    OutputContext *ctx = get_session_context();

    if (ctx->buffer != NULL) {
        pfree(ctx->buffer);
    }

    ctx->buffer = NULL;
    ctx->buffer_size = 0;
    ctx->buffer_len = 0;
    ctx->buffer_get = 0;
    ctx->gms_getline = false;
    ctx->gms_valid_num = 0;
    ctx->gms_serveroutput = true;
    PG_RETURN_VOID();
}
|
|
|
|
/*
 * SQL entry point: switch server output on or off.
 *
 * Turning it on while no buffer exists also creates one with the default
 * size.
 */
Datum
gms_output_serveroutput(PG_FUNCTION_ARGS)
{
    OutputContext *ctx = get_session_context();

    ctx->is_server_output = PG_GETARG_BOOL(0);

    if (ctx->is_server_output && ctx->buffer == NULL) {
        gms_output_enable_internal(BUFSIZE_DEFAULT);
    }
    PG_RETURN_VOID();
}
|
|
|
|
|
|
/*
|
|
* main functions
|
|
*/
|
|
|
|
/*
 * SQL entry point: append text to the current line.  Does nothing while no
 * buffer is allocated.
 */
Datum
gms_output_put(PG_FUNCTION_ARGS)
{
    if (get_session_context()->buffer == NULL) {
        PG_RETURN_VOID();
    }

    add_text(PG_GETARG_TEXT_PP(0));
    PG_RETURN_VOID();
}
|
|
|
|
/*
 * SQL entry point: append text and then end the line.  Does nothing while no
 * buffer is allocated.
 */
Datum
gms_output_put_line(PG_FUNCTION_ARGS)
{
    if (get_session_context()->buffer == NULL) {
        PG_RETURN_VOID();
    }

    add_text(PG_GETARG_TEXT_PP(0));
    add_newline();
    PG_RETURN_VOID();
}
|
|
|
|
/*
 * SQL entry point: end the current line.  Does nothing while no buffer is
 * allocated.
 */
Datum
gms_output_new_line(PG_FUNCTION_ARGS)
{
    if (get_session_context()->buffer == NULL) {
        PG_RETURN_VOID();
    }

    add_newline();
    PG_RETURN_VOID();
}
|
|
|
|
/*
 * Fetch the next unread line from the buffer as a new text value and advance
 * the read position past it and its terminator.  Returns NULL when the read
 * position has reached the end of the buffered data.
 */
static text *
gms_output_next(void)
{
    OutputContext *ctx = get_session_context();
    text       *next_line;

    if (ctx->buffer_get >= ctx->buffer_len) {
        return NULL;
    }

    next_line = cstring_to_text(ctx->buffer + ctx->buffer_get);
    /* +1 skips the NUL byte that ends the line. */
    ctx->buffer_get += VARSIZE_ANY_EXHDR(next_line) + 1;
    return next_line;
}
|
|
|
|
/*
 * SQL entry point: return one buffered line as a (line, status) record.
 *
 * status is 0 with the line text when a line was available, or 1 with a NULL
 * line when there was nothing left to read.  The session's gms_getline flag
 * is set in either case.
 */
Datum
gms_output_get_line(PG_FUNCTION_ARGS)
{
    TupleDesc   row_desc;
    HeapTuple   row;
    Datum       row_datum;
    Datum       columns[2];
    bool        is_null[2] = { false, false };
    text       *next_line;

    /* The call site must expect a composite result. */
    if (get_call_result_type(fcinfo, NULL, &row_desc) != TYPEFUNC_COMPOSITE) {
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("function returning record called in context "
                        "that cannot accept type record"),
                 errhint("Try calling the function in the FROM clause "
                         "using a column definition list.")));
    }

    next_line = gms_output_next();
    if (next_line == NULL) {
        is_null[0] = true;
        columns[1] = Int32GetDatum(1);  /* 1: failed */
    } else {
        columns[0] = PointerGetDatum(next_line);
        columns[1] = Int32GetDatum(0);  /* 0: succeeded */
    }

    row = heap_form_tuple(row_desc, columns, is_null);
    row_datum = HeapTupleGetDatum(row);

    get_session_context()->gms_getline = true;
    PG_RETURN_DATUM(row_datum);
}
|
|
|
|
/*
 * SQL entry point: return up to the requested number of buffered lines as a
 * (lines text[], count) record.
 *
 * The line limit is read from argument 1.  When no line is returned, the
 * array column holds an empty text array and the count is 0.  The session's
 * gms_getline flag is set in either case.
 */
Datum
gms_output_get_lines(PG_FUNCTION_ARGS)
{
    TupleDesc   row_desc;
    HeapTuple   row;
    Datum       row_datum;
    Datum       columns[2];
    bool        is_null[2] = { false, false };
    int32       limit = PG_GETARG_INT32(1);
    int32       count = 0;
    ArrayBuildState *collected = NULL;

    /* The call site must expect a composite result. */
    if (get_call_result_type(fcinfo, NULL, &row_desc) != TYPEFUNC_COMPOSITE) {
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("function returning record called in context "
                        "that cannot accept type record"),
                 errhint("Try calling the function in the FROM clause "
                         "using a column definition list.")));
    }

    /* Pull lines until the limit is reached or the buffer runs dry. */
    while (count < limit) {
        text       *next_line = gms_output_next();

        if (next_line == NULL) {
            break;
        }
        collected = accumArrayResult(collected, PointerGetDatum(next_line), false,
                                     TEXTOID, CurrentMemoryContext);
        count++;
    }

    /* Column 0: the lines as a text array. */
    if (count > 0) {
        columns[0] = makeArrayResult(collected, CurrentMemoryContext);
    } else {
        int16       typlen;
        bool        typbyval;
        char        typalign;
        ArrayType  *empty;

        /* Nothing was read: build an array with zero dimensions instead. */
        get_typlenbyvalalign(TEXTOID, &typlen, &typbyval, &typalign);
        empty = construct_md_array(columns,
                                   NULL,
                                   0, NULL, NULL, TEXTOID, typlen, typbyval, typalign);
        columns[0] = PointerGetDatum(empty);
    }

    get_session_context()->gms_getline = true;

    /* Column 1: the number of lines returned. */
    columns[1] = Int32GetDatum(count);

    row = heap_form_tuple(row_desc, columns, is_null);
    row_datum = HeapTupleGetDatum(row);

    PG_RETURN_DATUM(row_datum);
}
|