Compare commits

..

11 Commits

Author SHA1 Message Date
ad6abb83b9 !23 rework: 解决uint数据类型查询报错
Merge pull request !23 from zcl/rework#bug#1
2024-04-10 01:36:58 +00:00
zcl
f3a182e1bb 增加注释和修改函数名称
修改查询oid的值的sql语句
2024-03-27 16:12:51 +08:00
ab15cbffd5 !22 解决uint数据类型查询报错
Merge pull request !22 from zcl/bug#1
2024-03-27 02:34:04 +00:00
5235f6399c 解决uint数据查询报错
连接数据库后动态注册uint类型

无效修改和少提交的修改

注册uint函数封装后再加入到connection_setup函数里

函数名改成pq_get_custom_type_oid; 去掉SHOW前缀

oid改成unsigned int类型
2024-03-27 10:20:42 +08:00
a63f319414 !19 fix:convert empty string to NULL in A compatibility mode
Merge pull request !19 from vimiix/dev
2023-09-26 03:00:24 +00:00
12b2823c32 fix:convert empty string to NULL in A compatibility mode 2023-09-25 15:41:38 +08:00
a818ac9ae9 !20 修复若干malloc与free的空间问题
Merge pull request !20 from Luan-233/master
2023-09-22 07:01:14 +00:00
19d1f3a5ed 增加边界检查,防止编译时报warning 2023-09-21 20:40:29 -07:00
1306df3963 修复若干malloc与free的空间问题 2023-09-21 10:06:52 -07:00
d91399d1ae !17 修复批量操作接口参数的数据类型错误
Merge pull request !17 from vimiix/dev
2023-09-12 03:44:55 +00:00
372e59e53a fix(extra):object type error, change to only support string or str-able params 2023-09-11 16:32:08 +08:00
8 changed files with 178 additions and 28 deletions

View File

