!5867 审计日志,支持完整性验证。
Merge pull request !5867 from 邮储-王建达/feature_audit_6.0
This commit is contained in:
@ -8583,8 +8583,8 @@
|
||||
),
|
||||
AddFuncGroup(
|
||||
"pg_query_audit", 2,
|
||||
AddBuiltinFunc(_0(3780), _1("pg_query_audit"), _2(2), _3(false), _4(true), _5(pg_query_audit), _6(2249), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(10), _12(0), _13(0), _14(false), _15(false), _16(false), _17(false), _18('v'), _19(0), _20(2, 1184, 1184), _21(15, 1184, 1184, 1184, 25, 25, 25, 25, 25, 25, 25, 25, 25, 25, 25, 25), _22(15, 'i', 'i', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o'), _23(15, "begin", "end", "time", "type", "result", "userid", "username", "database", "client_conninfo", "object_name", "detail_info", "node_name", "thread_id", "local_port", "remote_port"), _24(NULL), _25("pg_query_audit"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(NULL), _32(false), _33(NULL), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0)),
|
||||
AddBuiltinFunc(_0(3782), _1("pg_query_audit"), _2(3), _3(false), _4(true), _5(pg_query_audit), _6(2249), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(10), _12(0), _13(0), _14(false), _15(false), _16(false), _17(false), _18('v'), _19(0), _20(3, 1184, 1184, 25), _21(16, 1184, 1184, 25, 1184, 25, 25, 25, 25, 25, 25, 25, 25, 25, 25, 25, 25), _22(16, 'i', 'i', 'i', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o'), _23(16, "begin", "end", "directory", "time", "type", "result", "userid", "username", "database", "client_conninfo", "object_name", "detail_info", "node_name", "thread_id", "local_port", "remote_port"), _24(NULL), _25("pg_query_audit"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(NULL), _32(false), _33(NULL), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0))
|
||||
AddBuiltinFunc(_0(3780), _1("pg_query_audit"), _2(2), _3(false), _4(true), _5(pg_query_audit), _6(2249), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(10), _12(0), _13(0), _14(false), _15(false), _16(false), _17(false), _18('v'), _19(0), _20(2, 1184, 1184), _21(17, 1184, 1184, 1184, 25, 25, 25, 25, 25, 25, 25, 25, 25, 25, 25, 25, 25, 16), _22(17, 'i', 'i', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o'), _23(17, "begin", "end", "time", "type", "result", "userid", "username", "database", "client_conninfo", "object_name", "detail_info", "node_name", "thread_id", "local_port", "remote_port", "sha_code", "verify_result"), _24(NULL), _25("pg_query_audit"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(NULL), _32(false), _33(NULL), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0)),
|
||||
AddBuiltinFunc(_0(3782), _1("pg_query_audit"), _2(3), _3(false), _4(true), _5(pg_query_audit), _6(2249), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(10), _12(0), _13(0), _14(false), _15(false), _16(false), _17(false), _18('v'), _19(0), _20(3, 1184, 1184, 25), _21(18, 1184, 1184, 25, 1184, 25, 25, 25, 25, 25, 25, 25, 25, 25, 25, 25, 25, 25, 16), _22(18, 'i', 'i', 'i', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o'), _23(18, "begin", "end", "directory", "time", "type", "result", "userid", "username", "database", "client_conninfo", "object_name", "detail_info", "node_name", "thread_id", "local_port", "remote_port", "sha_code", "verify_result"), _24(NULL), _25("pg_query_audit"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(NULL), _32(false), _33(NULL), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0))
|
||||
),
|
||||
AddFuncGroup(
|
||||
"pg_read_binary_file", 2,
|
||||
|
||||
@ -76,12 +76,13 @@ bool will_shutdown = false;
|
||||
*
|
||||
********************************************/
|
||||
|
||||
const uint32 GRAND_VERSION_NUM = 92945;
|
||||
const uint32 GRAND_VERSION_NUM = 92946;
|
||||
|
||||
/********************************************
|
||||
* 2.VERSION NUM FOR EACH FEATURE
|
||||
* Please write indescending order.
|
||||
********************************************/
|
||||
const uint32 AUDIT_SHA_VERSION_NUM = 92946;
|
||||
const uint32 NETTIME_TRACE_VERSION_NUM = 92945;
|
||||
const uint32 HBA_CONF_VERSION_NUM = 92944;
|
||||
const uint32 PARALLEL_ENABLE_VERSION_NUM = 92941;
|
||||
|
||||
@ -30,6 +30,7 @@
|
||||
#include "lib/stringinfo.h"
|
||||
#include "libpq/libpq-be.h"
|
||||
#include "libpq/pqsignal.h"
|
||||
#include "libpq/sha2.h"
|
||||
#include "funcapi.h"
|
||||
#include "miscadmin.h"
|
||||
#include "nodes/pg_list.h"
|
||||
@ -57,6 +58,25 @@
|
||||
#include <string>
|
||||
#include <fstream>
|
||||
|
||||
#include <sstream>
|
||||
#include <iostream>
|
||||
|
||||
#ifdef HAVE_STDIO_H
|
||||
#include <stdio.h>
|
||||
#endif
|
||||
|
||||
#ifdef HAVE_STDLIB_H
|
||||
#include <stdlib.h>
|
||||
#endif
|
||||
|
||||
#ifdef HVE_STRING_H
|
||||
#include <string.h>
|
||||
#endif
|
||||
|
||||
#ifdef HAVE_OPENSSL_SHA_H
|
||||
#include <openssl/sha.h>
|
||||
#endif
|
||||
|
||||
#ifdef ENABLE_UT
|
||||
#define static
|
||||
#endif
|
||||
@ -333,6 +353,7 @@ typedef struct AuditData {
|
||||
#define PGAUDIT_RESTART_INTERVAL 60
|
||||
|
||||
#define PGAUDIT_QUERY_COLS 13
|
||||
#define PGAUDIT_QUERY_COLS_NEW 15
|
||||
|
||||
#define MAXNUMLEN 16
|
||||
|
||||
@ -340,6 +361,11 @@ typedef struct AuditData {
|
||||
#define WRITE_TO_STDAUDITFILE(ctype) (t_thrd.role == AUDITOR && ctype == STD_AUDIT_TYPE)
|
||||
#define WRITE_TO_UNIAUDITFILE(ctype) (t_thrd.role == AUDITOR && ctype == UNIFIED_AUDIT_TYPE)
|
||||
|
||||
#define MAX_DATA_LEN 1024 /*sha data len*/
|
||||
#define SHA256_LENTH 32 /*sha length*/
|
||||
#define SHA256_HEX_LENTH 512 /*sha hex length*/
|
||||
#define SHA_LOG_MAX_TIMELEN 80 /*sha date length*/
|
||||
|
||||
struct AuditEventInfo {
|
||||
AuditEventInfo() : userid{0},
|
||||
username(NULL),
|
||||
@ -406,13 +432,15 @@ static void pgaudit_rewrite_indexfile(void);
|
||||
static void pgaudit_indextbl_init_new(void);
|
||||
static void pgaudit_reset_indexfile();
|
||||
static const char* pgaudit_string_field(AuditData* adata, int num);
|
||||
static void deserialization_to_tuple(Datum (&values)[PGAUDIT_QUERY_COLS],
|
||||
static void deserialization_to_tuple(Datum (&values)[PGAUDIT_QUERY_COLS_NEW],
|
||||
AuditData *adata,
|
||||
const AuditMsgHdr &header);
|
||||
const AuditMsgHdr &header,
|
||||
bool nulls[PGAUDIT_QUERY_COLS_NEW],
|
||||
bool newVersion);
|
||||
static void pgaudit_query_file(Tuplestorestate *state, TupleDesc tdesc, uint32 fnum, TimestampTz begtime,
|
||||
TimestampTz endtime, const char *audit_directory);
|
||||
TimestampTz endtime, const char *audit_directory, bool newVersion);
|
||||
static TimestampTz pgaudit_headertime(uint32 fnum, const char *audit_directory);
|
||||
static void pgaudit_query_valid_check(const ReturnSetInfo *rsinfo, FunctionCallInfoData *fcinfo, TupleDesc &tupdesc);
|
||||
static void pgaudit_query_valid_check(const ReturnSetInfo *rsinfo, FunctionCallInfoData *fcinfo, TupleDesc &tupdesc, bool newVersion);
|
||||
|
||||
static uint32 pgaudit_get_auditfile_num();
|
||||
static void pgaudit_update_auditfile_time(pg_time_t timestamp, bool exist);
|
||||
@ -438,11 +466,15 @@ inline bool pgaudit_need_check_size_rotation()
|
||||
|
||||
/********** toughness *********/
|
||||
static void CheckAuditFile(void);
|
||||
static bool pgaudit_invalid_header(const AuditMsgHdr* header);
|
||||
static bool pgaudit_invalid_header(const AuditMsgHdr* header, bool newVersion);
|
||||
static void pgaudit_mark_corrupt_info(uint32 fnum);
|
||||
static void audit_append_xid_info(const char *detail_info, char *detail_info_xid, uint32 len);
|
||||
static bool audit_status_check_ok();
|
||||
|
||||
/*audit sha code*/
|
||||
static bool pgaudit_need_sha_code();
|
||||
static void generate_audit_sha_code(pg_time_t time, const char* type, const char* result, char *userid, const char* username, const char* dbname, char* client_info, \
|
||||
const char *object_name, const char *detail_info, const char* nodename, char* threadid, char* localport, \
|
||||
char* remoteport, unsigned char* shacode);
|
||||
static void init_audit_signal_handlers()
|
||||
{
|
||||
(void)gspqsignal(SIGHUP, sigHupHandler); /* set flag to read config file */
|
||||
@ -564,6 +596,53 @@ static void sig_thread_config_handler(int ¤tAuditRotationAge, int ¤t
|
||||
t_thrd.audit.rotation_requested = true;
|
||||
}
|
||||
}
|
||||
|
||||
/* audit sha code version: finished upgrade*/
|
||||
static bool pgaudit_need_sha_code()
|
||||
{
|
||||
if (t_thrd.proc == NULL) {
|
||||
return false;
|
||||
}
|
||||
return t_thrd.proc->workingVersionNum >= AUDIT_SHA_VERSION_NUM;
|
||||
}
|
||||
|
||||
/*
|
||||
* Brief : audit sha code
|
||||
* Description : audit sha code
|
||||
* the fileds are arraged as below sequence, Note it's not liable to modify them as to keep compatibility of version
|
||||
* time|type|result|userid|username|dbname|client_info|object_name|detail_info|nodename|threadid|localport|remoteport
|
||||
*/
|
||||
static void generate_audit_sha_code(pg_time_t time, AuditType type, AuditResult result, char* userid, const char* username, const char* dbname, char* client_info,
|
||||
const char* object_name, const char* detail_info, const char* nodename, char* threadid, char* localport,
|
||||
char* remoteport, unsigned char* shacode)
|
||||
{
|
||||
char timeTzLocaltime[SHA_LOG_MAX_TIMELEN] = {0};
|
||||
struct tm* system;
|
||||
system = localtime(&time);
|
||||
if (system != nullptr) {
|
||||
(void)strftime(timeTzLocaltime, SHA_LOG_MAX_TIMELEN, "%Y-%m-%d_%H%M%S", system);
|
||||
}
|
||||
userid = (userid != NULL && userid[0] != '\0') ? userid : NULL;
|
||||
username = (username != NULL && username[0] != '\0') ? username : NULL;
|
||||
dbname = (dbname != NULL && dbname[0] != '\0') ? dbname : NULL;
|
||||
client_info = (client_info != NULL && client_info[0] != '\0') ? client_info : NULL;
|
||||
object_name = (object_name != NULL && object_name[0] != '\0') ? object_name : NULL;
|
||||
detail_info = (detail_info != NULL && detail_info[0] != '\0') ? detail_info : NULL;
|
||||
nodename = (nodename != NULL && nodename[0] != '\0') ? nodename : NULL;
|
||||
threadid = (threadid != NULL && threadid[0] != '\0') ? threadid : NULL;
|
||||
localport = (localport != NULL && localport[0] != '\0') ? localport : NULL;
|
||||
remoteport = (remoteport != NULL && remoteport[0] != '\0') ? remoteport : NULL;
|
||||
// TimestampTz timeTz = time_t_to_timestamptz(time);
|
||||
StringInfoData str;
|
||||
initStringInfo(&str);
|
||||
appendStringInfo(&str, "%s | %d | %d | %s | %s | %s | %s | %s | %s | %s | %s | %s | %s",
|
||||
timeTzLocaltime, type, result, userid, username, dbname, client_info, object_name,
|
||||
detail_info, nodename, threadid, localport, remoteport);
|
||||
SHA256((const unsigned char*)str.data , str.len, shacode);
|
||||
pfree_ext(str.data);
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* Main entry point for auditor process
|
||||
* argc/argv parameters are valid only in EXEC_BACKEND case.
|
||||
@ -1992,6 +2071,7 @@ void audit_report(AuditType type, AuditResult result, const char *object_name, c
|
||||
StringInfoData buf;
|
||||
AuditData adata;
|
||||
AuditEventInfo event_info;
|
||||
unsigned char shacode[SHA256_HEX_LENTH] = {0};
|
||||
if (!audit_get_clientinfo(type, object_name, event_info)) {
|
||||
return;
|
||||
}
|
||||
@ -2020,12 +2100,26 @@ void audit_report(AuditType type, AuditResult result, const char *object_name, c
|
||||
adata.header.signature[0] = 'A';
|
||||
adata.header.signature[1] = 'U';
|
||||
adata.header.version = 0;
|
||||
adata.header.fields = PGAUDIT_QUERY_COLS;
|
||||
if (pgaudit_need_sha_code()) {
|
||||
adata.header.fields = PGAUDIT_QUERY_COLS_NEW;
|
||||
} else {
|
||||
adata.header.fields = PGAUDIT_QUERY_COLS;
|
||||
}
|
||||
adata.header.flags = AUDIT_TUPLE_NORMAL;
|
||||
adata.header.time = current_timestamp();
|
||||
adata.header.size = 0;
|
||||
adata.type = type;
|
||||
adata.result = result;
|
||||
char hexbuf[SHA256_HEX_LENTH]={0};
|
||||
/*type result format*/
|
||||
if (pgaudit_need_sha_code()) {
|
||||
/*sha code for audit*/
|
||||
generate_audit_sha_code(adata.header.time / 1000, type, result, userid, username, dbname, client_info, object_name,
|
||||
detail_info, g_instance.attr.attr_common.PGXCNodeName, threadid, localport,
|
||||
remoteport, shacode);
|
||||
/*sha code convert to hex*/
|
||||
sha_bytes_to_hex64((uint8*)shacode, hexbuf);
|
||||
}
|
||||
initStringInfo(&buf);
|
||||
appendBinaryStringInfo(&buf, (char*)&adata, AUDIT_HEADER_SIZE);
|
||||
|
||||
@ -2040,6 +2134,9 @@ void audit_report(AuditType type, AuditResult result, const char *object_name, c
|
||||
appendStringField(&buf, (threadid[0] != '\0') ? threadid : NULL);
|
||||
appendStringField(&buf, (localport[0] != '\0') ? localport : NULL);
|
||||
appendStringField(&buf, (remoteport[0] != '\0') ? remoteport : NULL);
|
||||
if (pgaudit_need_sha_code()) {
|
||||
appendStringField(&buf, (shacode[0] != '\0') ? (const char*)hexbuf : NULL);
|
||||
}
|
||||
|
||||
/*
|
||||
* Use the chunking protocol if we know the syslogger should be
|
||||
@ -2651,6 +2748,8 @@ static char* serialize_event_to_json(AuditData *adata, long long eventTime)
|
||||
WRITE_JSON_STRING(localPortInfo);
|
||||
event.remotePortInfo = pgaudit_string_field(adata, AUDIT_REMOTEPORT_INFO);
|
||||
WRITE_JSON_STRING(remotePortInfo);
|
||||
event.shaCode = pgaudit_string_field(adata, AUDIT_SHACODE);
|
||||
WRITE_JSON_STRING(shaCode);
|
||||
event.eventTime = eventTime;
|
||||
WRITE_JSON_INT(eventTime);
|
||||
WRITE_JSON_END();
|
||||
@ -2721,7 +2820,7 @@ static void pgaudit_query_file_for_elastic()
|
||||
if (header.signature[0] != 'A' ||
|
||||
header.signature[1] != 'U' ||
|
||||
header.version != 0 ||
|
||||
header.fields != PGAUDIT_QUERY_COLS ||
|
||||
(header.fields != PGAUDIT_QUERY_COLS && header.fields != PGAUDIT_QUERY_COLS_NEW) ||
|
||||
(header.size <= sizeof(AuditMsgHdr))) {
|
||||
ereport(LOG, (errmsg("invalid data in audit file \"%s\"", file_path)));
|
||||
break;
|
||||
@ -2799,18 +2898,36 @@ static void pgaudit_query_file_for_elastic()
|
||||
/*
|
||||
* Brief : scan the specified audit file into tuple
|
||||
* Description : Note we use old/new version to differ whether there is user_id field in the file.
|
||||
* for expanding new field later, maybe we will depend on version id to implement
|
||||
* for expanding new field later, maybe we will depend on version id to implement
|
||||
* backward compatibility but not bool variable
|
||||
*/
|
||||
static void deserialization_to_tuple(Datum (&values)[PGAUDIT_QUERY_COLS],
|
||||
AuditData *adata,
|
||||
const AuditMsgHdr &header)
|
||||
static void deserialization_to_tuple(Datum (&values)[PGAUDIT_QUERY_COLS_NEW],
|
||||
AuditData *adata,
|
||||
const AuditMsgHdr &header,
|
||||
bool nulls[PGAUDIT_QUERY_COLS_NEW],
|
||||
bool newVersion)
|
||||
{
|
||||
/*sha param*/
|
||||
char* userid = NULL;
|
||||
const char* username =NULL;
|
||||
const char* dbname = NULL;
|
||||
char* client_info = NULL;
|
||||
const char* object_name = NULL;
|
||||
const char* detail_info =NULL;
|
||||
const char* nodename = NULL;
|
||||
char* threadid = NULL;
|
||||
char* localport = NULL;
|
||||
char* remoteport = NULL;
|
||||
unsigned char shacode[SHA256_HEX_LENTH] = {0};
|
||||
const char* saved_hexbuf = NULL;
|
||||
char hexbuf[SHA256_HEX_LENTH]={0};
|
||||
|
||||
/* append timestamp info to data tuple */
|
||||
int i = 0;
|
||||
values[i++] = TimestampTzGetDatum(time_t_to_timestamptz(adata->header.time));
|
||||
values[i++] = CStringGetTextDatum(AuditTypeDesc(adata->type));
|
||||
values[i++] = CStringGetTextDatum(AuditResultDesc(adata->result));
|
||||
// values[i++] = CStringGetTextDatum((const char*)adata->shacode);
|
||||
|
||||
/*
|
||||
* new format of the audit file under correct record
|
||||
@ -2818,33 +2935,89 @@ static void deserialization_to_tuple(Datum (&values)[PGAUDIT_QUERY_COLS],
|
||||
*/
|
||||
int index_field = 0;
|
||||
const char* field = NULL;
|
||||
bool new_version = (header.fields == PGAUDIT_QUERY_COLS);
|
||||
|
||||
bool new_version = (header.fields == PGAUDIT_QUERY_COLS || header.fields == PGAUDIT_QUERY_COLS_NEW);
|
||||
field = new_version ? pgaudit_string_field(adata, index_field++) : NULL;
|
||||
userid = (char*)field;
|
||||
values[i++] = CStringGetTextDatum(FILED_NULLABLE(field)); /* user id */
|
||||
|
||||
field = pgaudit_string_field(adata, index_field++);
|
||||
username = (const char*)field;
|
||||
values[i++] = CStringGetTextDatum(FILED_NULLABLE(field)); /* user name */
|
||||
|
||||
field = pgaudit_string_field(adata, index_field++);
|
||||
dbname = (const char*)field;
|
||||
values[i++] = CStringGetTextDatum(FILED_NULLABLE(field)); /* dbname */
|
||||
|
||||
field = pgaudit_string_field(adata, index_field++);
|
||||
client_info = (char*)field;
|
||||
values[i++] = CStringGetTextDatum(FILED_NULLABLE(field)); /* client info */
|
||||
|
||||
field = pgaudit_string_field(adata, index_field++);
|
||||
if (field != NULL) {
|
||||
object_name = (const char*)field;
|
||||
}
|
||||
values[i++] = CStringGetTextDatum(FILED_NULLABLE(field)); /* object name */
|
||||
|
||||
field = pgaudit_string_field(adata, index_field++);
|
||||
detail_info = (const char*)field;
|
||||
values[i++] = CStringGetTextDatum(FILED_NULLABLE(field)); /* detail info */
|
||||
|
||||
field = pgaudit_string_field(adata, index_field++);
|
||||
nodename = (const char*)field;
|
||||
values[i++] = CStringGetTextDatum(FILED_NULLABLE(field)); /* node name */
|
||||
|
||||
field = pgaudit_string_field(adata, index_field++);
|
||||
threadid = (char*)field;
|
||||
values[i++] = CStringGetTextDatum(FILED_NULLABLE(field)); /* thread id */
|
||||
|
||||
field = pgaudit_string_field(adata, index_field++);
|
||||
localport = (char*)field;
|
||||
values[i++] = CStringGetTextDatum(FILED_NULLABLE(field)); /* local port */
|
||||
|
||||
field = pgaudit_string_field(adata, index_field++);
|
||||
remoteport = (char*)field;
|
||||
values[i++] = CStringGetTextDatum(FILED_NULLABLE(field)); /* remote port */
|
||||
|
||||
Assert(i == PGAUDIT_QUERY_COLS);
|
||||
if (header.fields == PGAUDIT_QUERY_COLS_NEW) {
|
||||
field = pgaudit_string_field(adata, index_field++); /*old version audit data, not sha_code*/
|
||||
} else {
|
||||
field = NULL;
|
||||
}
|
||||
values[i++] = CStringGetTextDatum(FILED_NULLABLE(field)); /* sha_code hex data*/
|
||||
if (pgaudit_need_sha_code()) {
|
||||
saved_hexbuf = field;
|
||||
if (header.fields == PGAUDIT_QUERY_COLS_NEW) {
|
||||
if (saved_hexbuf != NULL && saved_hexbuf[0] != '\0') {
|
||||
bool verifyResult = false;
|
||||
/*sha code for audit*/
|
||||
generate_audit_sha_code(adata->header.time, adata->type, adata->result, userid, username, dbname, client_info, object_name,
|
||||
detail_info, nodename, threadid, localport, remoteport, shacode);
|
||||
/*sha code convert to hex*/
|
||||
sha_bytes_to_hex64((uint8*)shacode, hexbuf);
|
||||
if (strcmp((const char*)hexbuf, (const char*)saved_hexbuf) == 0) {
|
||||
verifyResult = true;
|
||||
}
|
||||
values[i++] = BoolGetDatum(verifyResult); /* verify_result */
|
||||
} else {
|
||||
values[i++] = CStringGetTextDatum(FILED_NULLABLE(NULL)); /* verify_result*/
|
||||
nulls[i] = true;
|
||||
}
|
||||
} else {
|
||||
values[i++] = CStringGetTextDatum(FILED_NULLABLE(NULL)); /* verify_result*/
|
||||
nulls[i] = true;
|
||||
}
|
||||
}
|
||||
if (newVersion) {
|
||||
Assert(i == PGAUDIT_QUERY_COLS_NEW);
|
||||
} else {
|
||||
Assert(i == PGAUDIT_QUERY_COLS);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void pgaudit_query_file(Tuplestorestate *state, TupleDesc tdesc, uint32 fnum, TimestampTz begtime,
|
||||
TimestampTz endtime, const char *audit_directory)
|
||||
TimestampTz endtime, const char *audit_directory, bool newVersion)
|
||||
{
|
||||
FILE* fp = NULL;
|
||||
size_t nread = 0;
|
||||
@ -2867,8 +3040,9 @@ static void pgaudit_query_file(Tuplestorestate *state, TupleDesc tdesc, uint32 f
|
||||
}
|
||||
|
||||
do {
|
||||
Datum values[PGAUDIT_QUERY_COLS] = {0};
|
||||
bool nulls[PGAUDIT_QUERY_COLS] = {0};
|
||||
Datum values[PGAUDIT_QUERY_COLS_NEW] = {0};
|
||||
bool nulls[PGAUDIT_QUERY_COLS_NEW] = {0};
|
||||
|
||||
errno_t errorno = EOK;
|
||||
/*
|
||||
* two scenarios tell that the audit file corrupt
|
||||
@ -2880,7 +3054,7 @@ static void pgaudit_query_file(Tuplestorestate *state, TupleDesc tdesc, uint32 f
|
||||
}
|
||||
(void)fseek(fp, -1, SEEK_CUR);
|
||||
size_t header_available = fread(&header, sizeof(AuditMsgHdr), 1, fp);
|
||||
if (header_available != 1 || pgaudit_invalid_header(&header)) {
|
||||
if (header_available != 1 || pgaudit_invalid_header(&header, newVersion)) {
|
||||
ereport(LOG, (errmsg("invalid data in audit file \"%s\"", t_thrd.audit.pgaudit_filepath)));
|
||||
/* label the currupt file num, then it may be reinit in audit thread but not here. */
|
||||
pgaudit_mark_corrupt_info(fnum);
|
||||
@ -2905,7 +3079,7 @@ static void pgaudit_query_file(Tuplestorestate *state, TupleDesc tdesc, uint32 f
|
||||
/* filt and assemble audit info into tuplestore */
|
||||
datetime = time_t_to_timestamptz(adata->header.time);
|
||||
if (datetime >= begtime && datetime < endtime && header.flags == AUDIT_TUPLE_NORMAL) {
|
||||
deserialization_to_tuple(values, adata, header);
|
||||
deserialization_to_tuple(values, adata, header, nulls, newVersion);
|
||||
tuplestore_putvalues(state, tdesc, values, nulls);
|
||||
}
|
||||
|
||||
@ -2952,7 +3126,8 @@ static void pgaudit_delete_file(uint32 fnum, TimestampTz begtime, TimestampTz en
|
||||
header.signature[1] != 'U' ||
|
||||
header.version != 0 ||
|
||||
!(header.fields == (PGAUDIT_QUERY_COLS - 1) ||
|
||||
header.fields == PGAUDIT_QUERY_COLS)) {
|
||||
header.fields == PGAUDIT_QUERY_COLS ||
|
||||
header.fields == PGAUDIT_QUERY_COLS_NEW)) {
|
||||
/* make sure we are compatible with the older version audit file */
|
||||
ereport(LOG, (errmsg("invalid data in audit file \"%s\"", t_thrd.audit.pgaudit_filepath)));
|
||||
break;
|
||||
@ -3066,7 +3241,7 @@ static TimestampTz pgaudit_headertime(uint32 fnum, const char *audit_directory)
|
||||
* Brief : whether the invoke is allowed for query audit.
|
||||
* Description :
|
||||
*/
|
||||
static void pgaudit_query_valid_check(const ReturnSetInfo *rsinfo, FunctionCallInfoData *fcinfo, TupleDesc &tupdesc)
|
||||
static void pgaudit_query_valid_check(const ReturnSetInfo *rsinfo, FunctionCallInfoData *fcinfo, TupleDesc &tupdesc, bool newVersion)
|
||||
{
|
||||
Oid roleid = InvalidOid;
|
||||
/* Check some permissions first */
|
||||
@ -3090,8 +3265,14 @@ static void pgaudit_query_valid_check(const ReturnSetInfo *rsinfo, FunctionCallI
|
||||
ereport(ERROR, (errcode(ERRCODE_SYSTEM_ERROR), errmsg("return type must be a row type")));
|
||||
}
|
||||
|
||||
if (tupdesc->natts != PGAUDIT_QUERY_COLS) {
|
||||
ereport(ERROR, (errcode(ERRCODE_SYSTEM_ERROR), errmsg("attribute count of the return row type not matched")));
|
||||
if (newVersion) {
|
||||
if (tupdesc->natts != PGAUDIT_QUERY_COLS_NEW) {
|
||||
ereport(ERROR, (errcode(ERRCODE_SYSTEM_ERROR), errmsg("attribute count of the return row type not matched")));
|
||||
}
|
||||
} else {
|
||||
if (tupdesc->natts != PGAUDIT_QUERY_COLS) {
|
||||
ereport(ERROR, (errcode(ERRCODE_SYSTEM_ERROR), errmsg("attribute count of the return row type not matched")));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -3111,8 +3292,12 @@ Datum pg_query_audit(PG_FUNCTION_ARGS)
|
||||
TimestampTz endtime = PG_GETARG_TIMESTAMPTZ(1);
|
||||
char* audit_dir = NULL;
|
||||
char real_audit_dir[PATH_MAX] = {0};
|
||||
bool newVersion = false;
|
||||
if (pgaudit_need_sha_code()) {
|
||||
newVersion = true;
|
||||
}
|
||||
|
||||
pgaudit_query_valid_check(rsinfo, fcinfo, tupdesc);
|
||||
pgaudit_query_valid_check(rsinfo, fcinfo, tupdesc, newVersion);
|
||||
|
||||
/*
|
||||
* When g_instance.audit_cxt.audit_indextbl is not NULL,
|
||||
@ -3167,7 +3352,7 @@ Datum pg_query_audit(PG_FUNCTION_ARGS)
|
||||
satisfied = pgaudit_check_system(begtime, endtime, index, real_audit_dir);
|
||||
if (satisfied) {
|
||||
oldcontext = MemoryContextSwitchTo(query_audit_ctx);
|
||||
pgaudit_query_file(tupstore, tupdesc, fnum, begtime, endtime, real_audit_dir);
|
||||
pgaudit_query_file(tupstore, tupdesc, fnum, begtime, endtime, real_audit_dir, newVersion);
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
MemoryContextReset(query_audit_ctx);
|
||||
satisfied = false;
|
||||
@ -3333,14 +3518,22 @@ static void CheckAuditFile(void)
|
||||
auditfile_init(true);
|
||||
}
|
||||
|
||||
static bool pgaudit_invalid_header(const AuditMsgHdr* header)
|
||||
static bool pgaudit_invalid_header(const AuditMsgHdr* header, bool newVersion)
|
||||
{
|
||||
return ((header->signature[0]) != 'A' || header->signature[1] != 'U' || header->version != 0 ||
|
||||
!(header->fields == (PGAUDIT_QUERY_COLS - 1) || header->fields == PGAUDIT_QUERY_COLS) ||
|
||||
(header->size <= sizeof(AuditMsgHdr)) ||
|
||||
(header->size >= (uint32)u_sess->attr.attr_security.Audit_RotationSize * 1024L));
|
||||
if (newVersion) {
|
||||
return ((header->signature[0]) != 'A' || header->signature[1] != 'U' || header->version != 0 ||
|
||||
!(header->fields == (PGAUDIT_QUERY_COLS - 1) || header->fields == PGAUDIT_QUERY_COLS || header->fields == PGAUDIT_QUERY_COLS_NEW) ||
|
||||
(header->size <= sizeof(AuditMsgHdr)) ||
|
||||
(header->size >= (uint32)u_sess->attr.attr_security.Audit_RotationSize * 1024L));
|
||||
} else {
|
||||
return ((header->signature[0]) != 'A' || header->signature[1] != 'U' || header->version != 0 ||
|
||||
!(header->fields == (PGAUDIT_QUERY_COLS - 1) || header->fields == PGAUDIT_QUERY_COLS) ||
|
||||
(header->size <= sizeof(AuditMsgHdr)) ||
|
||||
(header->size >= (uint32)u_sess->attr.attr_security.Audit_RotationSize * 1024L));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* mark corrupt fnum by postgres thread
|
||||
* used for reinit audit files in audit thread
|
||||
|
||||
@ -0,0 +1,39 @@
|
||||
DROP FUNCTION IF EXISTS pg_catalog.pg_query_audit(timestamptz, timestamptz) CASCADE;
|
||||
SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 3780;
|
||||
CREATE FUNCTION pg_catalog.pg_query_audit(
|
||||
TIMESTAMPTZ,
|
||||
TIMESTAMPTZ,
|
||||
OUT "time" TIMESTAMPTZ,
|
||||
OUT type TEXT,
|
||||
OUT result TEXT,
|
||||
OUT userid TEXT,
|
||||
OUT username TEXT,
|
||||
OUT database TEXT,
|
||||
OUT client_conninfo TEXT,
|
||||
OUT object_name TEXT,
|
||||
OUT detail_info TEXT,
|
||||
OUT node_name TEXT,
|
||||
OUT thread_id TEXT,
|
||||
OUT local_port TEXT,
|
||||
OUT remote_port TEXT
|
||||
) RETURNS SETOF RECORD LANGUAGE INTERNAL VOLATILE ROWS 10 STRICT as 'pg_query_audit';
|
||||
DROP FUNCTION IF EXISTS pg_catalog.pg_query_audit(timestamptz, timestamptz, text) CASCADE;
|
||||
SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 3782;
|
||||
CREATE FUNCTION pg_catalog.pg_query_audit(
|
||||
TIMESTAMPTZ,
|
||||
TIMESTAMPTZ,
|
||||
TEXT,
|
||||
OUT "time" TIMESTAMPTZ,
|
||||
OUT type TEXT,
|
||||
OUT result TEXT,
|
||||
OUT userid TEXT,
|
||||
OUT username TEXT,
|
||||
OUT database TEXT,
|
||||
OUT client_conninfo TEXT,
|
||||
OUT object_name TEXT,
|
||||
OUT detail_info TEXT,
|
||||
OUT node_name TEXT,
|
||||
OUT thread_id TEXT,
|
||||
OUT local_port TEXT,
|
||||
OUT remote_port TEXT
|
||||
) RETURNS SETOF RECORD LANGUAGE INTERNAL VOLATILE ROWS 10 STRICT as 'pg_query_audit';
|
||||
@ -0,0 +1,39 @@
|
||||
DROP FUNCTION IF EXISTS pg_catalog.pg_query_audit(timestamptz, timestamptz) CASCADE;
|
||||
SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 3780;
|
||||
CREATE FUNCTION pg_catalog.pg_query_audit(
|
||||
TIMESTAMPTZ,
|
||||
TIMESTAMPTZ,
|
||||
OUT "time" TIMESTAMPTZ,
|
||||
OUT type TEXT,
|
||||
OUT result TEXT,
|
||||
OUT userid TEXT,
|
||||
OUT username TEXT,
|
||||
OUT database TEXT,
|
||||
OUT client_conninfo TEXT,
|
||||
OUT object_name TEXT,
|
||||
OUT detail_info TEXT,
|
||||
OUT node_name TEXT,
|
||||
OUT thread_id TEXT,
|
||||
OUT local_port TEXT,
|
||||
OUT remote_port TEXT
|
||||
) RETURNS SETOF RECORD LANGUAGE INTERNAL VOLATILE ROWS 10 STRICT as 'pg_query_audit';
|
||||
DROP FUNCTION IF EXISTS pg_catalog.pg_query_audit(timestamptz, timestamptz, text) CASCADE;
|
||||
SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 3782;
|
||||
CREATE FUNCTION pg_catalog.pg_query_audit(
|
||||
TIMESTAMPTZ,
|
||||
TIMESTAMPTZ,
|
||||
TEXT,
|
||||
OUT "time" TIMESTAMPTZ,
|
||||
OUT type TEXT,
|
||||
OUT result TEXT,
|
||||
OUT userid TEXT,
|
||||
OUT username TEXT,
|
||||
OUT database TEXT,
|
||||
OUT client_conninfo TEXT,
|
||||
OUT object_name TEXT,
|
||||
OUT detail_info TEXT,
|
||||
OUT node_name TEXT,
|
||||
OUT thread_id TEXT,
|
||||
OUT local_port TEXT,
|
||||
OUT remote_port TEXT
|
||||
) RETURNS SETOF RECORD LANGUAGE INTERNAL VOLATILE ROWS 10 STRICT as 'pg_query_audit';
|
||||
@ -0,0 +1,43 @@
|
||||
DROP FUNCTION IF EXISTS pg_catalog.pg_query_audit(timestamptz, timestamptz) CASCADE;
|
||||
SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 3780;
|
||||
CREATE FUNCTION pg_catalog.pg_query_audit(
|
||||
TIMESTAMPTZ,
|
||||
TIMESTAMPTZ,
|
||||
OUT "time" TIMESTAMPTZ,
|
||||
OUT type TEXT,
|
||||
OUT result TEXT,
|
||||
OUT userid TEXT,
|
||||
OUT username TEXT,
|
||||
OUT database TEXT,
|
||||
OUT client_conninfo TEXT,
|
||||
OUT object_name TEXT,
|
||||
OUT detail_info TEXT,
|
||||
OUT node_name TEXT,
|
||||
OUT thread_id TEXT,
|
||||
OUT local_port TEXT,
|
||||
OUT remote_port TEXT,
|
||||
OUT sha_code TEXT,
|
||||
OUT verify_result BOOLEAN
|
||||
) RETURNS SETOF RECORD LANGUAGE INTERNAL VOLATILE ROWS 10 STRICT as 'pg_query_audit';
|
||||
DROP FUNCTION IF EXISTS pg_catalog.pg_query_audit(timestamptz, timestamptz, text) CASCADE;
|
||||
SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 3782;
|
||||
CREATE FUNCTION pg_catalog.pg_query_audit(
|
||||
TIMESTAMPTZ,
|
||||
TIMESTAMPTZ,
|
||||
TEXT,
|
||||
OUT "time" TIMESTAMPTZ,
|
||||
OUT type TEXT,
|
||||
OUT result TEXT,
|
||||
OUT userid TEXT,
|
||||
OUT username TEXT,
|
||||
OUT database TEXT,
|
||||
OUT client_conninfo TEXT,
|
||||
OUT object_name TEXT,
|
||||
OUT detail_info TEXT,
|
||||
OUT node_name TEXT,
|
||||
OUT thread_id TEXT,
|
||||
OUT local_port TEXT,
|
||||
OUT remote_port TEXT,
|
||||
OUT sha_code TEXT,
|
||||
OUT verify_result BOOLEAN
|
||||
) RETURNS SETOF RECORD LANGUAGE INTERNAL VOLATILE ROWS 10 STRICT as 'pg_query_audit';
|
||||
@ -0,0 +1,43 @@
|
||||
DROP FUNCTION IF EXISTS pg_catalog.pg_query_audit(timestamptz, timestamptz) CASCADE;
|
||||
SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 3780;
|
||||
CREATE FUNCTION pg_catalog.pg_query_audit(
|
||||
TIMESTAMPTZ,
|
||||
TIMESTAMPTZ,
|
||||
OUT "time" TIMESTAMPTZ,
|
||||
OUT type TEXT,
|
||||
OUT result TEXT,
|
||||
OUT userid TEXT,
|
||||
OUT username TEXT,
|
||||
OUT database TEXT,
|
||||
OUT client_conninfo TEXT,
|
||||
OUT object_name TEXT,
|
||||
OUT detail_info TEXT,
|
||||
OUT node_name TEXT,
|
||||
OUT thread_id TEXT,
|
||||
OUT local_port TEXT,
|
||||
OUT remote_port TEXT,
|
||||
OUT sha_code TEXT,
|
||||
OUT verify_result BOOLEAN
|
||||
) RETURNS SETOF RECORD LANGUAGE INTERNAL VOLATILE ROWS 10 STRICT as 'pg_query_audit';
|
||||
DROP FUNCTION IF EXISTS pg_catalog.pg_query_audit(timestamptz, timestamptz, text) CASCADE;
|
||||
SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 3782;
|
||||
CREATE FUNCTION pg_catalog.pg_query_audit(
|
||||
TIMESTAMPTZ,
|
||||
TIMESTAMPTZ,
|
||||
TEXT,
|
||||
OUT "time" TIMESTAMPTZ,
|
||||
OUT type TEXT,
|
||||
OUT result TEXT,
|
||||
OUT userid TEXT,
|
||||
OUT username TEXT,
|
||||
OUT database TEXT,
|
||||
OUT client_conninfo TEXT,
|
||||
OUT object_name TEXT,
|
||||
OUT detail_info TEXT,
|
||||
OUT node_name TEXT,
|
||||
OUT thread_id TEXT,
|
||||
OUT local_port TEXT,
|
||||
OUT remote_port TEXT,
|
||||
OUT sha_code TEXT,
|
||||
OUT verify_result BOOLEAN
|
||||
) RETURNS SETOF RECORD LANGUAGE INTERNAL VOLATILE ROWS 10 STRICT as 'pg_query_audit';
|
||||
@ -151,6 +151,7 @@ extern const uint32 ROTATE_UNROTATE_VERSION_NUM;
|
||||
extern const uint32 FLOAT_VERSION_NUMBER;
|
||||
extern const uint32 STRAIGHT_JOIN_VERSION_NUMBER;
|
||||
extern const uint32 PARALLEL_ENABLE_VERSION_NUM;
|
||||
extern const uint32 AUDIT_SHA_VERSION_NUM;
|
||||
|
||||
extern void register_backend_version(uint32 backend_version);
|
||||
extern bool contain_backend_version(uint32 version_number);
|
||||
|
||||
@ -166,7 +166,8 @@ typedef enum {
|
||||
AUDIT_NODENAME_INFO,
|
||||
AUDIT_THREADID_INFO,
|
||||
AUDIT_LOCALPORT_INFO,
|
||||
AUDIT_REMOTEPORT_INFO
|
||||
AUDIT_REMOTEPORT_INFO,
|
||||
AUDIT_SHACODE,
|
||||
} AuditStringFieldNum;
|
||||
|
||||
struct AuditElasticEvent {
|
||||
@ -182,6 +183,7 @@ struct AuditElasticEvent {
|
||||
const char* threadIdInfo;
|
||||
const char* localPortInfo;
|
||||
const char* remotePortInfo;
|
||||
const char* shaCode;
|
||||
long long eventTime;
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user