【资源池化】适配DMS报文兼容性

This commit is contained in:
dongning12
2023-08-17 22:23:32 +08:00
committed by chendong76
parent c7a3c843f0
commit e4e1cd745f
3 changed files with 73 additions and 14 deletions

View File

@ -971,9 +971,12 @@ static int32 CBProcessReleaseAllLock(uint32 len)
return res;
}
static int32 CBProcessBroadcast(void *db_handle, char *data, unsigned int len, char *output_msg,
uint32 *output_msg_len)
static int32 CBProcessBroadcast(void *db_handle, dms_broadcast_context_t *broad_ctx)
{
char *data = broad_ctx->data;
unsigned int len = broad_ctx->len;
char *output_msg = broad_ctx->output_msg;
unsigned int *output_msg_len = broad_ctx->output_msg_len;
int32 ret = DMS_SUCCESS;
SSBroadcastOp bcast_op = *(SSBroadcastOp *)data;
@ -1034,8 +1037,10 @@ static int32 CBProcessBroadcast(void *db_handle, char *data, unsigned int len, c
return ret;
}
static int32 CBProcessBroadcastAck(void *db_handle, char *data, unsigned int len)
static int32 CBProcessBroadcastAck(void *db_handle, dms_broadcast_context_t *broad_ctx)
{
char *data = broad_ctx->data;
unsigned int len = broad_ctx->len;
int32 ret = DMS_SUCCESS;
SSBroadcastOpAck bcast_op = *(SSBroadcastOpAck *)data;

View File

@ -1,2 +1,3 @@
dms_commit_id=a6def3bbf3ff29d0289bcd02e14ece57bd6fbc6f
dss_commit_id=fcc80facb9e1ecc23df81a296de890de58bc0cf4
dms_commit_id=19301cb7b3dd7f959cf6269c6d2aa8c8f23804fa
dss_commit_id=45ae7916cbdda2b2e64c02811db30df565cf34fa
cbb_commit_id=41cadda4643656bcd856bd37d2b6908e44a5d46c

View File

@ -187,12 +187,24 @@ typedef struct st_dms_xmap_ctx {
unsigned int dest_id;
} dms_xmap_ctx_t;
typedef struct st_dms_context {
unsigned int inst_id; // current instance id
unsigned int sess_id; // current session id
unsigned int rmid; // current rm id
dms_session_e sess_type; // request page: recovery session flag
typedef struct st_dms_process_context {
void *db_handle;
unsigned int sess_id; // current session id
unsigned int rmid; // current rm id
unsigned int inst_id; // current instance id
} dms_process_context_t;
typedef struct st_dms_context {
union {
struct {
void *db_handle;
unsigned int sess_id; // current session id
unsigned int rmid; // current rm id
unsigned int inst_id; // current instance id
};
dms_process_context_t proc_ctx;
};
dms_session_e sess_type; // request page: recovery session flag
unsigned char is_try;
unsigned char type;
unsigned short len;
@ -274,6 +286,14 @@ typedef struct st_dms_edp_info {
};
} dms_edp_info_t;
typedef struct st_dms_broadcast_context {
char *data;
unsigned int len;
char *output_msg;
unsigned int *output_msg_len;
unsigned int msg_version;
} dms_broadcast_context_t;
typedef struct st_dms_buf_ctrl {
volatile unsigned char is_remote_dirty;
volatile unsigned char lock_mode; // used only in DMS, 0: Null, 1: Shared lock, 2: Exclusive lock
@ -505,6 +525,12 @@ typedef struct st_dcs_batch_buf {
unsigned int max_count;
} dcs_batch_buf_t;
typedef enum en_dms_inst_behavior {
DMS_INST_BEHAVIOR_IN_IDLE = 0,
DMS_INST_BEHAVIOR_IN_REFORM,
DMS_INST_BEHAVIOR_IN_BACKUP,
} dms_inst_behavior_t;
typedef int(*dms_get_list_stable)(void *db_handle, unsigned long long *list_stable, unsigned char *reformer_id);
typedef int(*dms_save_list_stable)(void *db_handle, unsigned long long list_stable, unsigned char reformer_id,
unsigned long long list_in, unsigned int save_ctrl);
@ -582,9 +608,8 @@ typedef char *(*dms_mem_alloc)(void *context, unsigned int size);
typedef void(*dms_mem_free)(void *context, void *ptr);
typedef void(*dms_mem_reset)(void *context);
// The maximum length of output_msg is 128 bytes.
typedef int (*dms_process_broadcast)(void *db_handle, char *data, unsigned int len, char *output_msg,
unsigned int *output_msg_len);
typedef int (*dms_process_broadcast_ack)(void *db_handle, char *data, unsigned int len);
typedef int (*dms_process_broadcast)(void *db_handle, dms_broadcast_context_t *broad_ctx);
typedef int (*dms_process_broadcast_ack)(void *db_handle, dms_broadcast_context_t *broad_ctx);
typedef int(*dms_get_txn_info)(void *db_handle, unsigned long long xid,
unsigned char is_scan, dms_txn_info_t *txn_info);
typedef int(*dms_get_opengauss_xid_csn)(void *db_handle, dms_opengauss_xid_csn_t *csn_req,
@ -611,12 +636,14 @@ typedef int (*dms_drc_buf_res_rebuild_parallel)(void *db_handle, unsigned char t
typedef int(*dms_ctl_rcy_clean_parallel_t)(void *db_handle, unsigned char thread_index, unsigned char thread_num);
typedef unsigned char(*dms_ckpt_session)(void *db_handle);
typedef void (*dms_check_if_build_complete)(void *db_handle, unsigned int *build_complete);
typedef void (*dms_check_if_restore_recover)(void *db_handle, unsigned int *rst_recover);
typedef int (*dms_db_is_primary)(void *db_handle);
typedef void (*dms_set_switchover_result)(void *db_handle, int result);
typedef void (*dms_set_db_role)(void *db_handle, unsigned char is_primary);
typedef int (*dms_mount_to_recovery)(void *db_handle, unsigned int *has_offline);
typedef int(*dms_get_open_status)(void *db_handle);
typedef void (*dms_reform_set_dms_role)(void *db_handle, unsigned int reformer_id);
typedef void (*dms_reset_user)(void *db_handle, unsigned long long list_in);
// for openGauss
typedef void (*dms_thread_init_t)(unsigned char need_startup, char **reg_data);
@ -650,6 +677,13 @@ typedef void (*dms_verify_page)(dms_buf_ctrl_t *buf_ctrl, char *new_page);
typedef int (*dms_drc_validate)(void *db_handle);
typedef int (*dms_db_check_lock)(void *db_handle);
typedef int (*dms_cache_msg)(void *db_handle, char* msg);
typedef void (*dms_ckpt_enque_one_page)(void *db_handle, dms_buf_ctrl_t *ctrl);
typedef int (*dms_set_remove_point)(void *db_handle, unsigned int node_id, void *curr_point);
typedef int (*dms_get_enable_checksum)(void *db_handle);
typedef unsigned int (*dms_calc_page_checksum)(void *db_handle, dms_buf_ctrl_t *ctrl, unsigned int page_size);
typedef int (*dms_verify_page_checksum)(void *db_handle, dms_buf_ctrl_t *ctrl, unsigned int page_size, int cks);
typedef int (*dms_update_node_oldest_xmin)(void *db_handle, unsigned char inst_id, unsigned long long oldest_xmin);
typedef void (*dms_set_inst_behavior)(void *db_handle, dms_inst_behavior_t inst_behavior);
typedef struct st_dms_callback {
// used in reform
@ -678,6 +712,8 @@ typedef struct st_dms_callback {
dms_drc_buf_res_rebuild_parallel dms_reform_rebuild_parallel;
dms_ctl_rcy_clean_parallel_t dms_ctl_rcy_clean_parallel;
dms_check_if_build_complete check_if_build_complete;
dms_check_if_restore_recover check_if_restore_recover;
dms_reset_user reset_user;
// used in reform for opengauss
dms_thread_init_t dms_thread_init;
@ -780,6 +816,15 @@ typedef struct st_dms_callback {
dms_drc_validate drc_validate;
dms_db_check_lock db_check_lock;
dms_cache_msg cache_msg;
dms_ckpt_enque_one_page ckpt_enque_one_page;
dms_set_remove_point set_remove_point;
dms_get_enable_checksum get_enable_checksum;
dms_calc_page_checksum calc_page_checksum;
dms_verify_page_checksum verify_page_checksum;
dms_update_node_oldest_xmin update_node_oldest_xmin;
//for shared storage backup
dms_set_inst_behavior set_inst_behavior;
} dms_callback_t;
typedef struct st_dms_instance_net_addr {
@ -851,11 +896,19 @@ typedef enum en_dms_info_id {
DMS_INFO_REFORM_LAST = 1,
} dms_info_id_e;
typedef enum st_protocol_version {
PROTO_VER_0 = 0, // invalid version
PROTO_VER_1 = 1, // first version
} protocol_version_e;
#define INVALID_PROTO_VER PROTO_VER_0
#define SW_PROTO_VER PROTO_VER_1
#define DMS_LOCAL_MAJOR_VER_WEIGHT 1000000
#define DMS_LOCAL_MINOR_VER_WEIGHT 1000
#define DMS_LOCAL_MAJOR_VERSION 0
#define DMS_LOCAL_MINOR_VERSION 0
#define DMS_LOCAL_VERSION 70
#define DMS_LOCAL_VERSION 88
#ifdef __cplusplus
}