@ -1142,7 +1142,7 @@ def register_composite(name, conn_or_curs, globally=False, factory=None):
return caster
def _paginate(seq, page_size):
def _paginate(seq, page_size, to_byte=False):
"""Consume an iterable and return it in chunks.
Every chunk is at most `page_size`. Never return an empty chunk.
@ -1152,7 +1152,16 @@ def _paginate(seq, page_size):
while True:
try:
for i in range(page_size):
if not to_byte:
page.append(next(it))
continue
vs = next(it)
if isinstance(vs, (list, tuple)):
# Ignore None object
# Serialized params to bytes
page.append(list(map(lambda v: v if v is None else str(v).encode('utf-8'), vs)))
else:
page.append(vs)
yield page
page = []
except StopIteration:
@ -1308,16 +1317,17 @@ def execute_prepared_batch(cur, prepared_statement_name, args_list, page_size=10
r"""
[openGauss libpq only]
Execute prepared statement with api `PQexecPreparedBatch` (new api in openGauss)
Execute prepared statement with api `PQexecPreparedBatch` (new api in openGauss's libpq.so)
Param:
argslist: 2d list, do nothing if empty
Arguments:
argslist: Two-dimensional list, if empty, return directly
Each parameter in the argument list must be a string or be string-able(should implements `__str__` magic method)
"""
if len(args_list) == 0:
return
nparams = len(args_list[0])
for page in _paginate(args_list, page_size=page_size):
for page in _paginate(args_list, page_size=page_size, to_byte=True):
cur.execute_prepared_batch(prepared_statement_name, nparams, len(page), page)
@ -1325,14 +1335,15 @@ def execute_params_batch(cur, sql_format, args_list, page_size=100):
r"""
[openGauss libpq only]
Execute sql with api `PQexecParamsBatch` (new api in openGauss)
Execute sql with api `PQexecParamsBatch` (new api in openGauss's libpq.so)
Arguments:
argslist: 2d list, do nothing if empty
argslist: Two-dimensional list, if empty, return directly
Each parameter in the argument list must be a string or be string-able(should implements `__str__` magic method)
"""
if len(args_list) == 0:
return
nparams = len(args_list[0])
for page in _paginate(args_list, page_size=page_size):
for page in _paginate(args_list, page_size=page_size, to_byte=True):
cur.execute_params_batch(sql_format, nparams, len(page), page)

View File

@ -227,7 +227,7 @@ PyObject *Bytes_Format(PyObject *format, PyObject *args, char place_holder) {
} /* '%' */
} /* until end */
if (dict) { // if args' type is dict, the func ends
if (dict || (arglen < 0) || (argidx < 0)) { // args' type is dict, the func ends
if (args_owned) Py_DECREF(args);
if (!(result = resize_bytes(result, reslen - rescnt))) return NULL; // resize and return
if (place_holder != '%') {
@ -238,8 +238,8 @@ PyObject *Bytes_Format(PyObject *format, PyObject *args, char place_holder) {
}
args_list = (char **)malloc(sizeof(char *) * arglen); // buffer
memset(args_list, NULL, sizeof(char *) * arglen);
args_len = (Py_ssize_t *)malloc(sizeof(Py_ssize_t *) * arglen); // length of every argument
memset(args_list, 0, sizeof(char *) * arglen);
args_len = (Py_ssize_t *)malloc(sizeof(Py_ssize_t) * arglen); // length of every argument
while ((args_value = getnextarg(args, arglen, &argidx)) != NULL) { // stop when receive NULL
Py_ssize_t length = 0;
if (!Bytes_CheckExact(args_value)) {
@ -251,14 +251,14 @@ PyObject *Bytes_Format(PyObject *format, PyObject *args, char place_holder) {
length = Bytes_GET_SIZE(args_value);
// printf("type: %s, len: %d, value: %s\n", Py_TYPE(args_value)->tp_name, length, args_buffer);
args_len[argidx - 1] = length;
args_list[argidx - 1] = (char *)malloc(sizeof(char *) * (length + 1));
args_list[argidx - 1] = (char *)malloc(sizeof(char) * (length + 1));
Py_MEMCPY(args_list[argidx - 1], args_buffer, length);
args_list[argidx - 1][length] = '\0';
Py_XDECREF(args_value);
}
arg_usecnt = (int *)malloc(sizeof(int) * arglen);
memset(arg_usecnt, 0, sizeof(char *) * arglen);
memset(arg_usecnt, 0, sizeof(int) * arglen);
fmt = Bytes_AS_STRING(format); // get pointer of format
fmtcnt = Bytes_GET_SIZE(format); // get length of format

View File

@ -46,6 +46,13 @@ extern "C" {
#define STATE_ON 1
#define STATE_DEFAULT 2
/* sql_compatibility values */
#define SQL_COMPATIBILITY_A 1
#define SQL_COMPATIBILITY_OTHER 5
// #define SQL_COMPATIBILITY_B 2
// #define SQL_COMPATIBILITY_C 3
// #define SQL_COMPATIBILITY_PG 4
/* connection status */
#define CONN_STATUS_SETUP 0
#define CONN_STATUS_READY 1
@ -148,6 +155,8 @@ struct connectionObject {
/* inside a with block */
int entered;
int sql_compatibility;
};
/* map isolation level values into a numeric const */

View File

@ -37,6 +37,8 @@
#include <string.h>
#include <ctype.h>
#include "psycopg/typecast.h"
#include "psycopg/typecast_basic.c"
extern HIDDEN const char *srv_isolevels[];
extern HIDDEN const char *srv_readonly[];
@ -1268,6 +1270,9 @@ static struct PyMemberDef connectionObject_members[] = {
{"server_version", T_INT,
offsetof(connectionObject, server_version), READONLY,
"Server version."},
{"sql_compatibility", T_INT,
offsetof(connectionObject, sql_compatibility), READONLY,
"Server sql_compatibility param value."},
{NULL}
};
@ -1311,12 +1316,61 @@ static struct PyGetSetDef connectionObject_getsets[] = {
};
#undef EXCEPTION_GETTER
/* register the uint typecasters */
static int
register_type_uint(connectionObject *self, PyThreadState **tstate)
{
int rv = -1;
typecastObject *obj = NULL;
PyObject *name = NULL, *values = NULL;
Py_ssize_t i, len = 0;
char* uint_arr[] = {"\'uint1\'", "\'uint2\'", "\'uint4\'", "\'uint8\'"};
int size = sizeof(uint_arr) / sizeof(uint_arr[0]);
long int* _typecast_INTEGER_types;
_typecast_INTEGER_types = (long int*)malloc((size+1)*sizeof(long int));
for (int i=0; i< size; i++) {
unsigned int uint_val = pq_get_pg_catalog_custom_type_oid(self, uint_arr[i], tstate);
_typecast_INTEGER_types[i] = (long int)uint_val;
}
typecastObject_initlist _typecast_builtins[] = {
{"INTEGER", _typecast_INTEGER_types, typecast_INTEGER_cast, NULL},
};
name = Text_FromUTF8(_typecast_builtins->name);
if (!name) goto end;
while (_typecast_builtins->values[len] != 0) len++;
values = PyTuple_New(len);
if (!values) goto end;
for (i = 0; i < len ; i++) {
PyTuple_SET_ITEM(values, i, PyInt_FromLong(_typecast_builtins->values[i]));
}
obj = (typecastObject *)typecast_new(name, values, NULL, NULL);
if (obj) {
obj->ccast = _typecast_builtins->cast;
obj->pcast = NULL;
}
if (typecast_add((PyObject *)obj, self->string_types, 0) < 0) { goto end; }
rv = 0;
free(_typecast_INTEGER_types);
end:
Py_XDECREF(values);
Py_XDECREF(name);
return rv;
}
/* initialization and finalization methods */
static int
connection_setup(connectionObject *self, const char *dsn, long int async)
{
int rv = -1;
char *sql_compatibility_value = NULL;
Dprintf("connection_setup: init connection object at %p, "
"async %ld, refcnt = " FORMAT_CODE_PY_SSIZE_T,
@ -1334,6 +1388,7 @@ connection_setup(connectionObject *self, const char *dsn, long int async)
self->isolevel = ISOLATION_LEVEL_DEFAULT;
self->readonly = STATE_DEFAULT;
self->deferrable = STATE_DEFAULT;
self->sql_compatibility = SQL_COMPATIBILITY_A;
#ifdef CONN_CHECK_PID
self->procpid = getpid();
#endif
@ -1356,7 +1411,23 @@ connection_setup(connectionObject *self, const char *dsn, long int async)
FORMAT_CODE_PY_SSIZE_T,
self, Py_REFCNT(self));
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&self->lock);
sql_compatibility_value = pq_get_guc_locked(self, "sql_compatibility", &_save);
if (register_type_uint(self, &_save)) { goto exit; }
pthread_mutex_unlock(&self->lock);
Py_END_ALLOW_THREADS;
if (strcmp(sql_compatibility_value, "A") == 0) {
self->sql_compatibility = SQL_COMPATIBILITY_A;
} else {
self->sql_compatibility = SQL_COMPATIBILITY_OTHER;
}
exit:
if (sql_compatibility_value){
free(sql_compatibility_value);
}
return rv;
}

View File

@ -628,7 +628,7 @@ curs_execute_prepared_batch(cursorObject *self, PyObject *args)
int nParams = 0, nBatch = 0;
PyObject *argsList = NULL;
Py_ssize_t rowIdx, colIdx, total;
int rowIdx, colIdx, total;
char **paramValues = NULL;
PGresult *res = NULL;
@ -642,7 +642,7 @@ curs_execute_prepared_batch(cursorObject *self, PyObject *args)
}
Dprintf("execute_prepared_batch parsed statement_name: %s, nParams: %d, nBatch: %d",
stmtName, nParams, nBatch);
total = nBatch*nParams;
total = nBatch * nParams;
EXC_IF_CURS_CLOSED(self);
EXC_IF_CURS_ASYNC(self, execute_prepared_batch);
@ -679,11 +679,17 @@ curs_execute_prepared_batch(cursorObject *self, PyObject *args)
PyObject *argItem = PySequence_GetItem(rowArgs, colIdx);
if (argItem == Py_None) {
paramValues[rowIdx*nParams+colIdx] = "NULL";
paramValues[rowIdx * nParams + colIdx] = NULL;
} else {
PyObject *t = microprotocol_getquoted(argItem, self->conn);
paramValues[rowIdx*nParams+colIdx] = strdup(Bytes_AsString(t));
Py_XDECREF(t);
if (!(argItem = psyco_ensure_bytes(argItem))) {
goto exit;
}
// convert empty string to NULL in A compatibility mode
if (self->conn->sql_compatibility == SQL_COMPATIBILITY_A && PyObject_Length(argItem) == 0) {
paramValues[rowIdx * nParams + colIdx] = NULL;
} else {
paramValues[rowIdx * nParams + colIdx] = Bytes_AsString(argItem);
}
}
Py_XDECREF(argItem);
}
@ -715,7 +721,7 @@ curs_execute_params_batch(cursorObject *self, PyObject *args)
int nParams = 0, nBatch = 0;
PyObject *argsList = NULL;
Py_ssize_t rowIdx, colIdx, total;
int rowIdx, colIdx, total;
char **paramValues = NULL;
PGresult *res = NULL;
@ -729,7 +735,7 @@ curs_execute_params_batch(cursorObject *self, PyObject *args)
Dprintf("execute_params_batch parsed sql: %s, nParams: %d, nBatch: %d",
sql, nParams, nBatch);
total = nBatch*nParams;
total = nBatch * nParams;
EXC_IF_CURS_CLOSED(self);
EXC_IF_CURS_ASYNC(self, execute_params_batch);
@ -765,11 +771,17 @@ curs_execute_params_batch(cursorObject *self, PyObject *args)
PyObject *argItem = PySequence_GetItem(rowArgs, colIdx);
if (argItem == Py_None) {
paramValues[rowIdx*nParams+colIdx] = "NULL";
paramValues[rowIdx * nParams + colIdx] = NULL;
} else {
PyObject *t = microprotocol_getquoted(argItem, self->conn);
paramValues[rowIdx*nParams+colIdx] = strdup(Bytes_AsString(t));
Py_XDECREF(t);
if (!(argItem = psyco_ensure_bytes(argItem))) {
goto exit;
}
// convert empty string to NULL in A compatibility mode
if (self->conn->sql_compatibility == SQL_COMPATIBILITY_A && PyObject_Length(argItem) == 0) {
paramValues[rowIdx * nParams + colIdx] = NULL;
} else {
paramValues[rowIdx * nParams + colIdx] = Bytes_AsString(argItem);
}
}
Py_XDECREF(argItem);
}

View File

@ -43,7 +43,7 @@
#include "psycopg/pgtypes.h"
#include "psycopg/error.h"
#include "psycopg/column.h"
#include <stdlib.h>
#include "psycopg/libpq_support.h"
#include "libpq-fe.h"
@ -607,6 +607,52 @@ cleanup:
return rv;
}
/* Get the oid of uint type created by the dolphin plugin, which is only in the pg_catalog by default */
unsigned int
pq_get_pg_catalog_custom_type_oid(connectionObject *conn, const char *param, PyThreadState **tstate)
{
char query[256];
int size;
unsigned int rv = 0;
size = PyOS_snprintf(query, sizeof(query), "select oid from pg_type where typnamespace = 11 and typname= %s", param);
if (size < 0 || (size_t)size >= sizeof(query)) {
conn_set_error(conn, "query too large");
goto cleanup;
}
if (!psyco_green()) {
conn_set_result(conn, PQexec(conn->pgconn, query));
} else {
PyEval_RestoreThread(*tstate);
conn_set_result(conn, psyco_exec_green(conn, query));
*tstate = PyEval_SaveThread();
}
if (!conn->pgres) {
Dprintf("pq_get_pg_catalog_custom_type_oid: PQexec returned NULL");
PyEval_RestoreThread(*tstate);
if (!PyErr_Occurred()) {
conn_set_error(conn, PQerrorMessage(conn->pgconn));
}
*tstate = PyEval_SaveThread();
goto cleanup;
}
if (PQresultStatus(conn->pgres) != PGRES_TUPLES_OK) {
Dprintf("pq_get_pg_catalog_custom_type_oid: result was not TUPLES_OK (%s)",
PQresStatus(PQresultStatus(conn->pgres)));
goto cleanup;
}
rv = atoi(strdup(PQgetvalue(conn->pgres, 0, 0)));
CLEARPGRES(conn->pgres);
cleanup:
return rv;
}
/* Set a session parameter.
*
* The function should be called on a locked connection without

View File

@ -459,7 +459,7 @@ PyTypeObject typecastType = {
0, /*tp_new*/
};
static PyObject *
PyObject *
typecast_new(PyObject *name, PyObject *values, PyObject *cast, PyObject *base)
{
typecastObject *obj;

View File

@ -89,3 +89,4 @@ HIDDEN PyObject *typecast_cast(
PyObject *self, const char *str, Py_ssize_t len, PyObject *curs);
#endif /* !defined(PSYCOPG_TYPECAST_H) */
PyObject *typecast_new(PyObject *name, PyObject *values, PyObject *cast, PyObject *base);