From e4e1cd745fffc4576c06922d9722a2fc7ce489a5 Mon Sep 17 00:00:00 2001 From: dongning12 Date: Thu, 17 Aug 2023 22:23:32 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90=E8=B5=84=E6=BA=90=E6=B1=A0=E5=8C=96?= =?UTF-8?q?=E3=80=91=E9=80=82=E9=85=8DDMS=E6=8A=A5=E6=96=87=E5=85=BC?= =?UTF-8?q?=E5=AE=B9=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ddes/adapter/ss_dms_callback.cpp | 11 ++- src/gausskernel/ddes/ddes_commit_id | 5 +- src/include/ddes/dms/dms_api.h | 71 ++++++++++++++++--- 3 files changed, 73 insertions(+), 14 deletions(-) diff --git a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp index 8291bdb24..a15c27993 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp @@ -971,9 +971,12 @@ static int32 CBProcessReleaseAllLock(uint32 len) return res; } -static int32 CBProcessBroadcast(void *db_handle, char *data, unsigned int len, char *output_msg, - uint32 *output_msg_len) +static int32 CBProcessBroadcast(void *db_handle, dms_broadcast_context_t *broad_ctx) { + char *data = broad_ctx->data; + unsigned int len = broad_ctx->len; + char *output_msg = broad_ctx->output_msg; + unsigned int *output_msg_len = broad_ctx->output_msg_len; int32 ret = DMS_SUCCESS; SSBroadcastOp bcast_op = *(SSBroadcastOp *)data; @@ -1034,8 +1037,10 @@ static int32 CBProcessBroadcast(void *db_handle, char *data, unsigned int len, c return ret; } -static int32 CBProcessBroadcastAck(void *db_handle, char *data, unsigned int len) +static int32 CBProcessBroadcastAck(void *db_handle, dms_broadcast_context_t *broad_ctx) { + char *data = broad_ctx->data; + unsigned int len = broad_ctx->len; int32 ret = DMS_SUCCESS; SSBroadcastOpAck bcast_op = *(SSBroadcastOpAck *)data; diff --git a/src/gausskernel/ddes/ddes_commit_id b/src/gausskernel/ddes/ddes_commit_id index 0a6c6dcbf..514b56258 100644 --- a/src/gausskernel/ddes/ddes_commit_id +++ b/src/gausskernel/ddes/ddes_commit_id @@ -1,2 +1,3 @@ -dms_commit_id=a6def3bbf3ff29d0289bcd02e14ece57bd6fbc6f -dss_commit_id=fcc80facb9e1ecc23df81a296de890de58bc0cf4 +dms_commit_id=19301cb7b3dd7f959cf6269c6d2aa8c8f23804fa +dss_commit_id=45ae7916cbdda2b2e64c02811db30df565cf34fa +cbb_commit_id=41cadda4643656bcd856bd37d2b6908e44a5d46c \ No newline at end of file diff --git a/src/include/ddes/dms/dms_api.h b/src/include/ddes/dms/dms_api.h index efdf3e076..cfa844a74 100644 --- a/src/include/ddes/dms/dms_api.h +++ b/src/include/ddes/dms/dms_api.h @@ -187,12 +187,24 @@ typedef struct st_dms_xmap_ctx { unsigned int dest_id; } dms_xmap_ctx_t; -typedef struct st_dms_context { - unsigned int inst_id; // current instance id - unsigned int sess_id; // current session id - unsigned int rmid; // current rm id - dms_session_e sess_type; // request page: recovery session flag +typedef struct st_dms_process_context { void *db_handle; + unsigned int sess_id; // current session id + unsigned int rmid; // current rm id + unsigned int inst_id; // current instance id +} dms_process_context_t; + +typedef struct st_dms_context { + union { + struct { + void *db_handle; + unsigned int sess_id; // current session id + unsigned int rmid; // current rm id + unsigned int inst_id; // current instance id + }; + dms_process_context_t proc_ctx; + }; + dms_session_e sess_type; // request page: recovery session flag unsigned char is_try; unsigned char type; unsigned short len; @@ -274,6 +286,14 @@ typedef struct st_dms_edp_info { }; } dms_edp_info_t; +typedef struct st_dms_broadcast_context { + char *data; + unsigned int len; + char *output_msg; + unsigned int *output_msg_len; + unsigned int msg_version; +} dms_broadcast_context_t; + typedef struct st_dms_buf_ctrl { volatile unsigned char is_remote_dirty; volatile unsigned char lock_mode; // used only in DMS, 0: Null, 1: Shared lock, 2: Exclusive lock @@ -505,6 +525,12 @@ typedef struct st_dcs_batch_buf { unsigned int max_count; } dcs_batch_buf_t; +typedef enum en_dms_inst_behavior { + DMS_INST_BEHAVIOR_IN_IDLE = 0, + DMS_INST_BEHAVIOR_IN_REFORM, + DMS_INST_BEHAVIOR_IN_BACKUP, +} dms_inst_behavior_t; + typedef int(*dms_get_list_stable)(void *db_handle, unsigned long long *list_stable, unsigned char *reformer_id); typedef int(*dms_save_list_stable)(void *db_handle, unsigned long long list_stable, unsigned char reformer_id, unsigned long long list_in, unsigned int save_ctrl); @@ -582,9 +608,8 @@ typedef char *(*dms_mem_alloc)(void *context, unsigned int size); typedef void(*dms_mem_free)(void *context, void *ptr); typedef void(*dms_mem_reset)(void *context); // The maximum length of output_msg is 128 bytes. -typedef int (*dms_process_broadcast)(void *db_handle, char *data, unsigned int len, char *output_msg, - unsigned int *output_msg_len); -typedef int (*dms_process_broadcast_ack)(void *db_handle, char *data, unsigned int len); +typedef int (*dms_process_broadcast)(void *db_handle, dms_broadcast_context_t *broad_ctx); +typedef int (*dms_process_broadcast_ack)(void *db_handle, dms_broadcast_context_t *broad_ctx); typedef int(*dms_get_txn_info)(void *db_handle, unsigned long long xid, unsigned char is_scan, dms_txn_info_t *txn_info); typedef int(*dms_get_opengauss_xid_csn)(void *db_handle, dms_opengauss_xid_csn_t *csn_req, @@ -611,12 +636,14 @@ typedef int (*dms_drc_buf_res_rebuild_parallel)(void *db_handle, unsigned char t typedef int(*dms_ctl_rcy_clean_parallel_t)(void *db_handle, unsigned char thread_index, unsigned char thread_num); typedef unsigned char(*dms_ckpt_session)(void *db_handle); typedef void (*dms_check_if_build_complete)(void *db_handle, unsigned int *build_complete); +typedef void (*dms_check_if_restore_recover)(void *db_handle, unsigned int *rst_recover); typedef int (*dms_db_is_primary)(void *db_handle); typedef void (*dms_set_switchover_result)(void *db_handle, int result); typedef void (*dms_set_db_role)(void *db_handle, unsigned char is_primary); typedef int (*dms_mount_to_recovery)(void *db_handle, unsigned int *has_offline); typedef int(*dms_get_open_status)(void *db_handle); typedef void (*dms_reform_set_dms_role)(void *db_handle, unsigned int reformer_id); +typedef void (*dms_reset_user)(void *db_handle, unsigned long long list_in); // for openGauss typedef void (*dms_thread_init_t)(unsigned char need_startup, char **reg_data); @@ -650,6 +677,13 @@ typedef void (*dms_verify_page)(dms_buf_ctrl_t *buf_ctrl, char *new_page); typedef int (*dms_drc_validate)(void *db_handle); typedef int (*dms_db_check_lock)(void *db_handle); typedef int (*dms_cache_msg)(void *db_handle, char* msg); +typedef void (*dms_ckpt_enque_one_page)(void *db_handle, dms_buf_ctrl_t *ctrl); +typedef int (*dms_set_remove_point)(void *db_handle, unsigned int node_id, void *curr_point); +typedef int (*dms_get_enable_checksum)(void *db_handle); +typedef unsigned int (*dms_calc_page_checksum)(void *db_handle, dms_buf_ctrl_t *ctrl, unsigned int page_size); +typedef int (*dms_verify_page_checksum)(void *db_handle, dms_buf_ctrl_t *ctrl, unsigned int page_size, int cks); +typedef int (*dms_update_node_oldest_xmin)(void *db_handle, unsigned char inst_id, unsigned long long oldest_xmin); +typedef void (*dms_set_inst_behavior)(void *db_handle, dms_inst_behavior_t inst_behavior); typedef struct st_dms_callback { // used in reform @@ -678,6 +712,8 @@ typedef struct st_dms_callback { dms_drc_buf_res_rebuild_parallel dms_reform_rebuild_parallel; dms_ctl_rcy_clean_parallel_t dms_ctl_rcy_clean_parallel; dms_check_if_build_complete check_if_build_complete; + dms_check_if_restore_recover check_if_restore_recover; + dms_reset_user reset_user; // used in reform for opengauss dms_thread_init_t dms_thread_init; @@ -780,6 +816,15 @@ typedef struct st_dms_callback { dms_drc_validate drc_validate; dms_db_check_lock db_check_lock; dms_cache_msg cache_msg; + dms_ckpt_enque_one_page ckpt_enque_one_page; + dms_set_remove_point set_remove_point; + dms_get_enable_checksum get_enable_checksum; + dms_calc_page_checksum calc_page_checksum; + dms_verify_page_checksum verify_page_checksum; + dms_update_node_oldest_xmin update_node_oldest_xmin; + + //for shared storage backup + dms_set_inst_behavior set_inst_behavior; } dms_callback_t; typedef struct st_dms_instance_net_addr { @@ -851,11 +896,19 @@ typedef enum en_dms_info_id { DMS_INFO_REFORM_LAST = 1, } dms_info_id_e; +typedef enum st_protocol_version { + PROTO_VER_0 = 0, // invalid version + PROTO_VER_1 = 1, // first version +} protocol_version_e; + +#define INVALID_PROTO_VER PROTO_VER_0 +#define SW_PROTO_VER PROTO_VER_1 + #define DMS_LOCAL_MAJOR_VER_WEIGHT 1000000 #define DMS_LOCAL_MINOR_VER_WEIGHT 1000 #define DMS_LOCAL_MAJOR_VERSION 0 #define DMS_LOCAL_MINOR_VERSION 0 -#define DMS_LOCAL_VERSION 70 +#define DMS_LOCAL_VERSION 88 #ifdef __cplusplus }