!2036 openGauss multi-level cache management

Merge pull request !2036 from 阙鸣健/master
This commit is contained in:
opengauss-bot
2022-09-08 01:43:47 +00:00
committed by Gitee
26 changed files with 1378 additions and 316 deletions

View File

@ -152,7 +152,7 @@ static Acl* ProcAclDefault(Oid ownerId)
* @in c: character.
* @return: True or false.
*/
static bool check_special_character(char c)
bool check_special_character(char c)
{
switch (c) {
case ' ':

View File

@ -911,6 +911,19 @@ static void InitStorageConfigureNamesBool()
NULL,
NULL,
NULL},
{{"enable_nvm",
PGC_POSTMASTER,
NODE_ALL,
DEVELOPER_OPTIONS,
gettext_noop("Enable nvm buffer manager."),
NULL},
&g_instance.attr.attr_storage.nvm_attr.enable_nvm,
false,
NULL,
NULL,
NULL},
#ifdef USE_ASSERT_CHECKING
{{"enable_segment",
PGC_SIGHUP,
@ -1045,6 +1058,20 @@ static void InitStorageConfigureNamesInt()
NULL,
NULL,
NULL},
{{"nvm_buffers",
PGC_POSTMASTER,
NODE_ALL,
RESOURCES_MEM,
gettext_noop("Sets the number of shared memory buffers used by the server."),
NULL,
GUC_UNIT_BLOCKS},
&g_instance.attr.attr_storage.NNvmBuffers,
0,
0,
INT_MAX / 2,
NULL,
NULL,
NULL},
{{"segment_buffers",
PGC_POSTMASTER,
NODE_ALL,
@ -3371,7 +3398,32 @@ static void InitStorageConfigureNamesReal()
NULL,
NULL,
NULL},
{{"bypass_dram",
PGC_SIGHUP,
NODE_ALL,
DEVELOPER_OPTIONS,
gettext_noop("bypass_dram."),
NULL},
&g_instance.attr.attr_storage.nvm_attr.bypassDram,
0.01,
0.0,
1.0,
NULL,
NULL,
NULL},
{{"bypass_nvm",
PGC_SIGHUP,
NODE_ALL,
DEVELOPER_OPTIONS,
gettext_noop("bypass_nvm."),
NULL},
&g_instance.attr.attr_storage.nvm_attr.bypassNvm,
0.5,
0.0,
1.0,
NULL,
NULL,
NULL},
/* End-of-list marker */
{{NULL,
(GucContext)0,
@ -3558,6 +3610,18 @@ static void InitStorageConfigureNamesString()
NULL,
NULL},
{{"nvm_file_path",
PGC_POSTMASTER,
NODE_SINGLENODE,
DEVELOPER_OPTIONS,
gettext_noop("Sets file path of nvm."),
NULL},
&g_instance.attr.attr_storage.nvm_attr.nvm_file_path,
"",
check_nvm_path,
NULL,
NULL},
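Taken together, the new storage GUCs above could be switched on from postgresql.conf roughly as follows (illustrative values and path only, not part of this patch; enable_nvm, nvm_buffers and nvm_file_path are PGC_POSTMASTER and need a restart, while bypass_dram and bypass_nvm are PGC_SIGHUP and can be reloaded):

enable_nvm = on
nvm_buffers = 131072                   # GUC_UNIT_BLOCKS: 131072 x 8kB = 1GiB of NVM-resident pages
nvm_file_path = '/mnt/pmem0/nvm_buf'   # example path on an NVM-backed filesystem
bypass_dram = 0.01
bypass_nvm = 0.5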
#ifndef ENABLE_MULTIPLE_NODES
{{"dcf_config",
PGC_POSTMASTER,

View File

@ -182,6 +182,7 @@ list(APPEND gaussdb_objects
$<TARGET_OBJECTS:gausskernel_storage_replication_heartbeat>
$<TARGET_OBJECTS:gausskernel_storage_replication>
$<TARGET_OBJECTS:gausskernel_storage_buffer>
$<TARGET_OBJECTS:gausskernel_storage_nvm>
$<TARGET_OBJECTS:gausskernel_storage_cmgr>
$<TARGET_OBJECTS:gausskernel_storage_cstore_compression>
$<TARGET_OBJECTS:gausskernel_storage_cstore>

View File

@ -113,7 +113,7 @@ static void setup_bgwriter_signalhook(void)
sigdelset(&t_thrd.libpq_cxt.BlockSig, SIGQUIT);
}
static void bgwriter_handle_exceptions(WritebackContext wb_context, MemoryContext bgwriter_cxt)
static void bgwriter_handle_exceptions(WritebackContext *wb_context, MemoryContext bgwriter_cxt)
{
/*
* Close all open files after any error. This is helpful on Windows,
@ -168,7 +168,7 @@ static void bgwriter_handle_exceptions(WritebackContext wb_context, MemoryContex
MemoryContextResetAndDeleteChildren(bgwriter_cxt);
/* re-initialize to avoid repeated errors causing problems */
WritebackContextInit(&wb_context, &u_sess->attr.attr_storage.bgwriter_flush_after);
WritebackContextInit(wb_context, &u_sess->attr.attr_storage.bgwriter_flush_after);
/* Now we can allow interrupts again */
RESUME_INTERRUPTS();
@ -237,7 +237,7 @@ void BackgroundWriterMain(void)
int* oldTryCounter = NULL;
if (sigsetjmp(local_sigjmp_buf, 1) != 0) {
gstrace_tryblock_exit(true, oldTryCounter);
bgwriter_handle_exceptions(wb_context, bgwriter_context);
bgwriter_handle_exceptions(&wb_context, bgwriter_context);
/* Report wait end here, when there is no further possibility of wait */
pgstat_report_waitevent(WAIT_EVENT_END);
@ -530,7 +530,8 @@ Datum bgwriter_view_get_last_flush_num()
Datum bgwriter_view_get_candidate_nums()
{
int candidate_num = get_curr_candidate_nums(true) + get_curr_candidate_nums(false);
int candidate_num = get_curr_candidate_nums(CAND_LIST_NORMAL) + get_curr_candidate_nums(CAND_LIST_NVM) +
get_curr_candidate_nums(CAND_LIST_SEG);
return Int32GetDatum(candidate_num);
}
@ -586,7 +587,7 @@ void invalid_buffer_bgwriter_main()
if (sigsetjmp(localSigjmpBuf, 1) != 0) {
ereport(WARNING, (errmsg("invalidate buffer bgwriter exception occured.")));
bgwriter_handle_exceptions(wb_context, bgwriter_context);
bgwriter_handle_exceptions(&wb_context, bgwriter_context);
}
/* We can now handle ereport(ERROR) */
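The bgwriter_handle_exceptions signature change above matters beyond style: the old version received the WritebackContext by value, so the WritebackContextInit() call inside the handler only reset a local copy and the caller kept its stale writeback state. A minimal sketch of the difference (generic C++, not openGauss code):

struct Ctx { int pending; };
static void reset_by_value(Ctx ctx)    { ctx.pending = 0; }   /* mutates a copy; the caller's ctx is unchanged */
static void reset_by_pointer(Ctx *ctx) { ctx->pending = 0; }  /* the caller sees the reset */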

View File

@ -84,18 +84,16 @@ static void ckpt_try_prune_dirty_page_queue();
/* candidate buffer list handle function */
static uint32 calculate_pagewriter_flush_num();
static void candidate_buf_push(int buf_id, int thread_id);
static void seg_candidate_buf_push(int buf_id, int thread_id);
static void candidate_buf_push(CandidateList *list, int buf_id);
static void init_candidate_list();
static uint32 incre_ckpt_pgwr_flush_dirty_page(WritebackContext wb_context,
static uint32 incre_ckpt_pgwr_flush_dirty_page(WritebackContext *wb_context,
const CkptSortItem *dirty_buf_list, int start, int batch_num);
static void incre_ckpt_pgwr_flush_dirty_queue(WritebackContext wb_context);
static void incre_ckpt_pgwr_scan_buf_pool(WritebackContext wb_context);
static void incre_ckpt_pgwr_flush_dirty_queue(WritebackContext *wb_context);
static void incre_ckpt_pgwr_scan_buf_pool(WritebackContext *wb_context);
static void push_to_candidate_list(BufferDesc *buf_desc);
static uint32 get_candidate_buf_and_flush_list(uint32 start, uint32 end, uint32 max_flush_num,
bool *contain_hashbucket);
static int64 get_thread_candidate_nums(int thread_id);
static int64 get_thread_seg_candidate_nums(int thread_id);
static int64 get_thread_candidate_nums(CandidateList *list);
const int XLOG_LSN_SWAP = 32;
Datum ckpt_view_get_node_name()
@ -210,7 +208,7 @@ Datum ckpt_view_get_twophase_flush_num()
Datum ckpt_view_get_candidate_nums()
{
int candidate_num = get_curr_candidate_nums(false);
int candidate_num = get_curr_candidate_nums(CAND_LIST_NORMAL);
return Int32GetDatum(candidate_num);
}
@ -226,7 +224,7 @@ Datum ckpt_view_get_num_clock_sweep()
Datum ckpt_view_get_seg_candidate_nums()
{
int candidate_num = get_curr_candidate_nums(true);
int candidate_num = get_curr_candidate_nums(CAND_LIST_SEG);
return Int32GetDatum(candidate_num);
}
@ -354,45 +352,38 @@ void candidate_buf_init(void)
static void init_candidate_list()
{
int thread_num = g_instance.ckpt_cxt_ctl->pgwr_procs.sub_num;
int normal_avg_num = SharedBufferNumber / thread_num;
int seg_avr_num = SEGMENT_BUFFER_NUM / thread_num;
int normal_avg_num = NORMAL_SHARED_BUFFER_NUM / thread_num;
int nvm_avg_num = NVM_BUFFER_NUM / thread_num;
int seg_avg_num = SEGMENT_BUFFER_NUM / thread_num;
PageWriterProc *pgwr = NULL;
Buffer *cand_buffers = g_instance.ckpt_cxt_ctl->candidate_buffers;
/* Init main thread, the candidate list only store segment buffer */
pgwr = &g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[0];
pgwr->cand_buf_list = NULL;
pgwr->cand_list_size = 0;
pgwr->head = 0;
pgwr->tail = 0;
pgwr->seg_head = 0;
pgwr->seg_tail = 0;
pgwr->seg_cand_list_size = 0;
pgwr->seg_cand_buf_list = NULL;
INIT_CANDIDATE_LIST(pgwr->normal_list, NULL, 0, 0, 0);
INIT_CANDIDATE_LIST(pgwr->nvm_list, NULL, 0, 0, 0);
INIT_CANDIDATE_LIST(pgwr->seg_list, NULL, 0, 0, 0);
for (int i = 1; i <= thread_num; i++) {
pgwr = &g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[i];
int start = normal_avg_num * (i - 1);
int end = start + normal_avg_num;
int seg_start = SegmentBufferStartID + seg_avr_num * (i - 1);
int seg_end = seg_start + seg_avr_num;
int nvm_start = NvmBufferStartID + nvm_avg_num * (i - 1);
int nvm_end = nvm_start + nvm_avg_num;
int seg_start = SegmentBufferStartID + seg_avg_num * (i - 1);
int seg_end = seg_start + seg_avg_num;
if (i == thread_num) {
end += SharedBufferNumber % thread_num;
end += NORMAL_SHARED_BUFFER_NUM % thread_num;
nvm_end += NVM_BUFFER_NUM % thread_num;
seg_end += SEGMENT_BUFFER_NUM % thread_num;
}
/* init normal candidat list */
pgwr->head = 0;
pgwr->tail = 0;
pgwr->buf_id_start = start;
pgwr->cand_list_size = end - start;
pgwr->cand_buf_list = &g_instance.ckpt_cxt_ctl->candidate_buffers[start];
/* init segment candidat list */
pgwr->seg_head = 0;
pgwr->seg_tail = 0;
pgwr->seg_cand_list_size = seg_end - seg_start;
pgwr->seg_cand_buf_list = &g_instance.ckpt_cxt_ctl->candidate_buffers[seg_start];
pgwr->seg_id_start = seg_start;
INIT_CANDIDATE_LIST(pgwr->normal_list, &cand_buffers[start], end - start, 0, 0);
INIT_CANDIDATE_LIST(pgwr->nvm_list, &cand_buffers[nvm_start], nvm_end - nvm_start, 0, 0);
INIT_CANDIDATE_LIST(pgwr->seg_list, &cand_buffers[seg_start], seg_end - seg_start, 0, 0);
pgwr->normal_list.buf_id_start = start;
pgwr->nvm_list.buf_id_start = nvm_start;
pgwr->seg_list.buf_id_start = seg_start;
}
}
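For concreteness (hypothetical sizes, not values from this patch): with NORMAL_SHARED_BUFFER_NUM = 100000, NVM_BUFFER_NUM = 50000, SEGMENT_BUFFER_NUM = 20000 and sub_num = 4, sub-thread i gets a normal list of 25000 ids starting at 25000 * (i - 1), an NVM list of 12500 ids starting at NvmBufferStartID + 12500 * (i - 1), and a segment list of 5000 ids starting at SegmentBufferStartID + 5000 * (i - 1); any remainder from the integer division is folded into the last thread's ranges, exactly as the i == thread_num branch above does.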
@ -992,7 +983,7 @@ static uint32 calculate_pagewriter_flush_num()
counter = 0;
}
dirty_page_pct = g_instance.ckpt_cxt_ctl->actual_dirty_page_num / (float)(g_instance.attr.attr_storage.NBuffers);
dirty_page_pct = g_instance.ckpt_cxt_ctl->actual_dirty_page_num / (float)(SegmentBufferStartID);
dirty_slot_pct = get_dirty_page_num() / (float)(g_instance.ckpt_cxt_ctl->dirty_page_queue_size);
dirty_percent = MAX(dirty_page_pct, dirty_slot_pct) / u_sess->attr.attr_storage.dirty_page_percent_max;
@ -1264,7 +1255,8 @@ static void ckpt_pagewriter_main_thread_loop(void)
HandlePageWriterMainInterrupts();
candidate_num = get_curr_candidate_nums(false) + get_curr_candidate_nums(true);
candidate_num = get_curr_candidate_nums(CAND_LIST_NORMAL) + get_curr_candidate_nums(CAND_LIST_NVM) +
get_curr_candidate_nums(CAND_LIST_SEG);
while (get_dirty_page_num() == 0 && candidate_num == (uint32)TOTAL_BUFFER_NUM &&
!t_thrd.pagewriter_cxt.shutdown_requested) {
rc = WaitLatch(&t_thrd.proc->procLatch, WL_TIMEOUT | WL_POSTMASTER_DEATH, (long)TEN_MILLISECOND);
@ -1277,7 +1269,8 @@ static void ckpt_pagewriter_main_thread_loop(void)
HandlePageWriterMainInterrupts();
candidate_num = get_curr_candidate_nums(false) + get_curr_candidate_nums(true);
candidate_num = get_curr_candidate_nums(CAND_LIST_NORMAL) + get_curr_candidate_nums(CAND_LIST_NVM) +
get_curr_candidate_nums(CAND_LIST_SEG);
if (candidate_num == 0) {
/* wakeup sub thread scan the buffer pool, init the candidate list */
wakeup_sub_thread();
@ -1390,7 +1383,7 @@ static void ckpt_pagewriter_sub_thread_loop()
/* scan buffer pool, get flush list and candidate list */
now = get_time_ms();
if (t_thrd.pagewriter_cxt.next_scan_time <= now) {
incre_ckpt_pgwr_scan_buf_pool(wb_context);
incre_ckpt_pgwr_scan_buf_pool(&wb_context);
now = get_time_ms();
t_thrd.pagewriter_cxt.next_scan_time = now +
MAX(u_sess->attr.attr_storage.BgWriterDelay, u_sess->attr.attr_storage.pageWriterSleep);
@ -1412,7 +1405,7 @@ static void ckpt_pagewriter_sub_thread_loop()
/* flush one batch dirty pages */
ResourceOwnerEnlargeBuffers(t_thrd.utils_cxt.CurrentResourceOwner);
incre_ckpt_pgwr_flush_dirty_queue(wb_context);
incre_ckpt_pgwr_flush_dirty_queue(&wb_context);
/* add up completed pages */
completed_pages = pg_atomic_add_fetch_u32(
@ -1952,7 +1945,7 @@ static void ckpt_try_skip_invalid_elem_in_queue_head()
return;
}
static uint32 incre_ckpt_pgwr_flush_dirty_page(WritebackContext wb_context,
static uint32 incre_ckpt_pgwr_flush_dirty_page(WritebackContext *wb_context,
const CkptSortItem *dirty_buf_list, int start, int batch_num)
{
uint32 num_actual_flush = 0;
@ -1972,7 +1965,7 @@ static uint32 incre_ckpt_pgwr_flush_dirty_page(WritebackContext wb_context,
if ((buf_state & BM_CHECKPOINT_NEEDED) && (buf_state & BM_DIRTY)) {
UnlockBufHdr(buf_desc, buf_state);
sync_state = SyncOneBuffer(buf_id, false, &wb_context, true);
sync_state = SyncOneBuffer(buf_id, false, wb_context, true);
if ((sync_state & BUF_WRITTEN)) {
num_actual_flush++;
}
@ -1983,7 +1976,7 @@ static uint32 incre_ckpt_pgwr_flush_dirty_page(WritebackContext wb_context,
return num_actual_flush;
}
static void incre_ckpt_pgwr_flush_dirty_queue(WritebackContext wb_context)
static void incre_ckpt_pgwr_flush_dirty_queue(WritebackContext *wb_context)
{
int thread_id = t_thrd.pagewriter_cxt.pagewriter_id;
PageWriterProc* pgwr = &g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[thread_id];
@ -2022,7 +2015,7 @@ static void incre_ckpt_pgwr_flush_dirty_queue(WritebackContext wb_context)
return;
}
static void incre_ckpt_pgwr_flush_dirty_list(WritebackContext wb_context, uint32 need_flush_num,
static void incre_ckpt_pgwr_flush_dirty_list(WritebackContext *wb_context, uint32 need_flush_num,
bool is_new_relfilenode)
{
int thread_id = t_thrd.pagewriter_cxt.pagewriter_id;
@ -2111,15 +2104,18 @@ static bool check_buffer_dirty_flag(BufferDesc* buf_desc)
return false;
}
static uint32 get_list_flush_max_num(bool is_segment)
static uint32 get_list_flush_max_num(CandListType type)
{
int thread_num = g_instance.ckpt_cxt_ctl->pgwr_procs.sub_num;
uint32 max_io = g_instance.ckpt_cxt_ctl->pgwr_procs.list_flush_max / thread_num;
uint32 dirty_list_size = MAX_DIRTY_LIST_FLUSH_NUM / thread_num;
if (is_segment) {
if (type == CAND_LIST_SEG) {
double seg_percent = ((double)(SEGMENT_BUFFER_NUM) / (double)(TOTAL_BUFFER_NUM));
max_io = max_io * seg_percent;
} else if (type == CAND_LIST_NVM) {
double nvm_percent = ((double)(NVM_BUFFER_NUM) / (double)(TOTAL_BUFFER_NUM));
max_io = max_io * nvm_percent;
} else {
double buffer_percent = ((double)(NORMAL_SHARED_BUFFER_NUM) / (double)(TOTAL_BUFFER_NUM));
max_io = max_io * buffer_percent;
@ -2131,7 +2127,7 @@ static uint32 get_list_flush_max_num(bool is_segment)
}
const float GAP_PERCENT = 0.15;
static uint32 get_list_flush_num(bool is_segment)
static uint32 get_list_flush_num(CandListType type)
{
double percent_target = u_sess->attr.attr_storage.candidate_buf_percent_target;
uint32 cur_candidate_num;
@ -2139,11 +2135,18 @@ static uint32 get_list_flush_num(bool is_segment)
uint32 high_water_mark;
uint32 flush_num = 0;
uint32 min_io = DW_DIRTY_PAGE_MAX_FOR_NOHBK;
uint32 max_io = get_list_flush_max_num(is_segment);
uint32 buffer_num = (is_segment ? SEGMENT_BUFFER_NUM : NORMAL_SHARED_BUFFER_NUM);
uint32 max_io = get_list_flush_max_num(type);
uint32 buffer_num;
if (type == CAND_LIST_SEG) {
buffer_num = SEGMENT_BUFFER_NUM;
} else if (type == CAND_LIST_NVM) {
buffer_num = NVM_BUFFER_NUM;
} else {
buffer_num = NORMAL_SHARED_BUFFER_NUM;
}
total_target = buffer_num * percent_target;
high_water_mark = buffer_num * (percent_target / HIGH_WATER);
cur_candidate_num = get_curr_candidate_nums(is_segment);
cur_candidate_num = get_curr_candidate_nums(type);
/* If the slots are sufficient, the standby DN does not need to flush too many pages. */
if (RecoveryInProgress() && cur_candidate_num >= total_target / 2) {
@ -2173,7 +2176,8 @@ static uint32 get_list_flush_num(bool is_segment)
* then scan the segment buffer pool.
*/
const int MAX_SCAN_BATCH_NUM = 131072 * 10; /* 10GB buffers */
static void incre_ckpt_pgwr_scan_buf_pool(WritebackContext wb_context)
static void incre_ckpt_pgwr_scan_candidate_list(WritebackContext *wb_context, CandidateList *list, CandListType type)
{
int ratio_buf_id_start;
int ratio_cand_list_size;
@ -2181,7 +2185,6 @@ static void incre_ckpt_pgwr_scan_buf_pool(WritebackContext wb_context)
int ratio_shared_buffers;
int thread_num = g_instance.ckpt_cxt_ctl->pgwr_procs.sub_num;
int thread_id = t_thrd.pagewriter_cxt.pagewriter_id;
PageWriterProc *pgwr = &g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[thread_id];
uint32 need_flush_num = 0;
int start = 0;
int end = 0;
@ -2189,12 +2192,17 @@ static void incre_ckpt_pgwr_scan_buf_pool(WritebackContext wb_context)
int batch_scan_num = 0;
uint32 max_flush_num = 0;
bool am_standby = g_instance.ckpt_cxt_ctl->is_standby_mode;
int normal_shared_buffers = SharedBufferNumber;
int shared_buffers;
/* handle the normal buffer pool */
if (get_thread_candidate_nums(thread_id) < pgwr->cand_list_size) {
if (am_standby) {
ratio_shared_buffers = int(normal_shared_buffers * u_sess->attr.attr_storage.shared_buffers_fraction);
if (type == CAND_LIST_NVM) {
shared_buffers = NVM_BUFFER_NUM;
} else if (type == CAND_LIST_NORMAL) {
shared_buffers = NORMAL_SHARED_BUFFER_NUM;
}
if (get_thread_candidate_nums(list) < list->cand_list_size) {
if (am_standby && type != CAND_LIST_SEG) {
ratio_shared_buffers = int(shared_buffers * u_sess->attr.attr_storage.shared_buffers_fraction);
ratio_avg_num = ratio_shared_buffers / thread_num;
ratio_buf_id_start = ratio_avg_num * (thread_id - 1);
ratio_cand_list_size = ratio_avg_num;
@ -2202,30 +2210,30 @@ static void incre_ckpt_pgwr_scan_buf_pool(WritebackContext wb_context)
ratio_cand_list_size += ratio_shared_buffers % thread_num;
}
start = MAX(ratio_buf_id_start, pgwr->next_scan_ratio_loc);
start = MAX(ratio_buf_id_start, list->next_scan_ratio_loc);
end = ratio_buf_id_start + ratio_cand_list_size;
batch_scan_num = MIN(ratio_cand_list_size, MAX_SCAN_BATCH_NUM);
} else {
start = MAX(pgwr->buf_id_start, pgwr->next_scan_normal_loc);
end = pgwr->buf_id_start + pgwr->cand_list_size;
batch_scan_num = MIN(pgwr->cand_list_size, MAX_SCAN_BATCH_NUM);
start = MAX(list->buf_id_start, list->next_scan_loc);
end = list->buf_id_start + list->cand_list_size;
batch_scan_num = MIN(list->cand_list_size, MAX_SCAN_BATCH_NUM);
}
end = MIN(start + batch_scan_num, end);
max_flush_num = get_list_flush_num(false);
max_flush_num = get_list_flush_num(type);
need_flush_num = get_candidate_buf_and_flush_list(start, end, max_flush_num, &is_new_relfilenode);
if (am_standby) {
if (am_standby && type != CAND_LIST_SEG) {
if (end >= ratio_buf_id_start + ratio_cand_list_size) {
pgwr->next_scan_ratio_loc = ratio_buf_id_start;
list->next_scan_ratio_loc = ratio_buf_id_start;
} else {
pgwr->next_scan_ratio_loc = end;
list->next_scan_ratio_loc = end;
}
} else {
if (end >= pgwr->buf_id_start + pgwr->cand_list_size) {
pgwr->next_scan_normal_loc = pgwr->buf_id_start;
if (end >= list->buf_id_start + list->cand_list_size) {
list->next_scan_loc = list->buf_id_start;
} else {
pgwr->next_scan_normal_loc = end;
list->next_scan_loc = end;
}
}
@ -2233,25 +2241,17 @@ static void incre_ckpt_pgwr_scan_buf_pool(WritebackContext wb_context)
incre_ckpt_pgwr_flush_dirty_list(wb_context, need_flush_num, is_new_relfilenode);
}
}
}
/* handle the segment buffer pool */
if (get_thread_seg_candidate_nums(thread_id) < pgwr->seg_cand_list_size) {
start = MAX(pgwr->seg_id_start, pgwr->next_scan_seg_loc);
end = pgwr->seg_id_start + pgwr->seg_cand_list_size;
batch_scan_num = MIN(pgwr->seg_cand_list_size, MAX_SCAN_BATCH_NUM);
end = MIN(start + batch_scan_num, end);
max_flush_num = get_list_flush_num(true);
static void incre_ckpt_pgwr_scan_buf_pool(WritebackContext *wb_context)
{
int thread_id = t_thrd.pagewriter_cxt.pagewriter_id;
PageWriterProc *pgwr = &g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[thread_id];
need_flush_num = get_candidate_buf_and_flush_list(start, end, max_flush_num, &is_new_relfilenode);
if (end >= pgwr->seg_id_start + pgwr->seg_cand_list_size) {
pgwr->next_scan_seg_loc = pgwr->seg_id_start;
} else {
pgwr->next_scan_seg_loc = end;
}
if (need_flush_num > 0) {
incre_ckpt_pgwr_flush_dirty_list(wb_context, need_flush_num, is_new_relfilenode);
}
}
/* handle the normal/nvm/segment buffer pools */
incre_ckpt_pgwr_scan_candidate_list(wb_context, &pgwr->normal_list, CAND_LIST_NORMAL);
incre_ckpt_pgwr_scan_candidate_list(wb_context, &pgwr->nvm_list, CAND_LIST_NVM);
incre_ckpt_pgwr_scan_candidate_list(wb_context, &pgwr->seg_list, CAND_LIST_SEG);
return;
}
@ -2314,10 +2314,12 @@ static uint32 get_candidate_buf_and_flush_list(uint32 start, uint32 end, uint32
/* Not dirty, put directly into flushed candidates */
if (!(local_buf_state & BM_DIRTY)) {
if (g_instance.ckpt_cxt_ctl->candidate_free_map[buf_id] == false) {
if (buf_id < (uint32)SegmentBufferStartID) {
candidate_buf_push(buf_id, thread_id);
if (buf_id < (uint32)NvmBufferStartID) {
candidate_buf_push(&pgwr->normal_list, buf_id);
} else if (buf_id < (uint32)SegmentBufferStartID) {
candidate_buf_push(&pgwr->nvm_list, buf_id);
} else {
seg_candidate_buf_push(buf_id, thread_id);
candidate_buf_push(&pgwr->seg_list, buf_id);
}
g_instance.ckpt_cxt_ctl->candidate_free_map[buf_id] = true;
candidates++;
@ -2359,6 +2361,7 @@ UNLOCK:
static void push_to_candidate_list(BufferDesc *buf_desc)
{
uint32 thread_id = t_thrd.pagewriter_cxt.pagewriter_id;
PageWriterProc *pgwr = &g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[thread_id];
int buf_id = buf_desc->buf_id;
uint32 buf_state = pg_atomic_read_u32(&buf_desc->state);
bool emptyUsageCount = (!NEED_CONSIDER_USECOUNT || BUF_STATE_GET_USAGECOUNT(buf_state) == 0);
@ -2372,10 +2375,12 @@ static void push_to_candidate_list(BufferDesc *buf_desc)
if (g_instance.ckpt_cxt_ctl->candidate_free_map[buf_id] == false) {
emptyUsageCount = (!NEED_CONSIDER_USECOUNT || BUF_STATE_GET_USAGECOUNT(buf_state) == 0);
if (BUF_STATE_GET_REFCOUNT(buf_state) == 0 && emptyUsageCount && !(buf_state & BM_DIRTY)) {
if (buf_id < SegmentBufferStartID) {
candidate_buf_push(buf_id, thread_id);
if (buf_id < NvmBufferStartID) {
candidate_buf_push(&pgwr->normal_list, buf_id);
} else if (buf_id < SegmentBufferStartID) {
candidate_buf_push(&pgwr->nvm_list, buf_id);
} else {
seg_candidate_buf_push(buf_id, thread_id);
candidate_buf_push(&pgwr->seg_list, buf_id);
}
g_instance.ckpt_cxt_ctl->candidate_free_map[buf_id] = true;
}
@ -2391,42 +2396,22 @@ static void push_to_candidate_list(BufferDesc *buf_desc)
* @in: buf_id, buffer id which need push to the list
* @in: thread_id, pagewriter thread id.
*/
static void candidate_buf_push(int buf_id, int thread_id)
static void candidate_buf_push(CandidateList *list, int buf_id)
{
PageWriterProc *pgwr = &g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[thread_id];
uint32 list_size = pgwr->cand_list_size;
uint32 list_size = list->cand_list_size;
uint32 tail_loc;
pg_memory_barrier();
volatile uint64 head = pg_atomic_read_u64(&pgwr->head);
volatile uint64 head = pg_atomic_read_u64(&list->head);
pg_memory_barrier();
volatile uint64 tail = pg_atomic_read_u64(&pgwr->tail);
volatile uint64 tail = pg_atomic_read_u64(&list->tail);
if (unlikely(tail - head >= list_size)) {
return;
}
tail_loc = tail % list_size;
pgwr->cand_buf_list[tail_loc] = buf_id;
(void)pg_atomic_fetch_add_u64(&pgwr->tail, 1);
}
static void seg_candidate_buf_push(int buf_id, int thread_id)
{
PageWriterProc *pgwr = &g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[thread_id];
uint32 list_size = pgwr->seg_cand_list_size;
uint32 tail_loc;
pg_memory_barrier();
volatile uint64 head = pg_atomic_read_u64(&pgwr->seg_head);
pg_memory_barrier();
volatile uint64 tail = pg_atomic_read_u64(&pgwr->seg_tail);
if (unlikely(tail - head >= list_size)) {
return;
}
tail_loc = tail % list_size;
pgwr->seg_cand_buf_list[tail_loc] = buf_id;
(void)pg_atomic_fetch_add_u64(&pgwr->seg_tail, 1);
list->cand_buf_list[tail_loc] = buf_id;
(void)pg_atomic_fetch_add_u64(&list->tail, 1);
}
/**
@ -2434,71 +2419,34 @@ static void seg_candidate_buf_push(int buf_id, int thread_id)
* @in: buf_id, store the buffer id from the list.
* @in: thread_id, pagewriter thread id
*/
bool candidate_buf_pop(int *buf_id, int thread_id)
bool candidate_buf_pop(CandidateList *list, int *buf_id)
{
PageWriterProc *pgwr = &g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[thread_id];
uint32 list_size = pgwr->cand_list_size;
uint32 list_size = list->cand_list_size;
uint32 head_loc;
while (true) {
pg_memory_barrier();
uint64 head = pg_atomic_read_u64(&pgwr->head);
uint64 head = pg_atomic_read_u64(&list->head);
pg_memory_barrier();
volatile uint64 tail = pg_atomic_read_u64(&pgwr->tail);
volatile uint64 tail = pg_atomic_read_u64(&list->tail);
if (unlikely(head >= tail)) {
return false; /* candidate list is empty */
}
head_loc = head % list_size;
*buf_id = pgwr->cand_buf_list[head_loc];
if (pg_atomic_compare_exchange_u64(&pgwr->head, &head, head + 1)) {
*buf_id = list->cand_buf_list[head_loc];
if (pg_atomic_compare_exchange_u64(&list->head, &head, head + 1)) {
return true;
}
}
}
bool seg_candidate_buf_pop(int *buf_id, int thread_id)
static int64 get_thread_candidate_nums(CandidateList *list)
{
PageWriterProc *pgwr = &g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[thread_id];
uint32 list_size = pgwr->seg_cand_list_size;
uint32 head_loc;
while (true) {
pg_memory_barrier();
uint64 head = pg_atomic_read_u64(&pgwr->seg_head);
pg_memory_barrier();
volatile uint64 tail = pg_atomic_read_u64(&pgwr->seg_tail);
if (unlikely(head >= tail)) {
return false; /* candidate list is empty */
}
head_loc = head % list_size;
*buf_id = pgwr->seg_cand_buf_list[head_loc];
if (pg_atomic_compare_exchange_u64(&pgwr->seg_head, &head, head + 1)) {
return true;
}
}
}
static int64 get_thread_candidate_nums(int thread_id)
{
PageWriterProc *pgwr = &g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[thread_id];
volatile uint64 head = pg_atomic_read_u64(&pgwr->head);
volatile uint64 head = pg_atomic_read_u64(&list->head);
pg_memory_barrier();
volatile uint64 tail = pg_atomic_read_u64(&pgwr->tail);
int64 curr_cand_num = tail - head;
Assert(curr_cand_num >= 0);
return curr_cand_num;
}
static int64 get_thread_seg_candidate_nums(int thread_id)
{
PageWriterProc *pgwr = &g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[thread_id];
volatile uint64 head = pg_atomic_read_u64(&pgwr->seg_head);
pg_memory_barrier();
volatile uint64 tail = pg_atomic_read_u64(&pgwr->seg_tail);
volatile uint64 tail = pg_atomic_read_u64(&list->tail);
int64 curr_cand_num = tail - head;
Assert(curr_cand_num >= 0);
return curr_cand_num;
@ -2507,25 +2455,25 @@ static int64 get_thread_seg_candidate_nums(int thread_id)
/**
* @Description: Return a rough estimate of the current number of buffers in the candidate list.
*/
uint32 get_curr_candidate_nums(bool segment)
uint32 get_curr_candidate_nums(CandListType type)
{
uint32 currCandidates = 0;
PageWriterProc *pgwr = NULL;
if (segment) {
for (int i = 1; i < g_instance.ckpt_cxt_ctl->pgwr_procs.num; i++) {
pgwr = &g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[i];
if (pgwr->proc != NULL) {
currCandidates += get_thread_seg_candidate_nums(i);
}
}
return currCandidates;
}
CandidateList *list = NULL;
for (int i = 1; i < g_instance.ckpt_cxt_ctl->pgwr_procs.num; i++) {
pgwr = &g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[i];
if (type == CAND_LIST_NORMAL) {
list = &pgwr->normal_list;
} else if (type == CAND_LIST_NVM) {
list = &pgwr->nvm_list;
} else {
list = &pgwr->seg_list;
}
if (pgwr->proc != NULL) {
currCandidates += get_thread_candidate_nums(i);
currCandidates += get_thread_candidate_nums(list);
}
}
return currCandidates;
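To see the CandidateList ring in isolation, here is a self-contained sketch of the same pattern (C++17 std::atomic instead of the pg_atomic_* wrappers; not the openGauss implementation): one pagewriter sub-thread pushes reusable buffer ids at the tail, any backend pops from the head with a CAS, and because head and tail are ever-increasing 64-bit counters, tail - head is the fill level reported by get_thread_candidate_nums.

#include <atomic>
#include <cstdint>
#include <vector>

struct CandidateRing {
    std::vector<int> slots;            /* cand_buf_list */
    std::atomic<uint64_t> head{0};     /* next slot to pop */
    std::atomic<uint64_t> tail{0};     /* next slot to push */

    explicit CandidateRing(size_t n) : slots(n) {}

    /* single producer (the pagewriter sub-thread): drop the id if the ring is full */
    void push(int buf_id) {
        uint64_t h = head.load(std::memory_order_acquire);
        uint64_t t = tail.load(std::memory_order_relaxed);
        if (t - h >= slots.size())
            return;
        slots[t % slots.size()] = buf_id;
        tail.store(t + 1, std::memory_order_release);
    }

    /* multiple consumers (backends looking for a victim): claim the head slot by CAS */
    bool pop(int *buf_id) {
        for (;;) {
            uint64_t h = head.load(std::memory_order_acquire);
            uint64_t t = tail.load(std::memory_order_acquire);
            if (h >= t)
                return false;                    /* list is empty */
            *buf_id = slots[h % slots.size()];
            if (head.compare_exchange_weak(h, h + 1))
                return true;                     /* nobody raced us for this slot */
        }
    }

    int64_t size() const { return (int64_t)(tail.load() - head.load()); }
};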

View File

@ -1309,6 +1309,7 @@ static void knl_t_storage_init(knl_t_storage_context* storage_cxt)
storage_cxt->BufferDescriptors = NULL;
storage_cxt->BufferBlocks = NULL;
storage_cxt->NvmBufferBlocks = NULL;
storage_cxt->BackendWritebackContext = (WritebackContext*)palloc0(sizeof(WritebackContext));
storage_cxt->SharedBufHash = NULL;
storage_cxt->InProgressBuf = NULL;

View File

@ -12,7 +12,6 @@ set(CMAKE_MODULE_PATH
${CMAKE_CURRENT_SOURCE_DIR}/cstore
${CMAKE_CURRENT_SOURCE_DIR}/replication
${CMAKE_CURRENT_SOURCE_DIR}/sync
${CMAKE_CURRENT_SOURCE_DIR}/file
${CMAKE_CURRENT_SOURCE_DIR}/freespace
${CMAKE_CURRENT_SOURCE_DIR}/ipc
@ -21,6 +20,7 @@ set(CMAKE_MODULE_PATH
${CMAKE_CURRENT_SOURCE_DIR}/page
${CMAKE_CURRENT_SOURCE_DIR}/remote
${CMAKE_CURRENT_SOURCE_DIR}/smgr
${CMAKE_CURRENT_SOURCE_DIR}/nvm
${CMAKE_CURRENT_SOURCE_DIR}/tcap
${CMAKE_CURRENT_SOURCE_DIR}/xlog_share_storage
${CMAKE_CURRENT_SOURCE_DIR}/dorado_operation
@ -49,6 +49,7 @@ add_subdirectory(lmgr)
add_subdirectory(page)
add_subdirectory(remote)
add_subdirectory(smgr)
add_subdirectory(nvm)
add_subdirectory(tcap)
add_subdirectory(xlog_share_storage)
add_subdirectory(dorado_operation)

View File

@ -25,7 +25,7 @@ subdir = src/gausskernel/storage
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
SUBDIRS = access bulkload replication buffer cmgr cstore file freespace ipc large_object lmgr page remote smgr tcap sync dorado_operation xlog_share_storage
SUBDIRS = access bulkload replication buffer cmgr cstore file freespace ipc large_object lmgr page remote smgr nvm tcap sync dorado_operation xlog_share_storage
ifeq ($(enable_mot), yes)
SUBDIRS += mot

View File

@ -18,6 +18,7 @@
#include "gs_bbox.h"
#include "storage/buf/bufmgr.h"
#include "storage/buf/buf_internals.h"
#include "storage/nvm/nvm.h"
#include "storage/ipc.h"
#include "storage/cucache_mgr.h"
#include "pgxc/pgxc.h"
@ -80,14 +81,18 @@ void InitBufferPool(void)
candidate_buf_init();
#ifdef __aarch64__
buffer_size = TOTAL_BUFFER_NUM * (Size)BLCKSZ + PG_CACHE_LINE_SIZE;
buffer_size = (TOTAL_BUFFER_NUM - NVM_BUFFER_NUM) * (Size)BLCKSZ + PG_CACHE_LINE_SIZE;
t_thrd.storage_cxt.BufferBlocks =
(char *)CACHELINEALIGN(ShmemInitStruct("Buffer Blocks", buffer_size, &found_bufs));
#else
buffer_size = TOTAL_BUFFER_NUM * (Size)BLCKSZ;
buffer_size = (TOTAL_BUFFER_NUM - NVM_BUFFER_NUM) * (Size)BLCKSZ;
t_thrd.storage_cxt.BufferBlocks = (char *)ShmemInitStruct("Buffer Blocks", buffer_size, &found_bufs);
#endif
if (g_instance.attr.attr_storage.nvm_attr.enable_nvm) {
nvm_init();
}
if (BBOX_BLACKLIST_SHARE_BUFFER) {
/* Segment Buffer is exclued from the black list, as it contains many critical information for debug */
bbox_blacklist_add(SHARED_BUFFER, t_thrd.storage_cxt.BufferBlocks, NORMAL_SHARED_BUFFER_NUM * (Size)BLCKSZ);
@ -185,7 +190,7 @@ Size BufferShmemSize(void)
size = add_size(size, PG_CACHE_LINE_SIZE);
/* size of data pages */
size = add_size(size, mul_size(TOTAL_BUFFER_NUM, BLCKSZ));
size = add_size(size, mul_size((NORMAL_SHARED_BUFFER_NUM + SEGMENT_BUFFER_NUM), BLCKSZ));
#ifdef __aarch64__
size = add_size(size, PG_CACHE_LINE_SIZE);
#endif
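A note on the layout implied by the two changes above: the DRAM "Buffer Blocks" segment now holds only NORMAL_SHARED_BUFFER_NUM + SEGMENT_BUFFER_NUM pages, while the NVM_BUFFER_NUM pages in between (ids from NvmBufferStartID up to SegmentBufferStartID) come from the file mapped by nvm_init. A hedged, self-contained sketch of how a buffer id would be resolved under that assumption (the real accessors are the BufHdrGetBlock-style macros; all constants here are hypothetical):

#include <cstddef>

static const size_t kBlockSize = 8192;               /* default openGauss BLCKSZ */
static const int kNormalNum = 100000;                 /* hypothetical NORMAL_SHARED_BUFFER_NUM */
static const int kNvmNum = 400000;                    /* hypothetical NVM_BUFFER_NUM */
static const int kNvmStart = kNormalNum;              /* assumed NvmBufferStartID */
static const int kSegStart = kNormalNum + kNvmNum;    /* assumed SegmentBufferStartID */

static char *g_dramBlocks = nullptr;  /* stands in for t_thrd.storage_cxt.BufferBlocks */
static char *g_nvmBlocks = nullptr;   /* stands in for t_thrd.storage_cxt.NvmBufferBlocks */

static char *BlockFor(int buf_id)
{
    if (buf_id < kNvmStart)                           /* normal pool lives in DRAM */
        return g_dramBlocks + (size_t)buf_id * kBlockSize;
    if (buf_id < kSegStart)                           /* NVM pool lives in the mmap'd file */
        return g_nvmBlocks + (size_t)(buf_id - kNvmStart) * kBlockSize;
    /* segment pool follows the normal pool inside the DRAM segment */
    return g_dramBlocks + (size_t)(buf_id - kNvmNum) * kBlockSize;
}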

View File

@ -30,11 +30,6 @@
#include "gstrace/storage_gstrace.h"
extern uint32 hashquickany(uint32 seed, register const unsigned char *data, register int len);
/* entry for buffer lookup hashtable */
typedef struct {
BufferTag key; /* Tag of a disk page */
int id; /* Associated buffer ID */
} BufferLookupEnt;
/*
* Estimate space needed for mapping hashtable

View File

@ -61,6 +61,7 @@
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/smgr/segment.h"
#include "storage/nvm/nvm.h"
#include "storage/standby.h"
#include "utils/aiomem.h"
#include "utils/guc.h"
@ -353,8 +354,6 @@ void ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref)
}
static void BufferSync(int flags);
static uint32 WaitBufHdrUnlocked(BufferDesc* buf);
static void WaitIO(BufferDesc* buf);
static void TerminateBufferIO_common(BufferDesc* buf, bool clear_dirty, uint32 set_flag_bits);
void shared_buffer_write_error_callback(void* arg);
static BufferDesc* BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, BlockNumber blockNum,
@ -654,15 +653,7 @@ static volatile BufferDesc *PageListBufferAlloc(SMgrRelation smgr, char relpersi
old_hash = BufTableHashCode(&old_tag);
old_partition_lock = BufMappingPartitionLock(old_hash);
/* Must lock the lower-numbered partition first to avoid deadlocks. */
if (old_partition_lock < new_partition_lock) {
(void)LWLockAcquire(old_partition_lock, LW_EXCLUSIVE);
(void)LWLockAcquire(new_partition_lock, LW_EXCLUSIVE);
} else if (old_partition_lock > new_partition_lock) {
(void)LWLockAcquire(new_partition_lock, LW_EXCLUSIVE);
(void)LWLockAcquire(old_partition_lock, LW_EXCLUSIVE);
} else {
(void)LWLockAcquire(new_partition_lock, LW_EXCLUSIVE);
}
LockTwoLWLock(new_partition_lock, old_partition_lock);
} else {
/* if it wasn't valid, we need only the new partition */
(void)LWLockAcquire(new_partition_lock, LW_EXCLUSIVE);
@ -2378,7 +2369,7 @@ void SimpleMarkBufDirty(BufferDesc *buf)
}
static void PageCheckIfCanEliminate(BufferDesc *buf, uint32 *oldFlags, bool *needGetLock)
void PageCheckIfCanEliminate(BufferDesc *buf, uint32 *oldFlags, bool *needGetLock)
{
Block tmpBlock = BufHdrGetBlock(buf);
@ -2402,7 +2393,7 @@ static void PageCheckIfCanEliminate(BufferDesc *buf, uint32 *oldFlags, bool *nee
}
#ifdef USE_ASSERT_CHECKING
static void PageCheckWhenChosedElimination(const BufferDesc *buf, uint32 oldFlags)
void PageCheckWhenChosedElimination(const BufferDesc *buf, uint32 oldFlags)
{
if ((oldFlags & BM_TAG_VALID) && RecoveryInProgress()) {
if (!XLByteEQ(buf->lsn_dirty, InvalidXLogRecPtr)) {
@ -2435,6 +2426,10 @@ static void PageCheckWhenChosedElimination(const BufferDesc *buf, uint32 oldFlag
static BufferDesc *BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber fork_num, BlockNumber block_num,
BufferAccessStrategy strategy, bool *found, const XLogPhyBlock *pblk)
{
if (g_instance.attr.attr_storage.nvm_attr.enable_nvm) {
return NvmBufferAlloc(smgr, relpersistence, fork_num, block_num, strategy, found, pblk);
}
Assert(!IsSegmentPhysicalRelNode(smgr->smgr_rnode.node));
BufferTag new_tag; /* identity of requested block */
@ -2635,16 +2630,7 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumbe
* Must lock the lower-numbered partition first to avoid
* deadlocks.
*/
if (old_partition_lock < new_partition_lock) {
(void)LWLockAcquire(old_partition_lock, LW_EXCLUSIVE);
(void)LWLockAcquire(new_partition_lock, LW_EXCLUSIVE);
} else if (old_partition_lock > new_partition_lock) {
(void)LWLockAcquire(new_partition_lock, LW_EXCLUSIVE);
(void)LWLockAcquire(old_partition_lock, LW_EXCLUSIVE);
} else {
/* only one partition, only one lock */
(void)LWLockAcquire(new_partition_lock, LW_EXCLUSIVE);
}
LockTwoLWLock(new_partition_lock, old_partition_lock);
} else {
/* if it wasn't valid, we need only the new partition */
(void)LWLockAcquire(new_partition_lock, LW_EXCLUSIVE);
@ -4651,7 +4637,7 @@ XLogRecPtr BufferGetLSNAtomic(Buffer buffer)
void DropSegRelNodeSharedBuffer(RelFileNode node, ForkNumber forkNum)
{
for (int i = 0; i < g_instance.attr.attr_storage.NBuffers; i++) {
for (int i = 0; i < SegmentBufferStartID; i++) {
BufferDesc *buf_desc = GetBufferDescriptor(i);
uint32 buf_state;
@ -4696,7 +4682,7 @@ void DropSegRelNodeSharedBuffer(RelFileNode node, ForkNumber forkNum)
void RangeForgetBuffer(RelFileNode node, ForkNumber forkNum, BlockNumber firstDelBlock,
BlockNumber endDelBlock)
{
for (int i = 0; i < g_instance.attr.attr_storage.NBuffers; i++) {
for (int i = 0; i < SegmentBufferStartID; i++) {
BufferDesc *buf_desc = GetBufferDescriptor(i);
uint32 buf_state;
@ -4716,7 +4702,7 @@ void DropRelFileNodeShareBuffers(RelFileNode node, ForkNumber forkNum, BlockNumb
{
int i;
for (i = 0; i < g_instance.attr.attr_storage.NBuffers; i++) {
for (i = 0; i < SegmentBufferStartID; i++) {
BufferDesc *buf_desc = GetBufferDescriptor(i);
uint32 buf_state;
/*
@ -4817,7 +4803,7 @@ void DropRelFileNodeAllBuffersUsingHash(HTAB *relfilenode_hashtbl)
{
int i;
for (i = 0; i < g_instance.attr.attr_storage.NBuffers; i++) {
for (i = 0; i < SegmentBufferStartID; i++) {
BufferDesc *buf_desc = GetBufferDescriptor(i);
uint32 buf_state;
bool found = false;
@ -4869,7 +4855,7 @@ void DropRelFileNodeAllBuffersUsingHash(HTAB *relfilenode_hashtbl)
void DropRelFileNodeOneForkAllBuffersUsingHash(HTAB *relfilenode_hashtbl)
{
int i;
for (i = 0; i < g_instance.attr.attr_storage.NBuffers; i++) {
for (i = 0; i < SegmentBufferStartID; i++) {
BufferDesc *buf_desc = GetBufferDescriptor(i);
uint32 buf_state;
bool found = false;
@ -5005,9 +4991,9 @@ void DropRelFileNodeAllBuffersUsingScan(RelFileNode *rnodes, int rnode_len)
pg_qsort(rnodes, (size_t)rnode_len, sizeof(RelFileNode), compare_rnode_func);
Assert((g_instance.attr.attr_storage.NBuffers % 4) == 0);
Assert((SegmentBufferStartID % 4) == 0);
int i;
for (i = 0; i < g_instance.attr.attr_storage.NBuffers; i += 4) {
for (i = 0; i < SegmentBufferStartID; i += 4) {
ScanCompareAndInvalidateBuffer(rnodes, rnode_len, i);
ScanCompareAndInvalidateBuffer(rnodes, rnode_len, i + 1);
@ -5144,7 +5130,7 @@ void flush_all_buffers(Relation rel, Oid db_id, HTAB *hashtbl)
// @Temp Table. no relation use local buffer. Temp table now use shared buffer.
/* Make sure we can handle the pin inside the loop */
ResourceOwnerEnlargeBuffers(t_thrd.utils_cxt.CurrentResourceOwner);
for (i = 0; i < NORMAL_SHARED_BUFFER_NUM; i++) {
for (i = 0; i < SegmentBufferStartID; i++) {
buf_desc = GetBufferDescriptor(i);
/*
* As in DropRelFileNodeBuffers, an unlocked precheck should be safe
@ -5852,7 +5838,7 @@ bool IsBufferCleanupOK(Buffer buffer)
/*
* WaitIO -- Block until the IO_IN_PROGRESS flag on 'buf' is cleared.
*/
static void WaitIO(BufferDesc *buf)
void WaitIO(BufferDesc *buf)
{
/*
* Changed to wait until there's no IO - Inoue 01/13/2000
@ -6299,7 +6285,7 @@ bool retryLockBufHdr(BufferDesc *desc, uint32 *buf_state)
* Obviously the buffer could be locked by the time the value is returned, so
* this is primarily useful in CAS style loops.
*/
static uint32 WaitBufHdrUnlocked(BufferDesc *buf)
uint32 WaitBufHdrUnlocked(BufferDesc *buf)
{
#ifndef ENABLE_THREAD_CHECK
SpinDelayStatus delay_status = init_spin_delay(buf);
@ -6833,3 +6819,16 @@ void PartitionInsertTdeInfoToCache(Relation reln, Partition p)
}
}
void LockTwoLWLock(LWLock *new_partition_lock, LWLock *old_partition_lock)
{
if (old_partition_lock < new_partition_lock) {
(void)LWLockAcquire(old_partition_lock, LW_EXCLUSIVE);
(void)LWLockAcquire(new_partition_lock, LW_EXCLUSIVE);
} else if (old_partition_lock > new_partition_lock) {
(void)LWLockAcquire(new_partition_lock, LW_EXCLUSIVE);
(void)LWLockAcquire(old_partition_lock, LW_EXCLUSIVE);
} else {
/* only one partition, only one lock */
(void)LWLockAcquire(new_partition_lock, LW_EXCLUSIVE);
}
}

View File

@ -608,7 +608,7 @@ RETRY:
if (pg_atomic_read_u32(&buf->state) & (BM_DIRTY | BM_IS_META)) {
if (retry_times < Min(MAX_RETRY_RING_TIMES, strategy->ring_size * MAX_RETRY_RING_PCT)) {
goto RETRY;
} else if (get_curr_candidate_nums(false) >= (uint32)g_instance.attr.attr_storage.NBuffers *
} else if (get_curr_candidate_nums(CAND_LIST_NORMAL) >= (uint32)g_instance.attr.attr_storage.NBuffers *
u_sess->attr.attr_storage.candidate_buf_percent_target){
strategy->current_was_in_ring = false;
return NULL;
@ -731,7 +731,7 @@ static BufferDesc* get_buf_from_candidate_list(BufferAccessStrategy strategy, ui
/* the pagewriter sub thread store normal buffer pool, sub thread starts from 1 */
int thread_id = (list_id + i) % list_num + 1;
Assert(thread_id > 0 && thread_id <= list_num);
while (candidate_buf_pop(&buf_id, thread_id)) {
while (candidate_buf_pop(&g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[thread_id].normal_list, &buf_id)) {
Assert(buf_id < SegmentBufferStartID);
buf = GetBufferDescriptor(buf_id);
local_buf_state = LockBufHdr(buf);

View File

@ -0,0 +1,21 @@
#This is the main CMAKE for build bin.
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} TGT_nvm_SRC)
set(TGT_nvm_INC
${PROJECT_OPENGS_DIR}/contrib/log_fdw
${PROJECT_TRUNK_DIR}/distribute/bin/gds
${PROJECT_SRC_DIR}/include/libcomm
${PROJECT_SRC_DIR}/include
${PROJECT_SRC_DIR}/lib/gstrace
${LZ4_INCLUDE_PATH}
${LIBCGROUP_INCLUDE_PATH}
${LIBORC_INCLUDE_PATH}
${EVENT_INCLUDE_PATH}
${PROTOBUF_INCLUDE_PATH}
${ZLIB_INCLUDE_PATH}
)
set(nvm_DEF_OPTIONS ${MACRO_OPTIONS})
set(nvm_COMPILE_OPTIONS ${OPTIMIZE_OPTIONS} ${OS_OPTIONS} ${PROTECT_OPTIONS} ${WARNING_OPTIONS} ${BIN_SECURE_OPTIONS} ${CHECK_OPTIONS})
set(nvm_LINK_OPTIONS ${BIN_LINK_OPTIONS})
add_static_objtarget(gausskernel_storage_nvm TGT_nvm_SRC TGT_nvm_INC "${nvm_DEF_OPTIONS}" "${nvm_COMPILE_OPTIONS}" "${nvm_LINK_OPTIONS}")

View File

@ -0,0 +1,14 @@
subdir = src/gausskernel/storage/nvm
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
ifneq "$(MAKECMDGOALS)" "clean"
ifneq "$(MAKECMDGOALS)" "distclean"
ifneq "$(shell which g++ |grep hutaf_llt |wc -l)" "1"
-include $(DEPEND)
endif
endif
endif
OBJS = nvm.o nvmbuffer.o
include $(top_srcdir)/src/gausskernel/common.mk

View File

@ -0,0 +1,102 @@
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* nvm.cpp
*
* IDENTIFICATION
* src/gausskernel/storage/nvm/nvm.cpp
*
* ---------------------------------------------------------------------------------------
*/
#include "storage/nvm/nvm.h"
#include "postgres.h"
#include "utils/guc.h"
#include "knl/knl_variable.h"
#include "storage/buf/bufmgr.h"
extern bool check_special_character(char c);
bool check_nvm_path(char** newval, void** extra, GucSource source)
{
char *absPath;
absPath = pstrdup(*newval);
int len = strlen(absPath);
for (int i = 0; i < len; i++) {
if (!check_special_character(absPath[i]) || isspace(absPath[i])) {
return false;
}
}
return true;
}
static bool LockNvmFile(int fd)
{
struct flock lock;
lock.l_type = F_WRLCK;
lock.l_start = 0;
lock.l_whence = SEEK_SET;
lock.l_len = 0;
lock.l_pid = getpid();
if (fcntl(fd, F_SETLK, &lock) == 0) {
return false;
}
return true;
}
void nvm_init(void)
{
LWLockAcquire(ShmemIndexLock, LW_EXCLUSIVE);
if (NVM_BUFFER_NUM == 0) {
LWLockRelease(ShmemIndexLock);
ereport(WARNING, (errmsg("nvm_buffers is not set.")));
return;
}
if (g_instance.attr.attr_storage.nvm_attr.nvmBlocks == NULL) {
int nvmBufFd = open(g_instance.attr.attr_storage.nvm_attr.nvm_file_path, O_RDWR);
if (nvmBufFd < 0) {
LWLockRelease(ShmemIndexLock);
ereport(FATAL, (errmsg("can not open nvm file.")));
}
if (LockNvmFile(nvmBufFd)) {
LWLockRelease(ShmemIndexLock);
ereport(FATAL, (errmsg("can not lock nvm file.")));
}
size_t nvmBufferSize = (NVM_BUFFER_NUM) * (Size)BLCKSZ;
g_instance.attr.attr_storage.nvm_attr.nvmBlocks = (char *)mmap(NULL, nvmBufferSize,
PROT_READ | PROT_WRITE, MAP_SHARED, nvmBufFd, 0);
if (g_instance.attr.attr_storage.nvm_attr.nvmBlocks == NULL) {
LWLockRelease(ShmemIndexLock);
ereport(FATAL,
(errmsg("could not create nvm buffer"),
errdetail("Failed system call was mmap(size=%lu)", nvmBufferSize),
errhint("This error usually means that openGauss's request for a"
"nvm shared buffer exceeded your nvm's file size. you can either"
"reduce the request the request size or resize the nvm file.")));
}
}
t_thrd.storage_cxt.NvmBufferBlocks = g_instance.attr.attr_storage.nvm_attr.nvmBlocks;
LWLockRelease(ShmemIndexLock);
}
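A sizing note for nvm_init above: nvm_buffers is declared with GUC_UNIT_BLOCKS, and the mapping covers NVM_BUFFER_NUM * BLCKSZ bytes of nvm_file_path. With the default 8 kB BLCKSZ, nvm_buffers = 131072 therefore needs a pre-created NVM file of at least 131072 x 8 kB = 1 GiB; mmap itself succeeds on a shorter file, but touching pages past the end of the file would fault (SIGBUS) later.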

View File

@ -0,0 +1,870 @@
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* nvmbuffer.cpp
*
* IDENTIFICATION
* src/gausskernel/storage/nvm/nvmbuffer.cpp
*
* ------------------------------------------------------------------------
*/
#include "postgres.h"
#include "utils/dynahash.h"
#include "access/double_write.h"
#include "knl/knl_variable.h"
#include "storage/buf/buf_internals.h"
#include "storage/buf/bufmgr.h"
#include "storage/smgr/smgr.h"
#include "storage/smgr/segment_internal.h"
#include "utils/resowner.h"
#include "pgstat.h"
static BufferDesc *NvmStrategyGetBuffer(uint32 *buf_state);
extern PrivateRefCountEntry *GetPrivateRefCountEntry(Buffer buffer, bool create, bool do_move);
static const int MILLISECOND_TO_MICROSECOND = 1000;
static const int TEN_MILLISECOND = 10;
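/*
 * Note on the two helpers below (inferred from this patch, not normative):
 * gs_random() / MAX_RANDOM_VALUE is roughly uniform in [0, 1], so BypassNvm() returns
 * true with probability ~(1 - bypass_nvm) and BypassDram() with probability
 * ~(1 - bypass_dram). With the defaults bypass_nvm = 0.5 and bypass_dram = 0.01,
 * about half of victim allocations fall back to the DRAM freelist instead of the
 * NVM pool, and about 99% of hits on an NVM-resident page are served in place
 * rather than migrated to DRAM.
 */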
static inline bool BypassNvm()
{
return ((double)gs_random() / (double)MAX_RANDOM_VALUE) > g_instance.attr.attr_storage.nvm_attr.bypassNvm;
}
static inline bool BypassDram()
{
return ((double)gs_random() / (double)MAX_RANDOM_VALUE) > g_instance.attr.attr_storage.nvm_attr.bypassDram;
}
static BufferLookupEnt* NvmBufTableLookup(BufferTag *tag, uint32 hashcode)
{
return (BufferLookupEnt *)buf_hash_operate<HASH_FIND>(t_thrd.storage_cxt.SharedBufHash, tag, hashcode, NULL);
}
static bool NvmPinBuffer(BufferDesc *buf, bool *migrate)
{
int b = BufferDescriptorGetBuffer(buf);
bool result = false;
uint32 buf_state;
uint32 old_buf_state;
*migrate = false;
old_buf_state = pg_atomic_read_u32(&buf->state);
for (;;) {
if (unlikely(old_buf_state & BM_IN_MIGRATE)) {
*migrate = true;
return result;
} else {
if (old_buf_state & BM_LOCKED) {
old_buf_state = WaitBufHdrUnlocked(buf);
}
buf_state = old_buf_state;
/* increase refcount */
buf_state += BUF_REFCOUNT_ONE;
/* increase usagecount unless already max */
if (BUF_STATE_GET_USAGECOUNT(buf_state) != BM_MAX_USAGE_COUNT) {
buf_state += BUF_USAGECOUNT_ONE;
}
if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state, buf_state)) {
result = (buf_state & BM_VALID) != 0;
break;
}
}
}
PrivateRefCountEntry *ref = GetPrivateRefCountEntry(b, true, true);
Assert(ref != NULL);
ref->refcount++;
Assert(ref->refcount > 0);
ResourceOwnerRememberBuffer(t_thrd.utils_cxt.CurrentResourceOwner, b);
return result;
}
static bool NvmPinBufferFast(BufferDesc *buf)
{
int b = BufferDescriptorGetBuffer(buf);
PrivateRefCountEntry *ref = NULL;
/* When the second and third parameters are both true, the return value must not be NULL. */
ref = GetPrivateRefCountEntry(b, false, false);
if (ref == NULL) {
return false;
}
ref->refcount++;
Assert(ref->refcount > 0);
ResourceOwnerRememberBuffer(t_thrd.utils_cxt.CurrentResourceOwner, b);
return true;
}
/*
* We must set timeout here to avoid deadlock. If
* backend thread A has pinned nvm buf1 and wants to migrate nvm buf2,
* and backend thread B has pinned nvm buf2 and wants to migrate nvm buf1,
* each would wait forever for the other to unpin.
*/
static bool WaitUntilUnPin(BufferDesc *buf)
{
uint32 old_buf_state;
int waits = 0;
for (;;) {
old_buf_state = pg_atomic_read_u32(&buf->state);
if (BUF_STATE_GET_REFCOUNT(old_buf_state) == 1) {
return true;
} else {
pg_usleep(MILLISECOND_TO_MICROSECOND);
waits++;
if (waits >= TEN_MILLISECOND) {
return false;
}
}
}
}
/*
* Wait until the BM_IN_MIGRATE flag isn't set anymore
*/
static void WaitBufHdrUnMigrate(BufferDesc *buf)
{
#ifndef ENABLE_THREAD_CHECK
SpinDelayStatus delay_status = init_spin_delay(buf);
#endif
uint32 buf_state;
buf_state = pg_atomic_read_u32(&buf->state);
while (buf_state & BM_IN_MIGRATE) {
#ifndef ENABLE_THREAD_CHECK
perform_spin_delay(&delay_status);
#endif
buf_state = pg_atomic_read_u32(&buf->state);
}
#ifndef ENABLE_THREAD_CHECK
finish_spin_delay(&delay_status);
#endif
}
static bool SetBufferMigrateFlag(Buffer buffer)
{
BufferDesc *buf = GetBufferDescriptor(buffer - 1);
uint32 bufState;
uint32 oldBufState;
for (;;) {
oldBufState = pg_atomic_read_u32(&buf->state);
if (oldBufState & BM_LOCKED) {
oldBufState = WaitBufHdrUnlocked(buf);
}
if (oldBufState & BM_IN_MIGRATE) {
return false;
}
bufState = oldBufState;
bufState |= BM_IN_MIGRATE;
ereport(DEBUG1, (errmsg("mark buffer %d migrate buffer stat %u.", buffer, bufState)));
if (pg_atomic_compare_exchange_u32(&buf->state, &oldBufState, bufState)) {
return true;
}
}
}
static void UnSetBufferMigrateFlag(Buffer buffer)
{
BufferDesc *buf = GetBufferDescriptor(buffer - 1);
uint32 bufState;
uint32 oldBufState;
for (;;) {
oldBufState = pg_atomic_read_u32(&buf->state);
if (oldBufState & BM_LOCKED) {
oldBufState = WaitBufHdrUnlocked(buf);
}
bufState = oldBufState;
bufState &= ~(BM_IN_MIGRATE);
ereport(DEBUG1, (errmsg("unmark buffer %d migrate buffer stat %u.", buffer, bufState)));
if (pg_atomic_compare_exchange_u32(&buf->state, &oldBufState, bufState)) {
break;
}
}
}
static void NvmWaitBufferIO(BufferDesc *buf)
{
uint32 buf_state;
Assert(!t_thrd.storage_cxt.InProgressBuf);
/* Double-check that InProgressBuf is NULL. */
if (t_thrd.storage_cxt.InProgressBuf) {
ereport(PANIC, (errmsg("InProgressBuf not null: id %d flags %u, buf: id %d flags %u",
t_thrd.storage_cxt.InProgressBuf->buf_id,
pg_atomic_read_u32(&t_thrd.storage_cxt.InProgressBuf->state) & BUF_FLAG_MASK,
buf->buf_id, pg_atomic_read_u32(&buf->state) & BUF_FLAG_MASK)));
}
bool ioDone = false;
restart:
for (;;) {
(void)LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);
buf_state = LockBufHdr(buf);
if (buf_state & BM_IO_IN_PROGRESS) {
ioDone = true;
UnlockBufHdr(buf, buf_state);
LWLockRelease(buf->io_in_progress_lock);
WaitIO(buf);
} else {
break;
}
}
if (buf_state & BM_VALID) {
/* someone else already did the I/O */
UnlockBufHdr(buf, buf_state);
LWLockRelease(buf->io_in_progress_lock);
return;
} else {
if (!ioDone) {
UnlockBufHdr(buf, buf_state);
LWLockRelease(buf->io_in_progress_lock);
goto restart;
} else {
ereport(PANIC, (errmsg("ioDone is true but buf_state is not valid ")));
}
}
return;
}
BufferDesc *NvmBufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber fork_num,
BlockNumber block_num, BufferAccessStrategy strategy, bool *found, const XLogPhyBlock *pblk)
{
Assert(!IsSegmentPhysicalRelNode(smgr->smgr_rnode.node));
BufferTag new_tag; /* identity of requested block */
uint32 new_hash; /* hash value for newTag */
LWLock *new_partition_lock = NULL; /* buffer partition lock for it */
BufferTag old_tag; /* previous identity of selected buffer */
uint32 old_hash; /* hash value for oldTag */
LWLock *old_partition_lock = NULL; /* buffer partition lock for it */
uint32 old_flags;
int buf_id;
BufferDesc *buf = NULL;
BufferDesc *nvmBuf = NULL;
bool valid = false;
uint32 buf_state, nvm_buf_state;
bool migrate = false;
errno_t rc;
/* create a tag so we can lookup the buffer */
INIT_BUFFERTAG(new_tag, smgr->smgr_rnode.node, fork_num, block_num);
/* determine its hash code and partition lock ID */
new_hash = BufTableHashCode(&new_tag);
new_partition_lock = BufMappingPartitionLock(new_hash);
restart:
*found = FALSE;
/* see if the block is in the buffer pool already */
(void)LWLockAcquire(new_partition_lock, LW_SHARED);
pgstat_report_waitevent(WAIT_EVENT_BUF_HASH_SEARCH);
BufferLookupEnt *entry = NvmBufTableLookup(&new_tag, new_hash);
pgstat_report_waitevent(WAIT_EVENT_END);
if (entry != NULL) {
*found = TRUE;
/*
* Found it. Now, pin the buffer so no one can steal it from the
* buffer pool, and check to see if the correct data has been loaded
* into the buffer.
*/
buf_id = pg_atomic_read_u32((volatile uint32*)&entry->id);
if (IsNormalBufferID(buf_id)) {
buf = GetBufferDescriptor(buf_id);
valid = PinBuffer(buf, strategy);
/* Can release the mapping lock as soon as we've pinned it */
LWLockRelease(new_partition_lock);
if (!valid) {
/*
* We can only get here if (a) someone else is still reading in
* the page, or (b) a previous read attempt failed. We have to
* wait for any active read attempt to finish, and then set up our
* own read attempt if the page is still not BM_VALID.
* StartBufferIO does it all.
*/
if (StartBufferIO(buf, true)) {
/*
* If we get here, previous attempts to read the buffer must
* have failed ... but we shall bravely try again.
*/
*found = FALSE;
}
}
return buf;
}
if (IsNvmBufferID(buf_id)) {
nvmBuf = GetBufferDescriptor(buf_id);
/* Return buffer immediately if I have pinned the buffer before */
if (NvmPinBufferFast(nvmBuf)) {
LWLockRelease(new_partition_lock);
return nvmBuf;
}
/* Haven't pinned the buffer ever */
if (BypassDram()) {
/* want to return nvm buffer directly */
valid = NvmPinBuffer(nvmBuf, &migrate);
if (migrate) {
LWLockRelease(new_partition_lock);
WaitBufHdrUnMigrate(nvmBuf);
goto restart;
} else {
buf_id = pg_atomic_read_u32((volatile uint32 *)&entry->id);
if (IsNormalBufferID(buf_id)) {
UnpinBuffer(nvmBuf, true);
buf = GetBufferDescriptor(buf_id);
valid = PinBuffer(buf, strategy);
LWLockRelease(new_partition_lock);
Assert(valid);
return buf;
}
/* Can release the mapping lock as soon as we've pinned it */
LWLockRelease(new_partition_lock);
Assert(nvmBuf->buf_id == buf_id);
if (!valid) {
if (StartBufferIO(nvmBuf, true)) {
*found = FALSE;
}
}
return nvmBuf;
}
} else {
/* wanna migrate */
if (SetBufferMigrateFlag(buf_id + 1)) {
buf_id = pg_atomic_read_u32((volatile uint32 *)&entry->id);
if (IsNormalBufferID(buf_id)) {
UnSetBufferMigrateFlag(nvmBuf->buf_id + 1);
buf = GetBufferDescriptor(buf_id);
valid = PinBuffer(buf, strategy);
LWLockRelease(new_partition_lock);
Assert(valid);
return buf;
}
Assert(nvmBuf->buf_id == buf_id);
valid = PinBuffer(nvmBuf, strategy);
if (!valid) {
/* corner case: the migration thread can not do I/O */
NvmWaitBufferIO(nvmBuf);
}
LWLockRelease(new_partition_lock);
if (!WaitUntilUnPin(nvmBuf)) {
UnSetBufferMigrateFlag(buf_id + 1);
return nvmBuf;
}
nvm_buf_state = LockBufHdr(nvmBuf);
if (nvm_buf_state & BM_DIRTY) {
UnlockBufHdr(nvmBuf, nvm_buf_state);
UnSetBufferMigrateFlag(buf_id + 1);
return nvmBuf;
} else {
for (;;) {
buf = (BufferDesc *)StrategyGetBuffer(strategy, &buf_state);
old_flags = buf_state & BUF_FLAG_MASK;
if ((old_flags & BM_DIRTY) || (old_flags & BM_IS_META)) {
UnlockBufHdr(buf, buf_state);
(void)sched_yield();
continue;
}
if (old_flags & BM_TAG_VALID) {
PinBuffer_Locked(buf);
old_tag = ((BufferDesc *)buf)->tag;
old_hash = BufTableHashCode(&old_tag);
old_partition_lock = BufMappingPartitionLock(old_hash);
(void)LWLockAcquire(old_partition_lock, LW_EXCLUSIVE);
buf_state = LockBufHdr(buf);
old_flags = buf_state & BUF_FLAG_MASK;
if (BUF_STATE_GET_REFCOUNT(buf_state) == 1 && !(old_flags & BM_DIRTY) &&
!(old_flags & BM_IS_META)) {
/* todo DMS */
buf_state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_CHECKPOINT_NEEDED |
BM_IO_ERROR | BM_PERMANENT | BUF_USAGECOUNT_MASK);
if (relpersistence == RELPERSISTENCE_PERMANENT || fork_num == INIT_FORKNUM ||
((relpersistence == RELPERSISTENCE_TEMP) && STMT_RETRY_ENABLED)) {
buf_state |= BM_VALID | BM_TAG_VALID | BM_PERMANENT | BUF_USAGECOUNT_ONE;
} else {
buf_state |= BM_VALID | BM_TAG_VALID | BUF_USAGECOUNT_ONE;
}
UnlockBufHdr(buf, buf_state);
BufTableDelete(&old_tag, old_hash);
/* immediately after deleting old_hash to avoid ABA problem */
LWLockRelease(old_partition_lock);
rc = memcpy_s(BufferGetBlock(buf->buf_id + 1), BLCKSZ,
BufferGetBlock(nvmBuf->buf_id + 1), BLCKSZ);
securec_check(rc, "\0", "\0");
buf->tag = nvmBuf->tag;
buf->seg_fileno = nvmBuf->seg_fileno;
buf->seg_blockno = nvmBuf->seg_blockno;
buf->lsn_on_disk = nvmBuf->lsn_on_disk;
/* Assert nvmBuf is not dirty; CAS without buffer header lock */
nvm_buf_state &= ~(BM_TAG_VALID);
UnlockBufHdr(nvmBuf, nvm_buf_state);
UnpinBuffer(nvmBuf, true);
pg_atomic_write_u32((volatile uint32 *)&entry->id, buf->buf_id);
UnSetBufferMigrateFlag(nvmBuf->buf_id + 1);
return buf;
}
UnlockBufHdr(buf, buf_state);
LWLockRelease(old_partition_lock);
UnpinBuffer(buf, true);
} else {
/* this buf is not in hashtable */
buf->state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_CHECKPOINT_NEEDED |
BM_IO_ERROR | BM_PERMANENT | BUF_USAGECOUNT_MASK);
if (relpersistence == RELPERSISTENCE_PERMANENT || fork_num == INIT_FORKNUM ||
((relpersistence == RELPERSISTENCE_TEMP) && STMT_RETRY_ENABLED)) {
buf->state |= BM_VALID | BM_TAG_VALID | BM_PERMANENT | BUF_USAGECOUNT_ONE;
} else {
buf->state |= BM_VALID | BM_TAG_VALID | BUF_USAGECOUNT_ONE;
}
PinBuffer_Locked(buf);
rc = memcpy_s(BufferGetBlock(buf->buf_id + 1), BLCKSZ,
BufferGetBlock(nvmBuf->buf_id + 1), BLCKSZ);
securec_check(rc, "\0", "\0");
buf->tag = nvmBuf->tag;
buf->seg_fileno = nvmBuf->seg_fileno;
buf->seg_blockno = nvmBuf->seg_blockno;
buf->lsn_on_disk = nvmBuf->lsn_on_disk;
// Assert nvmBuf is not dirty
nvm_buf_state &= ~(BM_TAG_VALID);
UnlockBufHdr(nvmBuf, nvm_buf_state);
UnpinBuffer(nvmBuf, true);
pg_atomic_write_u32((volatile uint32 *)&entry->id, buf->buf_id);
UnSetBufferMigrateFlag(nvmBuf->buf_id + 1);
return buf;
}
}
}
} else {
LWLockRelease(new_partition_lock);
goto restart;
}
}
}
}
/*
* Didn't find it in the buffer pool. We'll have to initialize a new
* buffer. Remember to unlock the mapping lock while doing the work.
*/
LWLockRelease(new_partition_lock);
/* Loop here in case we have to try another victim buffer */
for (;;) {
bool needGetLock = false;
/*
* Select a victim buffer. The buffer is returned with its header
* spinlock still held!
*/
pgstat_report_waitevent(WAIT_EVENT_BUF_STRATEGY_GET);
if (BypassNvm()) {
buf = (BufferDesc *)StrategyGetBuffer(strategy, &buf_state);
} else {
buf = (BufferDesc *)NvmStrategyGetBuffer(&buf_state);
}
pgstat_report_waitevent(WAIT_EVENT_END);
Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 0);
/* Must copy buffer flags while we still hold the spinlock */
old_flags = buf_state & BUF_FLAG_MASK;
/* Pin the buffer and then release the buffer spinlock */
PinBuffer_Locked(buf);
PageCheckIfCanEliminate(buf, &old_flags, &needGetLock);
/*
* If the buffer was dirty, try to write it out. There is a race
* condition here, in that someone might dirty it after we released it
* above, or even while we are writing it out (since our share-lock
* won't prevent hint-bit updates). We will recheck the dirty bit
* after re-locking the buffer header.
*/
if (old_flags & BM_DIRTY) {
/* backend should not flush dirty pages if working version less than DW_SUPPORT_NEW_SINGLE_FLUSH */
if (!backend_can_flush_dirty_page()) {
UnpinBuffer(buf, true);
(void)sched_yield();
continue;
}
/*
* We need a share-lock on the buffer contents to write it out
* (else we might write invalid data, eg because someone else is
* compacting the page contents while we write). We must use a
* conditional lock acquisition here to avoid deadlock. Even
* though the buffer was not pinned (and therefore surely not
* locked) when StrategyGetBuffer returned it, someone else could
* have pinned and exclusive-locked it by the time we get here. If
* we try to get the lock unconditionally, we'd block waiting for
* them; if they later block waiting for us, deadlock ensues.
* (This has been observed to happen when two backends are both
* trying to split btree index pages, and the second one just
* happens to be trying to split the page the first one got from
* StrategyGetBuffer.)
*/
bool needDoFlush = false;
if (!needGetLock) {
needDoFlush = LWLockConditionalAcquire(buf->content_lock, LW_SHARED);
} else {
LWLockAcquire(buf->content_lock, LW_SHARED);
needDoFlush = true;
}
if (needDoFlush) {
/*
* If using a nondefault strategy, and writing the buffer
* would require a WAL flush, let the strategy decide whether
* to go ahead and write/reuse the buffer or to choose another
* victim. We need lock to inspect the page LSN, so this
* can't be done inside StrategyGetBuffer.
*/
if (strategy != NULL) {
XLogRecPtr lsn;
/* Read the LSN while holding buffer header lock */
buf_state = LockBufHdr(buf);
lsn = BufferGetLSN(buf);
UnlockBufHdr(buf, buf_state);
if (XLogNeedsFlush(lsn) && StrategyRejectBuffer(strategy, buf)) {
/* Drop lock/pin and loop around for another buffer */
LWLockRelease(buf->content_lock);
UnpinBuffer(buf, true);
continue;
}
}
/* during initdb, there is no need to flush the dw file */
if (dw_enabled() && pg_atomic_read_u32(&g_instance.ckpt_cxt_ctl->current_page_writer_count) > 0) {
if (!free_space_enough(buf->buf_id)) {
LWLockRelease(buf->content_lock);
UnpinBuffer(buf, true);
continue;
}
uint32 pos = 0;
pos = first_version_dw_single_flush(buf);
t_thrd.proc->dw_pos = pos;
FlushBuffer(buf, NULL);
g_instance.dw_single_cxt.single_flush_state[pos] = true;
t_thrd.proc->dw_pos = -1;
} else {
FlushBuffer(buf, NULL);
}
LWLockRelease(buf->content_lock);
ScheduleBufferTagForWriteback(t_thrd.storage_cxt.BackendWritebackContext, &buf->tag);
} else {
/*
* Someone else has locked the buffer, so give it up and loop
* back to get another one.
*/
UnpinBuffer(buf, true);
continue;
}
}
/*
* To change the association of a valid buffer, we'll need to have
* exclusive lock on both the old and new mapping partitions.
*/
if (old_flags & BM_TAG_VALID) {
/*
* Need to compute the old tag's hashcode and partition lock ID.
* XXX is it worth storing the hashcode in BufferDesc so we need
* not recompute it here? Probably not.
*/
old_tag = ((BufferDesc *)buf)->tag;
old_hash = BufTableHashCode(&old_tag);
old_partition_lock = BufMappingPartitionLock(old_hash);
/*
* Must lock the lower-numbered partition first to avoid
* deadlocks.
*/
LockTwoLWLock(new_partition_lock, old_partition_lock);
} else {
/* if it wasn't valid, we need only the new partition */
(void)LWLockAcquire(new_partition_lock, LW_EXCLUSIVE);
/* these just keep the compiler quiet about uninit variables */
old_hash = 0;
old_partition_lock = NULL;
}
/*
* Try to make a hashtable entry for the buffer under its new tag.
* This could fail because while we were writing someone else
* allocated another buffer for the same block we want to read in.
* Note that we have not yet removed the hashtable entry for the old
* tag.
*/
buf_id = BufTableInsert(&new_tag, new_hash, buf->buf_id);
if (buf_id >= 0) {
/*
* Got a collision. Someone has already done what we were about to
* do. We'll just handle this as if it were found in the buffer
* pool in the first place. First, give up the buffer we were
* planning to use.
*/
UnpinBuffer(buf, true);
/* Can give up that buffer's mapping partition lock now */
if ((old_flags & BM_TAG_VALID) && old_partition_lock != new_partition_lock)
LWLockRelease(old_partition_lock);
/* remaining code should match code at top of routine */
buf = GetBufferDescriptor(buf_id);
valid = PinBuffer(buf, strategy);
/* Can release the mapping lock as soon as we've pinned it */
LWLockRelease(new_partition_lock);
*found = TRUE;
if (!valid) {
/*
* We can only get here if (a) someone else is still reading
* in the page, or (b) a previous read attempt failed. We
* have to wait for any active read attempt to finish, and
* then set up our own read attempt if the page is still not
* BM_VALID. StartBufferIO does it all.
*/
if (StartBufferIO(buf, true)) {
/*
* If we get here, previous attempts to read the buffer
* must have failed ... but we shall bravely try again.
*/
*found = FALSE;
}
}
return buf;
}
/*
* Need to lock the buffer header too in order to change its tag.
*/
buf_state = LockBufHdr(buf);
/*
* Somebody could have pinned or re-dirtied the buffer while we were
* doing the I/O and making the new hashtable entry. If so, we can't
* recycle this buffer; we must undo everything we've done and start
* over with a new victim buffer.
*/
old_flags = buf_state & BUF_FLAG_MASK;
if (BUF_STATE_GET_REFCOUNT(buf_state) == 1 && !(old_flags & BM_DIRTY)
&& !(old_flags & BM_IS_META)) {
break;
}
UnlockBufHdr(buf, buf_state);
BufTableDelete(&new_tag, new_hash);
if ((old_flags & BM_TAG_VALID) && old_partition_lock != new_partition_lock) {
LWLockRelease(old_partition_lock);
}
LWLockRelease(new_partition_lock);
UnpinBuffer(buf, true);
}
#ifdef USE_ASSERT_CHECKING
PageCheckWhenChosedElimination(buf, old_flags);
#endif
/*
* Okay, it's finally safe to rename the buffer.
*
* Clearing BM_VALID here is necessary, clearing the dirtybits is just
* paranoia. We also reset the usage_count since any recency of use of
* the old content is no longer relevant. (The usage_count starts out at
* 1 so that the buffer can survive one clock-sweep pass.)
*
* Make sure BM_PERMANENT is set for buffers that must be written at every
* checkpoint. Unlogged buffers only need to be written at shutdown
* checkpoints, except for their "init" forks, which need to be treated
* just like permanent relations.
*/
((BufferDesc *)buf)->tag = new_tag;
buf_state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT |
BUF_USAGECOUNT_MASK);
if (relpersistence == RELPERSISTENCE_PERMANENT || fork_num == INIT_FORKNUM ||
((relpersistence == RELPERSISTENCE_TEMP) && STMT_RETRY_ENABLED)) {
buf_state |= BM_TAG_VALID | BM_PERMANENT | BUF_USAGECOUNT_ONE;
} else {
buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
}
UnlockBufHdr(buf, buf_state);
if (old_flags & BM_TAG_VALID) {
BufTableDelete(&old_tag, old_hash);
if (old_partition_lock != new_partition_lock) {
LWLockRelease(old_partition_lock);
}
}
/* Set the physical segment file. */
if (pblk != NULL) {
Assert(PhyBlockIsValid(*pblk));
buf->seg_fileno = pblk->relNode;
buf->seg_blockno = pblk->block;
} else {
buf->seg_fileno = EXTENT_INVALID;
buf->seg_blockno = InvalidBlockNumber;
}
LWLockRelease(new_partition_lock);
/*
* Buffer contents are currently invalid. Try to get the io_in_progress
* lock. If StartBufferIO returns false, then someone else managed to
* read it before we did, so there's nothing left for BufferAlloc() to do.
*/
if (StartBufferIO(buf, true)) {
*found = FALSE;
} else {
*found = TRUE;
}
return buf;
}
static uint32 next_victim_buffer = 0;
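/*
 * NvmClockSweepTick -- advance the clock-sweep hand over the nvm buffer pool
 * and return the next victim slot (an offset inside the nvm pool, not a
 * global buffer id).
 */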
static inline uint32 NvmClockSweepTick(void)
{
uint32 victim = pg_atomic_fetch_add_u32(&next_victim_buffer, 1);
if (victim >= (uint32)NVM_BUFFER_NUM) {
victim = victim % NVM_BUFFER_NUM;
}
return victim;
}
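/*
 * Try to take a reusable nvm buffer from the per-pagewriter candidate lists.
 * On success the buffer is returned with its header spinlock still held;
 * otherwise the pagewriter threads are woken up and NULL is returned.
 */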
static BufferDesc* get_nvm_buf_from_candidate_list(uint32* buf_state)
{
BufferDesc* buf = NULL;
uint32 local_buf_state;
int buf_id = 0;
int list_num = g_instance.ckpt_cxt_ctl->pgwr_procs.sub_num;
volatile PgBackendStatus* beentry = t_thrd.shemem_ptr_cxt.MyBEEntry;
int list_id = beentry->st_tid > 0 ? (beentry->st_tid % list_num) : (beentry->st_sessionid % list_num);
for (int i = 0; i < list_num; i++) {
/* the pagewriter sub-threads store the normal buffer pool; sub-thread ids start from 1 */
int thread_id = (list_id + i) % list_num + 1;
Assert(thread_id > 0 && thread_id <= list_num);
while (candidate_buf_pop(&g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[thread_id].nvm_list, &buf_id)) {
Assert(buf_id >= NvmBufferStartID && buf_id < SegmentBufferStartID);
buf = GetBufferDescriptor(buf_id);
local_buf_state = LockBufHdr(buf);
if (g_instance.ckpt_cxt_ctl->candidate_free_map[buf_id]) {
g_instance.ckpt_cxt_ctl->candidate_free_map[buf_id] = false;
bool available_buffer = BUF_STATE_GET_REFCOUNT(local_buf_state) == 0
&& !(local_buf_state & BM_IS_META)
&& !(local_buf_state & BM_DIRTY);
if (available_buffer) {
*buf_state = local_buf_state;
return buf;
}
}
UnlockBufHdr(buf, local_buf_state);
}
}
wakeup_pagewriter_thread();
return NULL;
}
const int RETRY_COUNT = 3;
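/*
 * NvmStrategyGetBuffer -- select a victim buffer from the nvm buffer pool.
 *
 * With incremental checkpoint and active pagewriters, the candidate lists are
 * consulted first; otherwise a clock sweep runs for up to RETRY_COUNT passes
 * over the pool before reporting that no unpinned buffer is available. The
 * chosen buffer is returned with its header spinlock held.
 */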
static BufferDesc *NvmStrategyGetBuffer(uint32* buf_state)
{
BufferDesc *buf = NULL;
int try_counter = NVM_BUFFER_NUM * RETRY_COUNT;
uint32 local_buf_state = 0; /* to avoid repeated (de-)referencing */
/* Check the Candidate list */
if (ENABLE_INCRE_CKPT && pg_atomic_read_u32(&g_instance.ckpt_cxt_ctl->current_page_writer_count) > 1) {
buf = get_nvm_buf_from_candidate_list(buf_state);
if (buf != NULL) {
(void)pg_atomic_fetch_add_u64(&g_instance.ckpt_cxt_ctl->nvm_get_buf_num_candidate_list, 1);
return buf;
}
}
for (;;) {
int buf_id = BufferIdOfNvmBuffer(NvmClockSweepTick());
buf = GetBufferDescriptor(buf_id);
local_buf_state = LockBufHdr(buf);
if (BUF_STATE_GET_REFCOUNT(local_buf_state) == 0 && !(local_buf_state & BM_IS_META) &&
(backend_can_flush_dirty_page() || !(local_buf_state & BM_DIRTY))) {
*buf_state = local_buf_state;
(void)pg_atomic_fetch_add_u64(&g_instance.ckpt_cxt_ctl->nvm_get_buf_num_clock_sweep, 1);
return buf;
} else if (--try_counter == 0) {
UnlockBufHdr(buf, local_buf_state);
ereport(ERROR, (errcode(ERRCODE_INVALID_BUFFER), (errmsg("no unpinned buffers available"))));
}
UnlockBufHdr(buf, local_buf_state);
}
/* not reached */
return NULL;
}

View File

@@ -31,7 +31,6 @@
#include "storage/smgr/segment.h"
#include "storage/smgr/smgr.h"
#include "utils/resowner.h"
#include "tsan_annotation.h"
#include "pgstat.h"
/*
@@ -63,23 +62,6 @@ void AbortSegBufferIO(void)
}
}
static void WaitIO(BufferDesc *buf)
{
while (true) {
uint32 buf_state;
buf_state = LockBufHdr(buf);
UnlockBufHdr(buf, buf_state);
if (!(buf_state & BM_IO_IN_PROGRESS)) {
break;
}
LWLockAcquire(buf->io_in_progress_lock, LW_SHARED);
LWLockRelease(buf->io_in_progress_lock);
}
}
static bool SegStartBufferIO(BufferDesc *buf, bool forInput)
{
uint32 buf_state;
@@ -157,32 +139,6 @@ static void SegTerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_f
LWLockRelease(buf->io_in_progress_lock);
}
static uint32 SegWaitBufHdrUnlocked(BufferDesc *buf)
{
#ifndef ENABLE_THREAD_CHECK
SpinDelayStatus delayStatus = init_spin_delay(buf);
#endif
uint32 buf_state;
buf_state = pg_atomic_read_u32(&buf->state);
while (buf_state & BM_LOCKED) {
#ifndef ENABLE_THREAD_CHECK
perform_spin_delay(&delayStatus);
#endif
buf_state = pg_atomic_read_u32(&buf->state);
}
#ifndef ENABLE_THREAD_CHECK
finish_spin_delay(&delayStatus);
#endif
/* ENABLE_THREAD_CHECK only, acquire semantics */
TsAnnotateHappensAfter(&buf->state);
return buf_state;
}
bool SegPinBuffer(BufferDesc *buf)
{
ereport(DEBUG5, (errmodule(MOD_SEGMENT_PAGE),
@@ -198,7 +154,7 @@ bool SegPinBuffer(BufferDesc *buf)
for (;;) {
if (old_buf_state & BM_LOCKED) {
old_buf_state = SegWaitBufHdrUnlocked(buf);
old_buf_state = WaitBufHdrUnlocked(buf);
}
buf_state = old_buf_state;
@@ -263,7 +219,7 @@ void SegUnpinBuffer(BufferDesc *buf)
old_buf_state = pg_atomic_read_u32(&buf->state);
for (;;) {
if (old_buf_state & BM_LOCKED) {
old_buf_state = SegWaitBufHdrUnlocked(buf);
old_buf_state = WaitBufHdrUnlocked(buf);
}
buf_state = old_buf_state;
@@ -459,20 +415,6 @@ Buffer ReadBufferFast(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum, Bloc
return BufferDescriptorGetBuffer(bufHdr);
}
void LockTwoLWLock(LWLock *new_partition_lock, LWLock *old_partition_lock)
{
if (old_partition_lock < new_partition_lock) {
(void)LWLockAcquire(old_partition_lock, LW_EXCLUSIVE);
(void)LWLockAcquire(new_partition_lock, LW_EXCLUSIVE);
} else if (old_partition_lock > new_partition_lock) {
(void)LWLockAcquire(new_partition_lock, LW_EXCLUSIVE);
(void)LWLockAcquire(old_partition_lock, LW_EXCLUSIVE);
} else {
/* only one partition, only one lock */
(void)LWLockAcquire(new_partition_lock, LW_EXCLUSIVE);
}
}
BufferDesc * FoundBufferInHashTable(int buf_id, LWLock *new_partition_lock, bool *foundPtr)
{
BufferDesc *buf = GetBufferDescriptor(buf_id);
@@ -626,7 +568,7 @@ static BufferDesc* get_segbuf_from_candidate_list(uint32* buf_state)
/* the pagewriter sub-threads store the normal buffer pool; sub-thread ids start from 1 */
int thread_id = (list_id + i) % list_num + 1;
Assert(thread_id > 0 && thread_id <= list_num);
while (seg_candidate_buf_pop(&buf_id, thread_id)) {
while (candidate_buf_pop(&g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[thread_id].seg_list, &buf_id)) {
buf = GetBufferDescriptor(buf_id);
local_buf_state = LockBufHdr(buf);
SegmentCheck(buf_id >= SegmentBufferStartID);

View File

@@ -81,6 +81,14 @@ typedef struct knl_instance_attr_dcf {
int dcf_mec_batch_size;
} knl_instance_attr_dcf;
typedef struct knl_instance_attr_nvm {
bool enable_nvm;
char* nvm_file_path;
char *nvmBlocks;
double bypassDram;
double bypassNvm;
} knl_instance_attr_nvm;
typedef struct knl_instance_attr_storage {
bool wal_log_hints;
bool EnableHotStandby;
@@ -100,6 +108,7 @@ typedef struct knl_instance_attr_storage {
int WalReceiverBufSize;
int DataQueueBufSize;
int NBuffers;
int NNvmBuffers;
int NPcaBuffers;
int NSegBuffers;
int cstore_buffers;
@@ -144,6 +153,7 @@ typedef struct knl_instance_attr_storage {
int max_concurrent_autonomous_transactions;
char* available_zone;
knl_instance_attr_dcf dcf_attr;
knl_instance_attr_nvm nvm_attr;
int num_internal_lock_partitions[LWLOCK_PART_KIND];
char* num_internal_lock_partitions_str;
int wal_insert_status_entries_power;

View File

@@ -540,8 +540,10 @@ typedef struct knl_g_ckpt_context {
/* pagewriter thread view information */
uint64 page_writer_actual_flush;
volatile uint64 get_buf_num_candidate_list;
volatile uint64 nvm_get_buf_num_candidate_list;
volatile uint64 seg_get_buf_num_candidate_list;
volatile uint64 get_buf_num_clock_sweep;
volatile uint64 nvm_get_buf_num_clock_sweep;
volatile uint64 seg_get_buf_num_clock_sweep;
/* checkpoint view information */

View File

@@ -2549,6 +2549,7 @@ typedef struct knl_t_storage_context {
TransactionId* xminArray;
union BufferDescPadded* BufferDescriptors;
char* BufferBlocks;
char* NvmBufferBlocks;
struct WritebackContext* BackendWritebackContext;
struct HTAB* SharedBufHash;
struct HTAB* BufFreeListHash;

View File

@@ -31,6 +31,12 @@
#define ENABLE_INCRE_CKPT g_instance.attr.attr_storage.enableIncrementalCheckpoint
#define NEED_CONSIDER_USECOUNT u_sess->attr.attr_storage.enable_candidate_buf_usage_count
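/* Bind a CandidateList to its backing array, record its capacity, and set the initial head/tail positions. */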
#define INIT_CANDIDATE_LIST(L, list, size, xx_head, xx_tail) \
((L).cand_buf_list = (list), \
(L).cand_list_size = (size), \
(L).head = (xx_head), \
(L).tail = (xx_tail))
typedef struct PGPROC PGPROC;
typedef struct BufferDesc BufferDesc;
typedef struct CkptSortItem CkptSortItem;
@@ -42,6 +48,22 @@ typedef struct ThrdDwCxt {
bool is_new_relfilenode;
} ThrdDwCxt;
typedef enum CandListType {
CAND_LIST_NORMAL,
CAND_LIST_NVM,
CAND_LIST_SEG
} CandListType;
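/*
 * A ring of candidate (reusable) buffer ids shared between the pagewriter
 * threads and backends; head and tail advance atomically as entries are
 * pushed and popped.
 */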
typedef struct CandidateList {
Buffer *cand_buf_list;
volatile int cand_list_size;
pg_atomic_uint64 head;
pg_atomic_uint64 tail;
volatile int buf_id_start;
int32 next_scan_loc;
int32 next_scan_ratio_loc;
} CandidateList;
typedef struct PageWriterProc {
PGPROC* proc;
/* dirty page queue */
@@ -56,24 +78,10 @@ typedef struct PageWriterProc {
CkptSortItem *dirty_buf_list;
uint32 dirty_list_size;
volatile int buf_id_start; /* buffer id start loc */
int32 next_scan_normal_loc;
int32 next_scan_ratio_loc;
/* thread candidate list; the main thread stores the segment buffer information */
Buffer *cand_buf_list; /* thread candidate buffer list */
volatile int cand_list_size; /* thread candidate list max size */
pg_atomic_uint64 head;
pg_atomic_uint64 tail;
/* thread seg buffer information */
Buffer *seg_cand_buf_list;
volatile int seg_cand_list_size;
pg_atomic_uint64 seg_head;
pg_atomic_uint64 seg_tail;
volatile int seg_id_start; /* buffer id start loc */
int32 next_scan_seg_loc;
CandidateList normal_list;
CandidateList nvm_list;
CandidateList seg_list;
} PageWriterProc;
typedef struct PageWriterProcs {
@@ -163,11 +171,10 @@ extern const incre_ckpt_view_col g_ckpt_view_col[INCRE_CKPT_VIEW_COL_NUM];
extern const incre_ckpt_view_col g_pagewriter_view_col[PAGEWRITER_VIEW_COL_NUM];
extern const incre_ckpt_view_col g_pagewirter_view_two_col[CANDIDATE_VIEW_COL_NUM];
extern bool candidate_buf_pop(int *buf_id, int thread_id);
extern bool seg_candidate_buf_pop(int *buf_id, int thread_id);
extern bool candidate_buf_pop(CandidateList *list, int *buf_id);
extern void candidate_buf_init(void);
extern uint32 get_curr_candidate_nums(bool segment);
extern uint32 get_curr_candidate_nums(CandListType type);
extern void PgwrAbsorbFsyncRequests(void);
extern Size PageWriterShmemSize(void);
extern void PageWriterSyncShmemInit(void);

View File

@@ -39,7 +39,7 @@
* The definition of buffer state components is below.
*/
#define BUF_REFCOUNT_ONE 1
#define BUF_REFCOUNT_MASK ((1U << 17) - 1)
#define BUF_REFCOUNT_MASK ((1U << 16) - 1)
#define BUF_USAGECOUNT_MASK 0x003C0000U
#define BUF_USAGECOUNT_ONE (1U << 18)
#define BUF_USAGECOUNT_SHIFT 18
@@ -55,6 +55,7 @@
* Note: TAG_VALID essentially means that there is a buffer hashtable
* entry associated with the buffer's tag.
*/
#define BM_IN_MIGRATE (1U << 16) /* buffer is migrating */
#define BM_IS_META (1U << 17)
#define BM_LOCKED (1U << 22) /* buffer header is locked */
#define BM_DIRTY (1U << 23) /* data needs writing */
@@ -109,6 +110,11 @@ typedef struct buftagnohbkt {
BlockNumber blockNum; /* blknum relative to begin of reln */
} BufferTagFirstVer;
/* entry for buffer lookup hashtable */
typedef struct {
BufferTag key; /* Tag of a disk page */
int id; /* Associated buffer ID */
} BufferLookupEnt;
#define CLEAR_BUFFERTAG(a) \
((a).rnode.spcNode = InvalidOid, \

View File

@@ -22,13 +22,18 @@
#include "utils/relcache.h"
#include "postmaster/pagerepair.h"
/* [ dram buffer | nvm buffer | segment buffer] */
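/* Buffer ids run [0, NvmBufferStartID) for dram buffers, [NvmBufferStartID, SegmentBufferStartID) for nvm buffers, and [SegmentBufferStartID, TOTAL_BUFFER_NUM) for segment buffers. */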
#define NVM_BUFFER_NUM (g_instance.attr.attr_storage.NNvmBuffers)
#define SEGMENT_BUFFER_NUM (g_instance.attr.attr_storage.NSegBuffers) // 1GB
#define SegmentBufferStartID (g_instance.attr.attr_storage.NBuffers)
#define NORMAL_SHARED_BUFFER_NUM (SegmentBufferStartID)
#define TOTAL_BUFFER_NUM (SEGMENT_BUFFER_NUM + NORMAL_SHARED_BUFFER_NUM)
#define BufferIdOfSegmentBuffer(id) ((id) + SegmentBufferStartID)
#define NvmBufferStartID (g_instance.attr.attr_storage.NBuffers)
#define SegmentBufferStartID (g_instance.attr.attr_storage.NBuffers + g_instance.attr.attr_storage.NNvmBuffers)
#define NORMAL_SHARED_BUFFER_NUM (NvmBufferStartID)
#define TOTAL_BUFFER_NUM (SEGMENT_BUFFER_NUM + NORMAL_SHARED_BUFFER_NUM + NVM_BUFFER_NUM)
#define BufferIdOfSegmentBuffer(id) ((id) + SegmentBufferStartID)
#define BufferIdOfNvmBuffer(id) ((id) + NvmBufferStartID)
#define IsSegmentBufferID(id) ((id) >= SegmentBufferStartID)
#define SharedBufferNumber (SegmentBufferStartID)
#define IsNvmBufferID(id) ((id) >= NvmBufferStartID && (id) < SegmentBufferStartID)
#define IsNormalBufferID(id) ((id) >= 0 && (id) < NvmBufferStartID)
#define USE_CKPT_THREAD_SYNC (!g_instance.attr.attr_storage.enableIncrementalCheckpoint || \
IsBootstrapProcessingMode() || \
@@ -181,6 +186,8 @@ struct WritebackContext;
*/
#define BufferGetBlock(buffer) \
(BufferIsLocal(buffer) ? u_sess->storage_cxt.LocalBufferBlockPointers[-(buffer)-1] \
: IsNvmBufferID((buffer-1)) ? (Block)(t_thrd.storage_cxt.NvmBufferBlocks + ((Size)((uint)(buffer)-1-NvmBufferStartID)) * BLCKSZ) \
: IsSegmentBufferID(buffer-1) ? (Block)(t_thrd.storage_cxt.BufferBlocks + ((Size)((uint)(buffer)-NVM_BUFFER_NUM-1)) * BLCKSZ) \
: (Block)(t_thrd.storage_cxt.BufferBlocks + ((Size)((uint)(buffer)-1)) * BLCKSZ))
#define ADDR_IN_LOCAL_BUFFER_CONTENT(block) \
@@ -194,6 +201,11 @@ struct WritebackContext;
(static_cast<const char *>(block) >= static_cast<const char *>(BufferGetBlock(1)) \
&& static_cast<const char *>(block) <= static_cast<const char *>(BufferGetBlock(TOTAL_BUFFER_NUM))))
#define ADDR_IN_NVM_BUFFER_CONTENT(block) \
( NVM_BUFFER_NUM > 0 && \
(static_cast<const char *>(block) >= static_cast<const char *>(BufferGetBlock(NvmBufferStartID + 1)) \
&& static_cast<const char *>(block) <= static_cast<const char *>(BufferGetBlock(SegmentBufferStartID))))
static inline Buffer BlockGetBuffer(const char *block)
{
if (ADDR_IN_LOCAL_BUFFER_CONTENT(block)) {
@@ -201,7 +213,15 @@ static inline Buffer BlockGetBuffer(const char *block)
}
if (ADDR_IN_SHARED_BUFFER_CONTENT(block)) {
return 1 + ((block - (const char*)BufferGetBlock(1))/BLCKSZ);
Buffer buffer = 1 + ((block - (const char*)BufferGetBlock(1))/BLCKSZ);
if (buffer > NvmBufferStartID) {
buffer += NVM_BUFFER_NUM;
}
return buffer;
}
if (ADDR_IN_NVM_BUFFER_CONTENT(block)) {
return 1 + ((block - (const char*)BufferGetBlock(NvmBufferStartID + 1))/BLCKSZ) + NvmBufferStartID;
}
return InvalidBuffer;
@@ -227,7 +247,10 @@ static inline Buffer BlockGetBuffer(const char *block)
#define BufferGetPage(buffer) ((Page)BufferGetBlock(buffer))
/* Note: these two macros only work on shared buffers, not local ones! */
#define BufHdrGetBlock(bufHdr) ((Block)(t_thrd.storage_cxt.BufferBlocks + ((Size)(uint32)(bufHdr)->buf_id) * BLCKSZ))
#define BufHdrGetBlock(bufHdr) \
(IsNvmBufferID((bufHdr)->buf_id) ? ((Block)(t_thrd.storage_cxt.NvmBufferBlocks + ((Size)(uint32)(bufHdr)->buf_id - NvmBufferStartID) * BLCKSZ)) \
: IsSegmentBufferID((bufHdr)->buf_id) ? ((Block)(t_thrd.storage_cxt.BufferBlocks + ((Size)(uint32)(bufHdr)->buf_id - NVM_BUFFER_NUM) * BLCKSZ)) \
: ((Block)(t_thrd.storage_cxt.BufferBlocks + ((Size)(uint32)(bufHdr)->buf_id) * BLCKSZ)))
#define BufferGetLSN(bufHdr) (PageGetLSN(BufHdrGetBlock(bufHdr)))
/* Note: this macro only works on local buffers, not shared ones! */
@@ -260,7 +283,14 @@ extern void UnlockReleaseBuffer(Buffer buffer);
extern void MarkBufferDirty(Buffer buffer);
extern void IncrBufferRefCount(Buffer buffer);
extern Buffer ReleaseAndReadBuffer(Buffer buffer, Relation relation, BlockNumber blockNum);
void PageCheckIfCanEliminate(BufferDesc *buf, uint32 *oldFlags, bool *needGetLock);
#ifdef USE_ASSERT_CHECKING
void PageCheckWhenChosedElimination(const BufferDesc *buf, uint32 oldFlags);
#endif
uint32 WaitBufHdrUnlocked(BufferDesc* buf);
void WaitIO(BufferDesc *buf);
void InvalidateBuffer(BufferDesc *buf);
void LockTwoLWLock(LWLock *new_partition_lock, LWLock *old_partition_lock);
extern void InitBufferPool(void);
extern void pca_buf_init_ctx();

View File

@@ -0,0 +1,34 @@
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* nvm.h
*
*
* IDENTIFICATION
* src/include/storage/nvm/nvm.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef NVM_H
#define NVM_H
#include "storage/smgr/smgr.h"
void nvm_init(void);
BufferDesc *NvmBufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber fork_num,
BlockNumber block_num, BufferAccessStrategy strategy, bool *found, const XLogPhyBlock *pblk);
#endif

View File

@@ -365,6 +365,9 @@ extern void assign_xlog_sync_method(int new_sync_method, void* extra);
/* in tcop/stmt_retry.cpp */
extern bool check_errcode_list(char** newval, void** extra, GucSource source);
/* in nvm.cpp */
extern bool check_nvm_path(char** newval, void** extra, GucSource source);
extern struct config_generic* find_option(const char* name, bool create_placeholders, int elevel);
/*
* Error code for config file

View File

@@ -89,6 +89,8 @@ select name,vartype,unit,min_val,max_val from pg_settings where name <> 'qunit_c
block_size | integer | | 8192 | 8192
bulk_read_ring_size | integer | kB | 256 | 2147483647
bulk_write_ring_size | integer | kB | 16384 | 2147483647
bypass_dram | real | | 0 | 1
bypass_nvm | real | | 0 | 1
bytea_output | enum | | |
cache_connection | bool | | |
candidate_buf_percent_target | real | | 0.1 | 0.85
@@ -276,6 +278,7 @@ select name,vartype,unit,min_val,max_val from pg_settings where name <> 'qunit_c
enable_mix_replication | bool | | |
enable_nestloop | bool | | |
enable_nonsysadmin_execute_direct | bool | | |
enable_nvm | bool | | |
enable_online_ddl_waitlock | bool | | |
enable_opfusion | bool | | |
enable_orc_cache | bool | | |
@@ -474,6 +477,8 @@ select name,vartype,unit,min_val,max_val from pg_settings where name <> 'qunit_c
nls_timestamp_format | string | | |
numa_distribute_mode | string | | |
num_internal_lock_partitions | string | | |
nvm_buffers | integer | 8kB | 0 | 1073741823
nvm_file_path | string | | |
omit_encoding_error | bool | | |
operation_mode | bool | | |
opfusion_debug_mode | enum | | |