From fca1e8932a292d655c4f9fd0b3958fd453444058 Mon Sep 17 00:00:00 2001 From: Mijamind Date: Fri, 12 Aug 2022 17:57:53 +0800 Subject: [PATCH] nvm buffer manager --- .../backend/utils/misc/guc/guc_storage.cpp | 66 +- src/gausskernel/CMakeLists.txt | 1 + .../process/postmaster/bgwriter.cpp | 11 +- .../process/postmaster/pagewriter.cpp | 280 +++--- .../process/threadpool/knl_thread.cpp | 1 + src/gausskernel/storage/CMakeLists.txt | 2 + src/gausskernel/storage/Makefile | 2 +- src/gausskernel/storage/buffer/buf_init.cpp | 11 +- src/gausskernel/storage/buffer/buf_table.cpp | 5 - src/gausskernel/storage/buffer/bufmgr.cpp | 65 +- src/gausskernel/storage/buffer/freelist.cpp | 4 +- src/gausskernel/storage/nvm/CMakeLists.txt | 21 + src/gausskernel/storage/nvm/Makefile | 14 + src/gausskernel/storage/nvm/nvm.cpp | 56 ++ src/gausskernel/storage/nvm/nvmbuffer.cpp | 881 ++++++++++++++++++ .../storage/smgr/segment/segbuffer.cpp | 64 +- .../knl/knl_guc/knl_instance_attr_storage.h | 10 + src/include/knl/knl_instance.h | 2 + src/include/knl/knl_thread.h | 1 + src/include/postmaster/pagewriter.h | 45 +- src/include/storage/buf/buf_internals.h | 8 +- src/include/storage/buf/bufmgr.h | 44 +- src/include/storage/nvm/nvm.h | 34 + 23 files changed, 1320 insertions(+), 308 deletions(-) create mode 100755 src/gausskernel/storage/nvm/CMakeLists.txt create mode 100644 src/gausskernel/storage/nvm/Makefile create mode 100644 src/gausskernel/storage/nvm/nvm.cpp create mode 100644 src/gausskernel/storage/nvm/nvmbuffer.cpp create mode 100644 src/include/storage/nvm/nvm.h diff --git a/src/common/backend/utils/misc/guc/guc_storage.cpp b/src/common/backend/utils/misc/guc/guc_storage.cpp index 4ce320051..cde0c256e 100755 --- a/src/common/backend/utils/misc/guc/guc_storage.cpp +++ b/src/common/backend/utils/misc/guc/guc_storage.cpp @@ -903,6 +903,19 @@ static void InitStorageConfigureNamesBool() NULL, NULL, NULL}, + + {{"enable_nvm", + PGC_POSTMASTER, + NODE_ALL, + DEVELOPER_OPTIONS, + gettext_noop("Enable nvm buffer manager."), + NULL}, + &g_instance.attr.attr_storage.nvm_attr.enable_nvm, + false, + NULL, + NULL, + NULL}, + #ifdef USE_ASSERT_CHECKING {{"enable_segment", PGC_SIGHUP, @@ -1037,6 +1050,20 @@ static void InitStorageConfigureNamesInt() NULL, NULL, NULL}, + {{"nvm_buffers", + PGC_POSTMASTER, + NODE_ALL, + RESOURCES_MEM, + gettext_noop("Sets the number of shared memory buffers used by the server."), + NULL, + GUC_UNIT_BLOCKS}, + &g_instance.attr.attr_storage.NNvmBuffers, + 0, + 0, + INT_MAX / 2, + NULL, + NULL, + NULL}, {{"segment_buffers", PGC_POSTMASTER, NODE_ALL, @@ -3334,7 +3361,32 @@ static void InitStorageConfigureNamesReal() NULL, NULL, NULL}, - + {{"bypass_dram", + PGC_SIGHUP, + NODE_ALL, + DEVELOPER_OPTIONS, + gettext_noop("bypass_dram."), + NULL}, + &g_instance.attr.attr_storage.nvm_attr.bypassDram, + 0.01, + 0.0, + 1.0, + NULL, + NULL, + NULL}, + {{"bypass_nvm", + PGC_SIGHUP, + NODE_ALL, + DEVELOPER_OPTIONS, + gettext_noop("bypass_nvm."), + NULL}, + &g_instance.attr.attr_storage.nvm_attr.bypassNvm, + 0.5, + 0.0, + 1.0, + NULL, + NULL, + NULL}, /* End-of-list marker */ {{NULL, (GucContext)0, @@ -3521,6 +3573,18 @@ static void InitStorageConfigureNamesString() NULL, NULL}, + {{"nvm_file_path", + PGC_POSTMASTER, + NODE_SINGLENODE, + DEVELOPER_OPTIONS, + gettext_noop("Sets file path of nvm."), + NULL}, + &g_instance.attr.attr_storage.nvm_attr.nvm_file_path, + "", + NULL, + NULL, + NULL}, + #ifndef ENABLE_MULTIPLE_NODES {{"dcf_config", PGC_POSTMASTER, diff --git a/src/gausskernel/CMakeLists.txt 
b/src/gausskernel/CMakeLists.txt index 491cf45ee..70ab73078 100755 --- a/src/gausskernel/CMakeLists.txt +++ b/src/gausskernel/CMakeLists.txt @@ -181,6 +181,7 @@ list(APPEND gaussdb_objects $ $ $ + $ $ $ $ diff --git a/src/gausskernel/process/postmaster/bgwriter.cpp b/src/gausskernel/process/postmaster/bgwriter.cpp index 1c8edde45..3f4cb22f5 100755 --- a/src/gausskernel/process/postmaster/bgwriter.cpp +++ b/src/gausskernel/process/postmaster/bgwriter.cpp @@ -115,7 +115,7 @@ static void setup_bgwriter_signalhook(void) sigdelset(&t_thrd.libpq_cxt.BlockSig, SIGQUIT); } -static void bgwriter_handle_exceptions(WritebackContext wb_context, MemoryContext bgwriter_cxt) +static void bgwriter_handle_exceptions(WritebackContext *wb_context, MemoryContext bgwriter_cxt) { /* * Close all open files after any error. This is helpful on Windows, @@ -167,7 +167,7 @@ static void bgwriter_handle_exceptions(WritebackContext wb_context, MemoryContex MemoryContextResetAndDeleteChildren(bgwriter_cxt); /* re-initialize to avoid repeated errors causing problems */ - WritebackContextInit(&wb_context, &u_sess->attr.attr_storage.bgwriter_flush_after); + WritebackContextInit(wb_context, &u_sess->attr.attr_storage.bgwriter_flush_after); /* Now we can allow interrupts again */ RESUME_INTERRUPTS(); @@ -236,7 +236,7 @@ void BackgroundWriterMain(void) int* oldTryCounter = NULL; if (sigsetjmp(local_sigjmp_buf, 1) != 0) { gstrace_tryblock_exit(true, oldTryCounter); - bgwriter_handle_exceptions(wb_context, bgwriter_context); + bgwriter_handle_exceptions(&wb_context, bgwriter_context); /* Report wait end here, when there is no further possibility of wait */ pgstat_report_waitevent(WAIT_EVENT_END); @@ -513,7 +513,8 @@ Datum bgwriter_view_get_last_flush_num() Datum bgwriter_view_get_candidate_nums() { - int candidate_num = get_curr_candidate_nums(true) + get_curr_candidate_nums(false); + int candidate_num = get_curr_candidate_nums(CAND_LIST_NORMAL) + get_curr_candidate_nums(CAND_LIST_NVM) + + get_curr_candidate_nums(CAND_LIST_SEG); return Int32GetDatum(candidate_num); } @@ -569,7 +570,7 @@ void invalid_buffer_bgwriter_main() if (sigsetjmp(localSigjmpBuf, 1) != 0) { ereport(WARNING, (errmsg("invalidate buffer bgwriter exception occured."))); - bgwriter_handle_exceptions(wb_context, bgwriter_context); + bgwriter_handle_exceptions(&wb_context, bgwriter_context); } /* We can now handle ereport(ERROR) */ diff --git a/src/gausskernel/process/postmaster/pagewriter.cpp b/src/gausskernel/process/postmaster/pagewriter.cpp index 7baf220ed..b0951d023 100755 --- a/src/gausskernel/process/postmaster/pagewriter.cpp +++ b/src/gausskernel/process/postmaster/pagewriter.cpp @@ -82,18 +82,16 @@ static void ckpt_try_prune_dirty_page_queue(); /* candidate buffer list handle function */ static uint32 calculate_pagewriter_flush_num(); -static void candidate_buf_push(int buf_id, int thread_id); -static void seg_candidate_buf_push(int buf_id, int thread_id); +static void candidate_buf_push(CandidateList *list, int buf_id); static void init_candidate_list(); -static uint32 incre_ckpt_pgwr_flush_dirty_page(WritebackContext wb_context, +static uint32 incre_ckpt_pgwr_flush_dirty_page(WritebackContext *wb_context, const CkptSortItem *dirty_buf_list, int start, int batch_num); -static void incre_ckpt_pgwr_flush_dirty_queue(WritebackContext wb_context); -static void incre_ckpt_pgwr_scan_buf_pool(WritebackContext wb_context); +static void incre_ckpt_pgwr_flush_dirty_queue(WritebackContext *wb_context); +static void incre_ckpt_pgwr_scan_buf_pool(WritebackContext 
*wb_context); static void push_to_candidate_list(BufferDesc *buf_desc); static uint32 get_candidate_buf_and_flush_list(uint32 start, uint32 end, uint32 max_flush_num, bool *contain_hashbucket); -static int64 get_thread_candidate_nums(int thread_id); -static int64 get_thread_seg_candidate_nums(int thread_id); +static int64 get_thread_candidate_nums(CandidateList *list); const int XLOG_LSN_SWAP = 32; Datum ckpt_view_get_node_name() @@ -208,7 +206,7 @@ Datum ckpt_view_get_twophase_flush_num() Datum ckpt_view_get_candidate_nums() { - int candidate_num = get_curr_candidate_nums(false); + int candidate_num = get_curr_candidate_nums(CAND_LIST_NORMAL); return Int32GetDatum(candidate_num); } @@ -224,7 +222,7 @@ Datum ckpt_view_get_num_clock_sweep() Datum ckpt_view_get_seg_candidate_nums() { - int candidate_num = get_curr_candidate_nums(true); + int candidate_num = get_curr_candidate_nums(CAND_LIST_SEG); return Int32GetDatum(candidate_num); } @@ -352,45 +350,38 @@ void candidate_buf_init(void) static void init_candidate_list() { int thread_num = g_instance.ckpt_cxt_ctl->pgwr_procs.sub_num; - int normal_avg_num = SharedBufferNumber / thread_num; - int seg_avr_num = SEGMENT_BUFFER_NUM / thread_num; + int normal_avg_num = NORMAL_SHARED_BUFFER_NUM / thread_num; + int nvm_avg_num = NVM_BUFFER_NUM / thread_num; + int seg_avg_num = SEGMENT_BUFFER_NUM / thread_num; PageWriterProc *pgwr = NULL; + Buffer *cand_buffers = g_instance.ckpt_cxt_ctl->candidate_buffers; /* Init main thread, the candidate list only store segment buffer */ pgwr = &g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[0]; - pgwr->cand_buf_list = NULL; - pgwr->cand_list_size = 0; - pgwr->head = 0; - pgwr->tail = 0; - pgwr->seg_head = 0; - pgwr->seg_tail = 0; - pgwr->seg_cand_list_size = 0; - pgwr->seg_cand_buf_list = NULL; + INIT_CANDIDATE_LIST(pgwr->normal_list, NULL, 0, 0, 0); + INIT_CANDIDATE_LIST(pgwr->nvm_list, NULL, 0, 0, 0); + INIT_CANDIDATE_LIST(pgwr->seg_list, NULL, 0, 0, 0); for (int i = 1; i <= thread_num; i++) { pgwr = &g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[i]; int start = normal_avg_num * (i - 1); int end = start + normal_avg_num; - int seg_start = SegmentBufferStartID + seg_avr_num * (i - 1); - int seg_end = seg_start + seg_avr_num; + int nvm_start = NvmBufferStartID + nvm_avg_num * (i - 1); + int nvm_end = nvm_start + nvm_avg_num; + int seg_start = SegmentBufferStartID + seg_avg_num * (i - 1); + int seg_end = seg_start + seg_avg_num; if (i == thread_num) { - end += SharedBufferNumber % thread_num; + end += NORMAL_SHARED_BUFFER_NUM % thread_num; + nvm_end += NVM_BUFFER_NUM % thread_num; seg_end += SEGMENT_BUFFER_NUM % thread_num; } - /* init normal candidat list */ - pgwr->head = 0; - pgwr->tail = 0; - pgwr->buf_id_start = start; - pgwr->cand_list_size = end - start; - pgwr->cand_buf_list = &g_instance.ckpt_cxt_ctl->candidate_buffers[start]; - - /* init segment candidat list */ - pgwr->seg_head = 0; - pgwr->seg_tail = 0; - pgwr->seg_cand_list_size = seg_end - seg_start; - pgwr->seg_cand_buf_list = &g_instance.ckpt_cxt_ctl->candidate_buffers[seg_start]; - pgwr->seg_id_start = seg_start; + INIT_CANDIDATE_LIST(pgwr->normal_list, &cand_buffers[start], end - start, 0, 0); + INIT_CANDIDATE_LIST(pgwr->nvm_list, &cand_buffers[nvm_start], nvm_end - nvm_start, 0, 0); + INIT_CANDIDATE_LIST(pgwr->seg_list, &cand_buffers[seg_start], seg_end - seg_start, 0, 0); + pgwr->normal_list.buf_id_start = start; + pgwr->nvm_list.buf_id_start = nvm_start; + pgwr->seg_list.buf_id_start = seg_start; } } @@ -1000,7 +991,7 @@ static uint32 
calculate_pagewriter_flush_num() counter = 0; } - dirty_page_pct = g_instance.ckpt_cxt_ctl->actual_dirty_page_num / (float)(g_instance.attr.attr_storage.NBuffers); + dirty_page_pct = g_instance.ckpt_cxt_ctl->actual_dirty_page_num / (float)(SegmentBufferStartID); dirty_slot_pct = get_dirty_page_num() / (float)(g_instance.ckpt_cxt_ctl->dirty_page_queue_size); dirty_percent = MAX(dirty_page_pct, dirty_slot_pct) / u_sess->attr.attr_storage.dirty_page_percent_max; @@ -1267,7 +1258,8 @@ static void ckpt_pagewriter_main_thread_loop(void) HandlePageWriterMainInterrupts(); - candidate_num = get_curr_candidate_nums(false) + get_curr_candidate_nums(true); + candidate_num = get_curr_candidate_nums(CAND_LIST_NORMAL) + get_curr_candidate_nums(CAND_LIST_NVM) + + get_curr_candidate_nums(CAND_LIST_SEG); while (get_dirty_page_num() == 0 && candidate_num == (uint32)TOTAL_BUFFER_NUM && !t_thrd.pagewriter_cxt.shutdown_requested) { rc = WaitLatch(&t_thrd.proc->procLatch, WL_TIMEOUT | WL_POSTMASTER_DEATH, (long)TEN_MILLISECOND); @@ -1277,7 +1269,8 @@ static void ckpt_pagewriter_main_thread_loop(void) HandlePageWriterMainInterrupts(); - candidate_num = get_curr_candidate_nums(false) + get_curr_candidate_nums(true); + candidate_num = get_curr_candidate_nums(CAND_LIST_NORMAL) + get_curr_candidate_nums(CAND_LIST_NVM) + + get_curr_candidate_nums(CAND_LIST_SEG); if (candidate_num == 0) { /* wakeup sub thread scan the buffer pool, init the candidate list */ wakeup_sub_thread(); @@ -1390,7 +1383,7 @@ static void ckpt_pagewriter_sub_thread_loop() /* scan buffer pool, get flush list and candidate list */ now = get_time_ms(); if (t_thrd.pagewriter_cxt.next_scan_time <= now) { - incre_ckpt_pgwr_scan_buf_pool(wb_context); + incre_ckpt_pgwr_scan_buf_pool(&wb_context); now = get_time_ms(); t_thrd.pagewriter_cxt.next_scan_time = now + MAX(u_sess->attr.attr_storage.BgWriterDelay, u_sess->attr.attr_storage.pageWriterSleep); @@ -1412,7 +1405,7 @@ static void ckpt_pagewriter_sub_thread_loop() /* flush one batch dirty pages */ ResourceOwnerEnlargeBuffers(t_thrd.utils_cxt.CurrentResourceOwner); - incre_ckpt_pgwr_flush_dirty_queue(wb_context); + incre_ckpt_pgwr_flush_dirty_queue(&wb_context); /* add up completed pages */ completed_pages = pg_atomic_add_fetch_u32( @@ -1923,7 +1916,7 @@ static void ckpt_try_skip_invalid_elem_in_queue_head() return; } -static uint32 incre_ckpt_pgwr_flush_dirty_page(WritebackContext wb_context, +static uint32 incre_ckpt_pgwr_flush_dirty_page(WritebackContext *wb_context, const CkptSortItem *dirty_buf_list, int start, int batch_num) { uint32 num_actual_flush = 0; @@ -1943,7 +1936,7 @@ static uint32 incre_ckpt_pgwr_flush_dirty_page(WritebackContext wb_context, if ((buf_state & BM_CHECKPOINT_NEEDED) && (buf_state & BM_DIRTY)) { UnlockBufHdr(buf_desc, buf_state); - sync_state = SyncOneBuffer(buf_id, false, &wb_context, true); + sync_state = SyncOneBuffer(buf_id, false, wb_context, true); if ((sync_state & BUF_WRITTEN)) { num_actual_flush++; } @@ -1954,7 +1947,7 @@ static uint32 incre_ckpt_pgwr_flush_dirty_page(WritebackContext wb_context, return num_actual_flush; } -static void incre_ckpt_pgwr_flush_dirty_queue(WritebackContext wb_context) +static void incre_ckpt_pgwr_flush_dirty_queue(WritebackContext *wb_context) { int thread_id = t_thrd.pagewriter_cxt.pagewriter_id; PageWriterProc* pgwr = &g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[thread_id]; @@ -1993,7 +1986,7 @@ static void incre_ckpt_pgwr_flush_dirty_queue(WritebackContext wb_context) return; } -static void 
incre_ckpt_pgwr_flush_dirty_list(WritebackContext wb_context, uint32 need_flush_num, +static void incre_ckpt_pgwr_flush_dirty_list(WritebackContext *wb_context, uint32 need_flush_num, bool is_new_relfilenode) { int thread_id = t_thrd.pagewriter_cxt.pagewriter_id; @@ -2082,15 +2075,18 @@ static bool check_buffer_dirty_flag(BufferDesc* buf_desc) return false; } -static uint32 get_list_flush_max_num(bool is_segment) +static uint32 get_list_flush_max_num(CandListType type) { int thread_num = g_instance.ckpt_cxt_ctl->pgwr_procs.sub_num; uint32 max_io = g_instance.ckpt_cxt_ctl->pgwr_procs.list_flush_max / thread_num; uint32 dirty_list_size = MAX_DIRTY_LIST_FLUSH_NUM / thread_num; - if (is_segment) { + if (type == CAND_LIST_SEG) { double seg_percent = ((double)(SEGMENT_BUFFER_NUM) / (double)(TOTAL_BUFFER_NUM)); max_io = max_io * seg_percent; + } else if (type == CAND_LIST_NVM) { + double nvm_percent = ((double)(NVM_BUFFER_NUM) / (double)(TOTAL_BUFFER_NUM)); + max_io = max_io * nvm_percent; } else { double buffer_percent = ((double)(NORMAL_SHARED_BUFFER_NUM) / (double)(TOTAL_BUFFER_NUM)); max_io = max_io * buffer_percent; @@ -2102,7 +2098,7 @@ static uint32 get_list_flush_max_num(bool is_segment) } const float GAP_PERCENT = 0.15; -static uint32 get_list_flush_num(bool is_segment) +static uint32 get_list_flush_num(CandListType type) { double percent_target = u_sess->attr.attr_storage.candidate_buf_percent_target; uint32 cur_candidate_num; @@ -2110,11 +2106,18 @@ static uint32 get_list_flush_num(bool is_segment) uint32 high_water_mark; uint32 flush_num = 0; uint32 min_io = DW_DIRTY_PAGE_MAX_FOR_NOHBK; - uint32 max_io = get_list_flush_max_num(is_segment); - uint32 buffer_num = (is_segment ? SEGMENT_BUFFER_NUM : NORMAL_SHARED_BUFFER_NUM); + uint32 max_io = get_list_flush_max_num(type); + uint32 buffer_num; + if (type == CAND_LIST_SEG) { + buffer_num = SEGMENT_BUFFER_NUM; + } else if (type == CAND_LIST_NVM) { + buffer_num = NVM_BUFFER_NUM; + } else { + buffer_num = NORMAL_SHARED_BUFFER_NUM; + } total_target = buffer_num * percent_target; high_water_mark = buffer_num * (percent_target / HIGH_WATER); - cur_candidate_num = get_curr_candidate_nums(is_segment); + cur_candidate_num = get_curr_candidate_nums(type); /* If the slots are sufficient, the standby DN does not need to flush too many pages. */ if (RecoveryInProgress() && cur_candidate_num >= total_target / 2) { @@ -2144,10 +2147,9 @@ static uint32 get_list_flush_num(bool is_segment) * then scan the segment buffer pool. 
*/ const int MAX_SCAN_BATCH_NUM = 131072 * 10; /* 10GB buffers */ -static void incre_ckpt_pgwr_scan_buf_pool(WritebackContext wb_context) + +static void incre_ckpt_pgwr_scan_candidate_list(WritebackContext *wb_context, CandidateList *list, CandListType type) { - int thread_id = t_thrd.pagewriter_cxt.pagewriter_id; - PageWriterProc *pgwr = &g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[thread_id]; uint32 need_flush_num = 0; int start = 0; int end = 0; @@ -2155,43 +2157,34 @@ static void incre_ckpt_pgwr_scan_buf_pool(WritebackContext wb_context) int batch_scan_num = 0; uint32 max_flush_num = 0; - /* handle the normal buffer pool */ - if (get_thread_candidate_nums(thread_id) < pgwr->cand_list_size) { - start = MAX(pgwr->buf_id_start, pgwr->next_scan_normal_loc); - end = pgwr->buf_id_start + pgwr->cand_list_size; - batch_scan_num = MIN(pgwr->cand_list_size, MAX_SCAN_BATCH_NUM); + if (get_thread_candidate_nums(list) < list->cand_list_size) { + start = MAX(list->buf_id_start, list->next_scan_loc); + end = list->buf_id_start + list->cand_list_size; + batch_scan_num = MIN(list->cand_list_size, MAX_SCAN_BATCH_NUM); end = MIN(start + batch_scan_num, end); - max_flush_num = get_list_flush_num(false); + max_flush_num = get_list_flush_num(type); need_flush_num = get_candidate_buf_and_flush_list(start, end, max_flush_num, &is_new_relfilenode); - if (end >= pgwr->buf_id_start + pgwr->cand_list_size) { - pgwr->next_scan_normal_loc = pgwr->buf_id_start; + if (end >= list->buf_id_start + list->cand_list_size) { + list->next_scan_loc = list->buf_id_start; } else { - pgwr->next_scan_normal_loc = end; + list->next_scan_loc = end; } if (need_flush_num > 0) { incre_ckpt_pgwr_flush_dirty_list(wb_context, need_flush_num, is_new_relfilenode); } } +} - /* handle the segment buffer pool */ - if (get_thread_seg_candidate_nums(thread_id) < pgwr->seg_cand_list_size) { - start = MAX(pgwr->seg_id_start, pgwr->next_scan_seg_loc); - end = pgwr->seg_id_start + pgwr->seg_cand_list_size; - batch_scan_num = MIN(pgwr->seg_cand_list_size, MAX_SCAN_BATCH_NUM); - end = MIN(start + batch_scan_num, end); - max_flush_num = get_list_flush_num(true); +static void incre_ckpt_pgwr_scan_buf_pool(WritebackContext *wb_context) +{ + int thread_id = t_thrd.pagewriter_cxt.pagewriter_id; + PageWriterProc *pgwr = &g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[thread_id]; - need_flush_num = get_candidate_buf_and_flush_list(start, end, max_flush_num, &is_new_relfilenode); - if (end >= pgwr->seg_id_start + pgwr->seg_cand_list_size) { - pgwr->next_scan_seg_loc = pgwr->seg_id_start; - } else { - pgwr->next_scan_seg_loc = end; - } - if (need_flush_num > 0) { - incre_ckpt_pgwr_flush_dirty_list(wb_context, need_flush_num, is_new_relfilenode); - } - } + /* handle the normal\nvm\segment buffer pool */ + incre_ckpt_pgwr_scan_candidate_list(wb_context, &pgwr->normal_list, CAND_LIST_NORMAL); + incre_ckpt_pgwr_scan_candidate_list(wb_context, &pgwr->nvm_list, CAND_LIST_NVM); + incre_ckpt_pgwr_scan_candidate_list(wb_context, &pgwr->seg_list, CAND_LIST_SEG); return; } @@ -2254,10 +2247,12 @@ static uint32 get_candidate_buf_and_flush_list(uint32 start, uint32 end, uint32 /* Not dirty, put directly into flushed candidates */ if (!(local_buf_state & BM_DIRTY)) { if (g_instance.ckpt_cxt_ctl->candidate_free_map[buf_id] == false) { - if (buf_id < (uint32)SegmentBufferStartID) { - candidate_buf_push(buf_id, thread_id); + if (buf_id < (uint32)NvmBufferStartID) { + candidate_buf_push(&pgwr->normal_list, buf_id); + } else if (buf_id < (uint32)SegmentBufferStartID) { + 
candidate_buf_push(&pgwr->nvm_list, buf_id); } else { - seg_candidate_buf_push(buf_id, thread_id); + candidate_buf_push(&pgwr->seg_list, buf_id); } g_instance.ckpt_cxt_ctl->candidate_free_map[buf_id] = true; candidates++; @@ -2299,6 +2294,7 @@ UNLOCK: static void push_to_candidate_list(BufferDesc *buf_desc) { uint32 thread_id = t_thrd.pagewriter_cxt.pagewriter_id; + PageWriterProc *pgwr = &g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[thread_id]; int buf_id = buf_desc->buf_id; uint32 buf_state = pg_atomic_read_u32(&buf_desc->state); bool emptyUsageCount = (!NEED_CONSIDER_USECOUNT || BUF_STATE_GET_USAGECOUNT(buf_state) == 0); @@ -2312,10 +2308,12 @@ static void push_to_candidate_list(BufferDesc *buf_desc) if (g_instance.ckpt_cxt_ctl->candidate_free_map[buf_id] == false) { emptyUsageCount = (!NEED_CONSIDER_USECOUNT || BUF_STATE_GET_USAGECOUNT(buf_state) == 0); if (BUF_STATE_GET_REFCOUNT(buf_state) == 0 && emptyUsageCount && !(buf_state & BM_DIRTY)) { - if (buf_id < SegmentBufferStartID) { - candidate_buf_push(buf_id, thread_id); + if (buf_id < NvmBufferStartID) { + candidate_buf_push(&pgwr->normal_list, buf_id); + } else if (buf_id < SegmentBufferStartID) { + candidate_buf_push(&pgwr->nvm_list, buf_id); } else { - seg_candidate_buf_push(buf_id, thread_id); + candidate_buf_push(&pgwr->seg_list, buf_id); } g_instance.ckpt_cxt_ctl->candidate_free_map[buf_id] = true; } @@ -2331,44 +2329,23 @@ static void push_to_candidate_list(BufferDesc *buf_desc) * @in: buf_id, buffer id which need push to the list * @in: thread_id, pagewriter thread id. */ -static void candidate_buf_push(int buf_id, int thread_id) +static void candidate_buf_push(CandidateList *list, int buf_id) { - PageWriterProc *pgwr = &g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[thread_id]; - uint32 list_size = pgwr->cand_list_size; + uint32 list_size = list->cand_list_size; uint32 tail_loc; pg_memory_barrier(); - volatile uint64 head = pg_atomic_read_u64(&pgwr->head); + volatile uint64 head = pg_atomic_read_u64(&list->head); pg_memory_barrier(); - volatile uint64 tail = pg_atomic_read_u64(&pgwr->tail); + volatile uint64 tail = pg_atomic_read_u64(&list->tail); if (unlikely(tail - head >= list_size)) { Assert(0); return; } tail_loc = tail % list_size; - pgwr->cand_buf_list[tail_loc] = buf_id; - (void)pg_atomic_fetch_add_u64(&pgwr->tail, 1); -} - -static void seg_candidate_buf_push(int buf_id, int thread_id) -{ - PageWriterProc *pgwr = &g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[thread_id]; - uint32 list_size = pgwr->seg_cand_list_size; - uint32 tail_loc; - - pg_memory_barrier(); - volatile uint64 head = pg_atomic_read_u64(&pgwr->seg_head); - pg_memory_barrier(); - volatile uint64 tail = pg_atomic_read_u64(&pgwr->seg_tail); - - if (unlikely(tail - head >= list_size)) { - Assert(0); - return; - } - tail_loc = tail % list_size; - pgwr->seg_cand_buf_list[tail_loc] = buf_id; - (void)pg_atomic_fetch_add_u64(&pgwr->seg_tail, 1); + list->cand_buf_list[tail_loc] = buf_id; + (void)pg_atomic_fetch_add_u64(&list->tail, 1); } /** @@ -2376,71 +2353,34 @@ static void seg_candidate_buf_push(int buf_id, int thread_id) * @in: buf_id, store the buffer id from the list. 
* @in: thread_id, pagewriter thread id */ -bool candidate_buf_pop(int *buf_id, int thread_id) +bool candidate_buf_pop(CandidateList *list, int *buf_id) { - PageWriterProc *pgwr = &g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[thread_id]; - uint32 list_size = pgwr->cand_list_size; + uint32 list_size = list->cand_list_size; uint32 head_loc; while (true) { pg_memory_barrier(); - uint64 head = pg_atomic_read_u64(&pgwr->head); + uint64 head = pg_atomic_read_u64(&list->head); pg_memory_barrier(); - volatile uint64 tail = pg_atomic_read_u64(&pgwr->tail); + volatile uint64 tail = pg_atomic_read_u64(&list->tail); if (unlikely(head >= tail)) { return false; /* candidate list is empty */ } head_loc = head % list_size; - *buf_id = pgwr->cand_buf_list[head_loc]; - if (pg_atomic_compare_exchange_u64(&pgwr->head, &head, head + 1)) { + *buf_id = list->cand_buf_list[head_loc]; + if (pg_atomic_compare_exchange_u64(&list->head, &head, head + 1)) { return true; } } } -bool seg_candidate_buf_pop(int *buf_id, int thread_id) +static int64 get_thread_candidate_nums(CandidateList *list) { - PageWriterProc *pgwr = &g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[thread_id]; - uint32 list_size = pgwr->seg_cand_list_size; - uint32 head_loc; - - while (true) { - pg_memory_barrier(); - uint64 head = pg_atomic_read_u64(&pgwr->seg_head); - pg_memory_barrier(); - volatile uint64 tail = pg_atomic_read_u64(&pgwr->seg_tail); - - if (unlikely(head >= tail)) { - return false; /* candidate list is empty */ - } - - head_loc = head % list_size; - *buf_id = pgwr->seg_cand_buf_list[head_loc]; - if (pg_atomic_compare_exchange_u64(&pgwr->seg_head, &head, head + 1)) { - return true; - } - } -} - -static int64 get_thread_candidate_nums(int thread_id) -{ - PageWriterProc *pgwr = &g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[thread_id]; - volatile uint64 head = pg_atomic_read_u64(&pgwr->head); + volatile uint64 head = pg_atomic_read_u64(&list->head); pg_memory_barrier(); - volatile uint64 tail = pg_atomic_read_u64(&pgwr->tail); - int64 curr_cand_num = tail - head; - Assert(curr_cand_num >= 0); - return curr_cand_num; -} - -static int64 get_thread_seg_candidate_nums(int thread_id) -{ - PageWriterProc *pgwr = &g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[thread_id]; - volatile uint64 head = pg_atomic_read_u64(&pgwr->seg_head); - pg_memory_barrier(); - volatile uint64 tail = pg_atomic_read_u64(&pgwr->seg_tail); + volatile uint64 tail = pg_atomic_read_u64(&list->tail); int64 curr_cand_num = tail - head; Assert(curr_cand_num >= 0); return curr_cand_num; @@ -2449,25 +2389,25 @@ static int64 get_thread_seg_candidate_nums(int thread_id) /** * @Description: Return a rough estimate of the current number of buffers in the candidate list. 
*/ -uint32 get_curr_candidate_nums(bool segment) +uint32 get_curr_candidate_nums(CandListType type) { uint32 currCandidates = 0; PageWriterProc *pgwr = NULL; - - if (segment) { - for (int i = 1; i < g_instance.ckpt_cxt_ctl->pgwr_procs.num; i++) { - pgwr = &g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[i]; - if (pgwr->proc != NULL) { - currCandidates += get_thread_seg_candidate_nums(i); - } - } - return currCandidates; - } + CandidateList *list = NULL; for (int i = 1; i < g_instance.ckpt_cxt_ctl->pgwr_procs.num; i++) { pgwr = &g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[i]; + + if (type == CAND_LIST_NORMAL) { + list = &pgwr->normal_list; + } else if (type == CAND_LIST_NVM) { + list = &pgwr->nvm_list; + } else { + list = &pgwr->seg_list; + } + if (pgwr->proc != NULL) { - currCandidates += get_thread_candidate_nums(i); + currCandidates += get_thread_candidate_nums(list); } } return currCandidates; diff --git a/src/gausskernel/process/threadpool/knl_thread.cpp b/src/gausskernel/process/threadpool/knl_thread.cpp index 9dd53d605..cb5a4de9c 100755 --- a/src/gausskernel/process/threadpool/knl_thread.cpp +++ b/src/gausskernel/process/threadpool/knl_thread.cpp @@ -1290,6 +1290,7 @@ static void knl_t_storage_init(knl_t_storage_context* storage_cxt) storage_cxt->BufferDescriptors = NULL; storage_cxt->BufferBlocks = NULL; + storage_cxt->NvmBufferBlocks = NULL; storage_cxt->BackendWritebackContext = (WritebackContext*)palloc0(sizeof(WritebackContext)); storage_cxt->SharedBufHash = NULL; storage_cxt->InProgressBuf = NULL; diff --git a/src/gausskernel/storage/CMakeLists.txt b/src/gausskernel/storage/CMakeLists.txt index a7d93a8d5..a47dfe864 100755 --- a/src/gausskernel/storage/CMakeLists.txt +++ b/src/gausskernel/storage/CMakeLists.txt @@ -13,6 +13,7 @@ set(CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/replication ${CMAKE_CURRENT_SOURCE_DIR}/sync ${CMAKE_CURRENT_SOURCE_DIR}/dfs + ${CMAKE_CURRENT_SOURCE_DIR}/nvm ${CMAKE_CURRENT_SOURCE_DIR}/file ${CMAKE_CURRENT_SOURCE_DIR}/freespace ${CMAKE_CURRENT_SOURCE_DIR}/ipc @@ -42,6 +43,7 @@ add_subdirectory(cstore) add_subdirectory(replication) add_subdirectory(sync) add_subdirectory(dfs) +add_subdirectory(nvm) add_subdirectory(file) add_subdirectory(freespace) add_subdirectory(ipc) diff --git a/src/gausskernel/storage/Makefile b/src/gausskernel/storage/Makefile index d310747e1..83cf5f43b 100644 --- a/src/gausskernel/storage/Makefile +++ b/src/gausskernel/storage/Makefile @@ -25,7 +25,7 @@ subdir = src/gausskernel/storage top_builddir = ../../.. 
include $(top_builddir)/src/Makefile.global -SUBDIRS = access bulkload replication buffer cmgr cstore dfs file freespace ipc large_object lmgr page remote smgr tcap sync dorado_operation xlog_share_storage +SUBDIRS = access bulkload replication buffer cmgr cstore dfs nvm file freespace ipc large_object lmgr page remote smgr tcap sync dorado_operation xlog_share_storage ifeq ($(enable_mot), yes) SUBDIRS += mot diff --git a/src/gausskernel/storage/buffer/buf_init.cpp b/src/gausskernel/storage/buffer/buf_init.cpp index 6956b35d7..b71b09fc1 100644 --- a/src/gausskernel/storage/buffer/buf_init.cpp +++ b/src/gausskernel/storage/buffer/buf_init.cpp @@ -14,6 +14,7 @@ * ------------------------------------------------------------------------- */ #include "storage/dfs/dfscache_mgr.h" +#include "storage/nvm/nvm.h" #include "postgres.h" #include "knl/knl_variable.h" @@ -82,14 +83,18 @@ void InitBufferPool(void) candidate_buf_init(); #ifdef __aarch64__ - buffer_size = TOTAL_BUFFER_NUM * (Size)BLCKSZ + PG_CACHE_LINE_SIZE; + buffer_size = (TOTAL_BUFFER_NUM - NVM_BUFFER_NUM) * (Size)BLCKSZ + PG_CACHE_LINE_SIZE; t_thrd.storage_cxt.BufferBlocks = (char *)CACHELINEALIGN(ShmemInitStruct("Buffer Blocks", buffer_size, &found_bufs)); #else - buffer_size = TOTAL_BUFFER_NUM * (Size)BLCKSZ; + buffer_size = (TOTAL_BUFFER_NUM - NVM_BUFFER_NUM) * (Size)BLCKSZ; t_thrd.storage_cxt.BufferBlocks = (char *)ShmemInitStruct("Buffer Blocks", buffer_size, &found_bufs); #endif + if (g_instance.attr.attr_storage.nvm_attr.enable_nvm) { + nvm_init(); + } + if (BBOX_BLACKLIST_SHARE_BUFFER) { /* Segment Buffer is exclued from the black list, as it contains many critical information for debug */ bbox_blacklist_add(SHARED_BUFFER, t_thrd.storage_cxt.BufferBlocks, NORMAL_SHARED_BUFFER_NUM * (Size)BLCKSZ); @@ -185,7 +190,7 @@ Size BufferShmemSize(void) size = add_size(size, PG_CACHE_LINE_SIZE); /* size of data pages */ - size = add_size(size, mul_size(TOTAL_BUFFER_NUM, BLCKSZ)); + size = add_size(size, mul_size((NORMAL_SHARED_BUFFER_NUM + SEGMENT_BUFFER_NUM), BLCKSZ)); #ifdef __aarch64__ size = add_size(size, PG_CACHE_LINE_SIZE); #endif diff --git a/src/gausskernel/storage/buffer/buf_table.cpp b/src/gausskernel/storage/buffer/buf_table.cpp index 2d69f0fd0..04e79f473 100644 --- a/src/gausskernel/storage/buffer/buf_table.cpp +++ b/src/gausskernel/storage/buffer/buf_table.cpp @@ -30,11 +30,6 @@ #include "gstrace/storage_gstrace.h" extern uint32 hashquickany(uint32 seed, register const unsigned char *data, register int len); -/* entry for buffer lookup hashtable */ -typedef struct { - BufferTag key; /* Tag of a disk page */ - int id; /* Associated buffer ID */ -} BufferLookupEnt; /* * Estimate space needed for mapping hashtable diff --git a/src/gausskernel/storage/buffer/bufmgr.cpp b/src/gausskernel/storage/buffer/bufmgr.cpp index 28efb9167..1cafb6d48 100644 --- a/src/gausskernel/storage/buffer/bufmgr.cpp +++ b/src/gausskernel/storage/buffer/bufmgr.cpp @@ -62,6 +62,7 @@ #include "storage/ipc.h" #include "storage/proc.h" #include "storage/smgr/segment.h" +#include "storage/nvm/nvm.h" #include "storage/standby.h" #include "utils/aiomem.h" #include "utils/guc.h" @@ -354,8 +355,6 @@ void ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref) } static void BufferSync(int flags); -static uint32 WaitBufHdrUnlocked(BufferDesc* buf); -static void WaitIO(BufferDesc* buf); static void TerminateBufferIO_common(BufferDesc* buf, bool clear_dirty, uint32 set_flag_bits); void shared_buffer_write_error_callback(void* arg); static BufferDesc* 
BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, BlockNumber blockNum, @@ -655,15 +654,7 @@ static volatile BufferDesc *PageListBufferAlloc(SMgrRelation smgr, char relpersi old_hash = BufTableHashCode(&old_tag); old_partition_lock = BufMappingPartitionLock(old_hash); /* Must lock the lower-numbered partition first to avoid deadlocks. */ - if (old_partition_lock < new_partition_lock) { - (void)LWLockAcquire(old_partition_lock, LW_EXCLUSIVE); - (void)LWLockAcquire(new_partition_lock, LW_EXCLUSIVE); - } else if (old_partition_lock > new_partition_lock) { - (void)LWLockAcquire(new_partition_lock, LW_EXCLUSIVE); - (void)LWLockAcquire(old_partition_lock, LW_EXCLUSIVE); - } else { - (void)LWLockAcquire(new_partition_lock, LW_EXCLUSIVE); - } + LockTwoLWLock(new_partition_lock, old_partition_lock); } else { /* if it wasn't valid, we need only the new partition */ (void)LWLockAcquire(new_partition_lock, LW_EXCLUSIVE); @@ -2379,7 +2370,7 @@ void SimpleMarkBufDirty(BufferDesc *buf) } -static void PageCheckIfCanEliminate(BufferDesc *buf, uint32 *oldFlags, bool *needGetLock) +void PageCheckIfCanEliminate(BufferDesc *buf, uint32 *oldFlags, bool *needGetLock) { Block tmpBlock = BufHdrGetBlock(buf); @@ -2403,7 +2394,7 @@ static void PageCheckIfCanEliminate(BufferDesc *buf, uint32 *oldFlags, bool *nee } #ifdef USE_ASSERT_CHECKING -static void PageCheckWhenChosedElimination(const BufferDesc *buf, uint32 oldFlags) +void PageCheckWhenChosedElimination(const BufferDesc *buf, uint32 oldFlags) { if ((oldFlags & BM_TAG_VALID) && RecoveryInProgress()) { if (!XLByteEQ(buf->lsn_dirty, InvalidXLogRecPtr)) { @@ -2436,6 +2427,10 @@ static void PageCheckWhenChosedElimination(const BufferDesc *buf, uint32 oldFlag static BufferDesc *BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber fork_num, BlockNumber block_num, BufferAccessStrategy strategy, bool *found, const XLogPhyBlock *pblk) { + if (g_instance.attr.attr_storage.nvm_attr.enable_nvm) { + return NvmBufferAlloc(smgr, relpersistence, fork_num, block_num, strategy, found, pblk); + } + Assert(!IsSegmentPhysicalRelNode(smgr->smgr_rnode.node)); BufferTag new_tag; /* identity of requested block */ @@ -2636,16 +2631,7 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumbe * Must lock the lower-numbered partition first to avoid * deadlocks. 
*/ - if (old_partition_lock < new_partition_lock) { - (void)LWLockAcquire(old_partition_lock, LW_EXCLUSIVE); - (void)LWLockAcquire(new_partition_lock, LW_EXCLUSIVE); - } else if (old_partition_lock > new_partition_lock) { - (void)LWLockAcquire(new_partition_lock, LW_EXCLUSIVE); - (void)LWLockAcquire(old_partition_lock, LW_EXCLUSIVE); - } else { - /* only one partition, only one lock */ - (void)LWLockAcquire(new_partition_lock, LW_EXCLUSIVE); - } + LockTwoLWLock(new_partition_lock, old_partition_lock); } else { /* if it wasn't valid, we need only the new partition */ (void)LWLockAcquire(new_partition_lock, LW_EXCLUSIVE); @@ -4652,7 +4638,7 @@ XLogRecPtr BufferGetLSNAtomic(Buffer buffer) void DropSegRelNodeSharedBuffer(RelFileNode node, ForkNumber forkNum) { - for (int i = 0; i < g_instance.attr.attr_storage.NBuffers; i++) { + for (int i = 0; i < SegmentBufferStartID; i++) { BufferDesc *buf_desc = GetBufferDescriptor(i); uint32 buf_state; @@ -4697,7 +4683,7 @@ void DropSegRelNodeSharedBuffer(RelFileNode node, ForkNumber forkNum) void RangeForgetBuffer(RelFileNode node, ForkNumber forkNum, BlockNumber firstDelBlock, BlockNumber endDelBlock) { - for (int i = 0; i < g_instance.attr.attr_storage.NBuffers; i++) { + for (int i = 0; i < SegmentBufferStartID; i++) { BufferDesc *buf_desc = GetBufferDescriptor(i); uint32 buf_state; @@ -4717,7 +4703,7 @@ void DropRelFileNodeShareBuffers(RelFileNode node, ForkNumber forkNum, BlockNumb { int i; - for (i = 0; i < g_instance.attr.attr_storage.NBuffers; i++) { + for (i = 0; i < SegmentBufferStartID; i++) { BufferDesc *buf_desc = GetBufferDescriptor(i); uint32 buf_state; /* @@ -4818,7 +4804,7 @@ void DropRelFileNodeAllBuffersUsingHash(HTAB *relfilenode_hashtbl) { int i; - for (i = 0; i < g_instance.attr.attr_storage.NBuffers; i++) { + for (i = 0; i < SegmentBufferStartID; i++) { BufferDesc *buf_desc = GetBufferDescriptor(i); uint32 buf_state; bool found = false; @@ -4870,7 +4856,7 @@ void DropRelFileNodeAllBuffersUsingHash(HTAB *relfilenode_hashtbl) void DropRelFileNodeOneForkAllBuffersUsingHash(HTAB *relfilenode_hashtbl) { int i; - for (i = 0; i < g_instance.attr.attr_storage.NBuffers; i++) { + for (i = 0; i < SegmentBufferStartID; i++) { BufferDesc *buf_desc = GetBufferDescriptor(i); uint32 buf_state; bool found = false; @@ -5006,9 +4992,9 @@ void DropRelFileNodeAllBuffersUsingScan(RelFileNode *rnodes, int rnode_len) pg_qsort(rnodes, (size_t)rnode_len, sizeof(RelFileNode), compare_rnode_func); - Assert((g_instance.attr.attr_storage.NBuffers % 4) == 0); + Assert((SegmentBufferStartID % 4) == 0); int i; - for (i = 0; i < g_instance.attr.attr_storage.NBuffers; i += 4) { + for (i = 0; i < SegmentBufferStartID; i += 4) { ScanCompareAndInvalidateBuffer(rnodes, rnode_len, i); ScanCompareAndInvalidateBuffer(rnodes, rnode_len, i + 1); @@ -5145,7 +5131,7 @@ void flush_all_buffers(Relation rel, Oid db_id, HTAB *hashtbl) // @Temp Table. no relation use local buffer. Temp table now use shared buffer. /* Make sure we can handle the pin inside the loop */ ResourceOwnerEnlargeBuffers(t_thrd.utils_cxt.CurrentResourceOwner); - for (i = 0; i < NORMAL_SHARED_BUFFER_NUM; i++) { + for (i = 0; i < SegmentBufferStartID; i++) { buf_desc = GetBufferDescriptor(i); /* * As in DropRelFileNodeBuffers, an unlocked precheck should be safe @@ -5853,7 +5839,7 @@ bool IsBufferCleanupOK(Buffer buffer) /* * WaitIO -- Block until the IO_IN_PROGRESS flag on 'buf' is cleared. 
*/ -static void WaitIO(BufferDesc *buf) +void WaitIO(BufferDesc *buf) { /* * Changed to wait until there's no IO - Inoue 01/13/2000 @@ -6297,7 +6283,7 @@ bool retryLockBufHdr(BufferDesc *desc, uint32 *buf_state) * Obviously the buffer could be locked by the time the value is returned, so * this is primarily useful in CAS style loops. */ -static uint32 WaitBufHdrUnlocked(BufferDesc *buf) +uint32 WaitBufHdrUnlocked(BufferDesc *buf) { #ifndef ENABLE_THREAD_CHECK SpinDelayStatus delay_status = init_spin_delay(buf); @@ -6831,3 +6817,16 @@ void PartitionInsertTdeInfoToCache(Relation reln, Partition p) } } +void LockTwoLWLock(LWLock *new_partition_lock, LWLock *old_partition_lock) +{ + if (old_partition_lock < new_partition_lock) { + (void)LWLockAcquire(old_partition_lock, LW_EXCLUSIVE); + (void)LWLockAcquire(new_partition_lock, LW_EXCLUSIVE); + } else if (old_partition_lock > new_partition_lock) { + (void)LWLockAcquire(new_partition_lock, LW_EXCLUSIVE); + (void)LWLockAcquire(old_partition_lock, LW_EXCLUSIVE); + } else { + /* only one partition, only one lock */ + (void)LWLockAcquire(new_partition_lock, LW_EXCLUSIVE); + } +} \ No newline at end of file diff --git a/src/gausskernel/storage/buffer/freelist.cpp b/src/gausskernel/storage/buffer/freelist.cpp index af45abcab..ef6d48c48 100644 --- a/src/gausskernel/storage/buffer/freelist.cpp +++ b/src/gausskernel/storage/buffer/freelist.cpp @@ -608,7 +608,7 @@ RETRY: if (pg_atomic_read_u32(&buf->state) & (BM_DIRTY | BM_IS_META)) { if (retry_times < Min(MAX_RETRY_RING_TIMES, strategy->ring_size * MAX_RETRY_RING_PCT)) { goto RETRY; - } else if (get_curr_candidate_nums(false) >= (uint32)g_instance.attr.attr_storage.NBuffers * + } else if (get_curr_candidate_nums(CAND_LIST_NORMAL) >= (uint32)g_instance.attr.attr_storage.NBuffers * u_sess->attr.attr_storage.candidate_buf_percent_target){ strategy->current_was_in_ring = false; return NULL; @@ -731,7 +731,7 @@ static BufferDesc* get_buf_from_candidate_list(BufferAccessStrategy strategy, ui /* the pagewriter sub thread store normal buffer pool, sub thread starts from 1 */ int thread_id = (list_id + i) % list_num + 1; Assert(thread_id > 0 && thread_id <= list_num); - while (candidate_buf_pop(&buf_id, thread_id)) { + while (candidate_buf_pop(&g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[thread_id].normal_list, &buf_id)) { Assert(buf_id < SegmentBufferStartID); buf = GetBufferDescriptor(buf_id); local_buf_state = LockBufHdr(buf); diff --git a/src/gausskernel/storage/nvm/CMakeLists.txt b/src/gausskernel/storage/nvm/CMakeLists.txt new file mode 100755 index 000000000..5048ddcfb --- /dev/null +++ b/src/gausskernel/storage/nvm/CMakeLists.txt @@ -0,0 +1,21 @@ +#This is the main CMAKE for build bin. 
+AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} TGT_nvm_SRC) + +set(TGT_nvm_INC + ${PROJECT_OPENGS_DIR}/contrib/log_fdw + ${PROJECT_TRUNK_DIR}/distribute/bin/gds + ${PROJECT_SRC_DIR}/include/libcomm + ${PROJECT_SRC_DIR}/include + ${PROJECT_SRC_DIR}/lib/gstrace + ${LZ4_INCLUDE_PATH} + ${LIBCGROUP_INCLUDE_PATH} + ${LIBORC_INCLUDE_PATH} + ${EVENT_INCLUDE_PATH} + ${PROTOBUF_INCLUDE_PATH} + ${ZLIB_INCLUDE_PATH} +) + +set(nvm_DEF_OPTIONS ${MACRO_OPTIONS}) +set(nvm_COMPILE_OPTIONS ${OPTIMIZE_OPTIONS} ${OS_OPTIONS} ${PROTECT_OPTIONS} ${WARNING_OPTIONS} ${BIN_SECURE_OPTIONS} ${CHECK_OPTIONS}) +set(nvm_LINK_OPTIONS ${BIN_LINK_OPTIONS}) +add_static_objtarget(gausskernel_storage_nvm TGT_nvm_SRC TGT_nvm_INC "${nvm_DEF_OPTIONS}" "${nvm_COMPILE_OPTIONS}" "${nvm_LINK_OPTIONS}") diff --git a/src/gausskernel/storage/nvm/Makefile b/src/gausskernel/storage/nvm/Makefile new file mode 100644 index 000000000..70cd0e600 --- /dev/null +++ b/src/gausskernel/storage/nvm/Makefile @@ -0,0 +1,14 @@ +subdir = src/gausskernel/storage/nvm +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global + +ifneq "$(MAKECMDGOALS)" "clean" + ifneq "$(MAKECMDGOALS)" "distclean" + ifneq "$(shell which g++ |grep hutaf_llt |wc -l)" "1" + -include $(DEPEND) + endif + endif +endif +OBJS = nvm.o nvmbuffer.o + +include $(top_srcdir)/src/gausskernel/common.mk diff --git a/src/gausskernel/storage/nvm/nvm.cpp b/src/gausskernel/storage/nvm/nvm.cpp new file mode 100644 index 000000000..f1ae0030f --- /dev/null +++ b/src/gausskernel/storage/nvm/nvm.cpp @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2020 Huawei Technologies Co.,Ltd. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. 
+ * --------------------------------------------------------------------------------------- + * + * nvm.cpp + * + * IDENTIFICATION + * src/gausskernel/storage/nvm/nvm.cpp + * + * --------------------------------------------------------------------------------------- + */ +#include "storage/nvm/nvm.h" + +#include "postgres.h" +#include "knl/knl_variable.h" +#include "storage/buf/bufmgr.h" + +void nvm_init(void) +{ + LWLockAcquire(ShmemIndexLock, LW_EXCLUSIVE); + + if (NVM_BUFFER_NUM == 0) { + LWLockRelease(ShmemIndexLock); + ereport(FATAL, (errmsg("nvm_buffers is not set.\n"))); + } + + if (g_instance.attr.attr_storage.nvm_attr.nvmBlocks == NULL) { + int nvmBufFd = open(g_instance.attr.attr_storage.nvm_attr.nvm_file_path, O_RDWR); + if (nvmBufFd < 0) { + LWLockRelease(ShmemIndexLock); + ereport(FATAL, (errmsg("can not open nvm file.\n"))); + } + size_t nvmBufferSize = (NVM_BUFFER_NUM) * (Size)BLCKSZ; + + g_instance.attr.attr_storage.nvm_attr.nvmBlocks = (char *)mmap(NULL, nvmBufferSize, + PROT_READ | PROT_WRITE, MAP_SHARED, nvmBufFd, 0); + if (g_instance.attr.attr_storage.nvm_attr.nvmBlocks == NULL) { + LWLockRelease(ShmemIndexLock); + ereport(FATAL, (errmsg("mmap nvm buffer failed.\n"))); + } + } + + t_thrd.storage_cxt.NvmBufferBlocks = g_instance.attr.attr_storage.nvm_attr.nvmBlocks; + LWLockRelease(ShmemIndexLock); +} diff --git a/src/gausskernel/storage/nvm/nvmbuffer.cpp b/src/gausskernel/storage/nvm/nvmbuffer.cpp new file mode 100644 index 000000000..a0519e0ce --- /dev/null +++ b/src/gausskernel/storage/nvm/nvmbuffer.cpp @@ -0,0 +1,881 @@ +/* + * Copyright (c) 2020 Huawei Technologies Co.,Ltd. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. 
+ * ------------------------------------------------------------------------- + * + * nvmbuffer.cpp + * + * IDENTIFICATION + * src/gausskernel/storage/nvm/nvmbuffer.cpp + * + * ------------------------------------------------------------------------ + */ + +#include "postgres.h" +#include "utils/dynahash.h" +#include "access/double_write.h" +#include "knl/knl_variable.h" +#include "storage/buf/buf_internals.h" +#include "storage/buf/bufmgr.h" +#include "storage/smgr/smgr.h" +#include "storage/smgr/segment_internal.h" +#include "utils/resowner.h" +#include "pgstat.h" + +static BufferDesc *NvmStrategyGetBuffer(uint32 *buf_state); +extern PrivateRefCountEntry *GetPrivateRefCountEntry(Buffer buffer, bool create, bool do_move); + +static const int MILLISECOND_TO_MICROSECOND = 1000; + +static inline bool BypassNvm() +{ + return ((double)gs_random() / (double)MAX_RANDOM_VALUE) > g_instance.attr.attr_storage.nvm_attr.bypassNvm; +} + +static inline bool BypassDram() +{ + return ((double)gs_random() / (double)MAX_RANDOM_VALUE) > g_instance.attr.attr_storage.nvm_attr.bypassDram; +} + +static BufferLookupEnt* NvmBufTableLookup(BufferTag *tag, uint32 hashcode) +{ + return (BufferLookupEnt *)buf_hash_operate(t_thrd.storage_cxt.SharedBufHash, tag, hashcode, NULL); +} + +static bool NvmPinBuffer(BufferDesc *buf, bool *migrate) +{ + int b = BufferDescriptorGetBuffer(buf); + bool result = false; + uint32 buf_state; + uint32 old_buf_state; + + *migrate = false; + + old_buf_state = pg_atomic_read_u32(&buf->state); + for (;;) { + if (unlikely(old_buf_state & BM_IN_MIGRATE)) { + *migrate = true; + return result; + } else { + if (old_buf_state & BM_LOCKED) { + old_buf_state = WaitBufHdrUnlocked(buf); + } + + buf_state = old_buf_state; + + /* increase refcount */ + buf_state += BUF_REFCOUNT_ONE; + + /* increase usagecount unless already max */ + if (BUF_STATE_GET_USAGECOUNT(buf_state) != BM_MAX_USAGE_COUNT) { + buf_state += BUF_USAGECOUNT_ONE; + } + + if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state, buf_state)) { + result = (buf_state & BM_VALID) != 0; + break; + } + } + } + + PrivateRefCountEntry *ref = GetPrivateRefCountEntry(b, true, true); + Assert(ref != NULL); + ref->refcount++; + Assert(ref->refcount > 0); + ResourceOwnerRememberBuffer(t_thrd.utils_cxt.CurrentResourceOwner, b); + return result; +} + +static bool NvmPinBufferFast(BufferDesc *buf) +{ + int b = BufferDescriptorGetBuffer(buf); + PrivateRefCountEntry *ref = NULL; + + /* When the secondly and thirdly parameter all both true, the ret value must not be NULL. */ + ref = GetPrivateRefCountEntry(b, false, false); + + if (ref == NULL) { + return false; + } + + ref->refcount++; + Assert(ref->refcount > 0); + ResourceOwnerRememberBuffer(t_thrd.utils_cxt.CurrentResourceOwner, b); + return true; +} + +/* + * We must set timeout here to avoid deadlock. If + * backend thread A has pinned nvm buf1 and want to migrate nvm buf2, + * backend thread B has pinned nvm buf2 and want to migrate nvm buf1, + * it will result in deadlock. 
+ */ +static bool WaitUntilUnPin(BufferDesc *buf, int *waits) +{ + uint32 old_buf_state; + for (;;) { + old_buf_state = pg_atomic_read_u32(&buf->state); + if (BUF_STATE_GET_REFCOUNT(old_buf_state) == 1) { + return true; + } else { + pg_usleep(MILLISECOND_TO_MICROSECOND); + (*waits)++; + if ((*waits) >= 100) { + return false; + } + } + } + +} + +/* + * Wait until the BM_IN_MIGRATE flag isn't set anymore + */ +static void WaitBufHdrUnMigrate(BufferDesc *buf) +{ +#ifndef ENABLE_THREAD_CHECK + SpinDelayStatus delay_status = init_spin_delay(buf); +#endif + uint32 buf_state; + + buf_state = pg_atomic_read_u32(&buf->state); + + while (buf_state & BM_IN_MIGRATE) { +#ifndef ENABLE_THREAD_CHECK + perform_spin_delay(&delay_status); +#endif + buf_state = pg_atomic_read_u32(&buf->state); + } + +#ifndef ENABLE_THREAD_CHECK + finish_spin_delay(&delay_status); +#endif +} + +static bool SetBufferMigrateFlag(Buffer buffer) +{ + BufferDesc *buf = GetBufferDescriptor(buffer - 1); + uint32 bufState; + uint32 oldBufState; + for (;;) { + oldBufState = pg_atomic_read_u32(&buf->state); + + if (oldBufState & BM_LOCKED) { + oldBufState = WaitBufHdrUnlocked(buf); + } + + if (oldBufState & BM_IN_MIGRATE) { + return false; + } + + bufState = oldBufState; + bufState |= BM_IN_MIGRATE; + ereport(DEBUG1, (errmsg("mark buffer %d migrate buffer stat %u.", buffer, bufState))); + if (pg_atomic_compare_exchange_u32(&buf->state, &oldBufState, bufState)) { + return true; + } + } +} + +static void UnSetBufferMigrateFlag(Buffer buffer) +{ + BufferDesc *buf = GetBufferDescriptor(buffer - 1); + uint32 bufState; + uint32 oldBufState; + for (;;) { + oldBufState = pg_atomic_read_u32(&buf->state); + if (oldBufState & BM_LOCKED) { + oldBufState = WaitBufHdrUnlocked(buf); + } + bufState = oldBufState; + bufState &= ~(BM_IN_MIGRATE); + ereport(DEBUG1, (errmsg("unmark buffer %d migrate buffer stat %u.", buffer, bufState))); + if (pg_atomic_compare_exchange_u32(&buf->state, &oldBufState, bufState)) { + break; + } + } +} + +static void NvmWaitBufferIO(BufferDesc *buf) +{ + uint32 buf_state; + + Assert(!t_thrd.storage_cxt.InProgressBuf); + + /* To check the InProgressBuf must be NULL. 
*/ + if (t_thrd.storage_cxt.InProgressBuf) { + ereport(PANIC, (errmsg("InProgressBuf not null: id %d flags %u, buf: id %d flags %u", + t_thrd.storage_cxt.InProgressBuf->buf_id, + pg_atomic_read_u32(&t_thrd.storage_cxt.InProgressBuf->state) & BUF_FLAG_MASK, + buf->buf_id, pg_atomic_read_u32(&buf->state) & BUF_FLAG_MASK))); + } + + bool ioDone = false; +restart: + for (;;) { + (void)LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE); + + buf_state = LockBufHdr(buf); + if (buf_state & BM_IO_IN_PROGRESS) { + ioDone = true; + UnlockBufHdr(buf, buf_state); + LWLockRelease(buf->io_in_progress_lock); + WaitIO(buf); + } else { + break; + } + } + if (buf_state & BM_VALID) { + /* someone else already did the I/O */ + UnlockBufHdr(buf, buf_state); + LWLockRelease(buf->io_in_progress_lock); + return; + } else { + if (!ioDone) { + UnlockBufHdr(buf, buf_state); + LWLockRelease(buf->io_in_progress_lock); + goto restart; + } else { + ereport(PANIC, (errmsg("ioDone is true but buf_state is not valid "))); + } + } + return; +} + +BufferDesc *NvmBufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber fork_num, + BlockNumber block_num, BufferAccessStrategy strategy, bool *found, const XLogPhyBlock *pblk) +{ + Assert(!IsSegmentPhysicalRelNode(smgr->smgr_rnode.node)); + + BufferTag new_tag; /* identity of requested block */ + uint32 new_hash; /* hash value for newTag */ + LWLock *new_partition_lock = NULL; /* buffer partition lock for it */ + BufferTag old_tag; /* previous identity of selected buffer */ + uint32 old_hash; /* hash value for oldTag */ + LWLock *old_partition_lock = NULL; /* buffer partition lock for it */ + uint32 old_flags; + int buf_id; + BufferDesc *buf = NULL; + BufferDesc *nvmBuf = NULL; + bool valid = false; + uint32 buf_state, nvm_buf_state; + bool migrate = false; + errno_t rc; + + /* create a tag so we can lookup the buffer */ + INIT_BUFFERTAG(new_tag, smgr->smgr_rnode.node, fork_num, block_num); + + /* determine its hash code and partition lock ID */ + new_hash = BufTableHashCode(&new_tag); + new_partition_lock = BufMappingPartitionLock(new_hash); + +restart: + *found = FALSE; + /* see if the block is in the buffer pool already */ + (void)LWLockAcquire(new_partition_lock, LW_SHARED); + pgstat_report_waitevent(WAIT_EVENT_BUF_HASH_SEARCH); + BufferLookupEnt *entry = NvmBufTableLookup(&new_tag, new_hash); + pgstat_report_waitevent(WAIT_EVENT_END); + + if (entry != NULL) { + *found = TRUE; + /* + * Found it. Now, pin the buffer so no one can steal it from the + * buffer pool, and check to see if the correct data has been loaded + * into the buffer. + */ + + buf_id = pg_atomic_read_u32((volatile uint32*)&entry->id); + + if (IsNormalBufferID(buf_id)) { + buf = GetBufferDescriptor(buf_id); + + valid = PinBuffer(buf, strategy); + + /* Can release the mapping lock as soon as we've pinned it */ + LWLockRelease(new_partition_lock); + + if (!valid) { + /* + * We can only get here if (a) someone else is still reading in + * the page, or (b) a previous read attempt failed. We have to + * wait for any active read attempt to finish, and then set up our + * own read attempt if the page is still not BM_VALID. + * StartBufferIO does it all. + */ + if (StartBufferIO(buf, true)) { + /* + * If we get here, previous attempts to read the buffer must + * have failed ... but we shall bravely try again. 
+ */ + *found = FALSE; + } + } + + return buf; + } + + if (IsNvmBufferID(buf_id)) { + nvmBuf = GetBufferDescriptor(buf_id); + + /* Return buffer immediately if I have pinned the buffer before */ + if (NvmPinBufferFast(nvmBuf)) { + LWLockRelease(new_partition_lock); + return nvmBuf; + } + + /* Haven't pinned the buffer ever */ + if (BypassDram()) { + /* want to return nvm buffer directly */ + valid = NvmPinBuffer(nvmBuf, &migrate); + + if (migrate) { + LWLockRelease(new_partition_lock); + WaitBufHdrUnMigrate(nvmBuf); + goto restart; + } else { + buf_id = pg_atomic_read_u32((volatile uint32 *)&entry->id); + if (IsNormalBufferID(buf_id)) { + UnpinBuffer(nvmBuf, true); + buf = GetBufferDescriptor(buf_id); + valid = PinBuffer(buf, strategy); + LWLockRelease(new_partition_lock); + Assert(valid); + return buf; + } + /* Can release the mapping lock as soon as we've pinned it */ + LWLockRelease(new_partition_lock); + Assert(nvmBuf->buf_id == buf_id); + if (!valid) { + if (StartBufferIO(nvmBuf, true)) { + *found = FALSE; + } + } + + return nvmBuf; + } + } else { + // wanna migrate + if (SetBufferMigrateFlag(buf_id + 1)) { + buf_id = pg_atomic_read_u32((volatile uint32 *)&entry->id); + + if (IsNormalBufferID(buf_id)) { + UnSetBufferMigrateFlag(nvmBuf->buf_id + 1); + buf = GetBufferDescriptor(buf_id); + valid = PinBuffer(buf, strategy); + LWLockRelease(new_partition_lock); + Assert(valid); + return buf; + } + Assert(nvmBuf->buf_id == buf_id); + + valid = PinBuffer(nvmBuf, strategy); + if (!valid) { + /* corner case: the migration thread can not do I/O */ + NvmWaitBufferIO(nvmBuf); + } + + LWLockRelease(new_partition_lock); + + int waits = 0; + if (!WaitUntilUnPin(nvmBuf, &waits)) { + UnSetBufferMigrateFlag(buf_id + 1); + return nvmBuf; + } + + nvm_buf_state = LockBufHdr(nvmBuf); + if (nvm_buf_state & BM_DIRTY) { + UnlockBufHdr(nvmBuf, nvm_buf_state); + UnSetBufferMigrateFlag(buf_id + 1); + return nvmBuf; + } else { + for (;;) { + buf = (BufferDesc *)StrategyGetBuffer(strategy, &buf_state); + + old_flags = buf_state & BUF_FLAG_MASK; + + if ((old_flags & BM_DIRTY) || (old_flags & BM_IS_META)) { + UnlockBufHdr(buf, buf_state); + (void)sched_yield(); + continue; + } + + if (old_flags & BM_TAG_VALID) { + PinBuffer_Locked(buf); + + old_tag = ((BufferDesc *)buf)->tag; + old_hash = BufTableHashCode(&old_tag); + old_partition_lock = BufMappingPartitionLock(old_hash); + + if (old_partition_lock == new_partition_lock) { + UnpinBuffer(buf, true); + (void)sched_yield(); + continue; + } + + (void)LWLockAcquire(old_partition_lock, LW_EXCLUSIVE); + + buf_state = LockBufHdr(buf); + old_flags = buf_state & BUF_FLAG_MASK; + if (BUF_STATE_GET_REFCOUNT(buf_state) == 1 && !(old_flags & BM_DIRTY) && + !(old_flags & BM_IS_META)) { + /* todo DMS */ + buf_state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_CHECKPOINT_NEEDED | + BM_IO_ERROR | BM_PERMANENT | BUF_USAGECOUNT_MASK); + if (relpersistence == RELPERSISTENCE_PERMANENT || fork_num == INIT_FORKNUM || + ((relpersistence == RELPERSISTENCE_TEMP) && STMT_RETRY_ENABLED)) { + buf_state |= BM_VALID | BM_TAG_VALID | BM_PERMANENT | BUF_USAGECOUNT_ONE; + } else { + buf_state |= BM_VALID | BM_TAG_VALID | BUF_USAGECOUNT_ONE; + } + + BufTableDelete(&old_tag, old_hash); + + /* immediately after deleteing old_hash to avoid ABA Problem */ + LWLockRelease(old_partition_lock); + + rc = memcpy_s(BufferGetBlock(buf->buf_id + 1), BLCKSZ, + BufferGetBlock(nvmBuf->buf_id + 1), BLCKSZ); + securec_check(rc, "\0", "\0"); + buf->tag = nvmBuf->tag; + buf->seg_fileno = nvmBuf->seg_fileno; + 
buf->seg_blockno = nvmBuf->seg_blockno; + buf->lsn_on_disk = nvmBuf->lsn_on_disk; + + /* Assert nvmBuf is not dirty \ cas without buffer header lock */ + nvm_buf_state &= ~(BM_TAG_VALID); + UnlockBufHdr(buf, buf_state); + UnlockBufHdr(nvmBuf, nvm_buf_state); + + UnpinBuffer(nvmBuf, true); + + pg_atomic_write_u32((volatile uint32 *)&entry->id, buf->buf_id); + UnSetBufferMigrateFlag(nvmBuf->buf_id + 1); + return buf; + } + + UnlockBufHdr(buf, buf_state); + LWLockRelease(old_partition_lock); + UnpinBuffer(buf, true); + } else { + /* this buf is not in hashtable */ + buf->state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_CHECKPOINT_NEEDED | + BM_IO_ERROR | BM_PERMANENT | BUF_USAGECOUNT_MASK); + if (relpersistence == RELPERSISTENCE_PERMANENT || fork_num == INIT_FORKNUM || + ((relpersistence == RELPERSISTENCE_TEMP) && STMT_RETRY_ENABLED)) { + buf->state |= BM_VALID | BM_TAG_VALID | BM_PERMANENT | BUF_USAGECOUNT_ONE; + } else { + buf->state |= BM_VALID | BM_TAG_VALID | BUF_USAGECOUNT_ONE; + } + + PinBuffer_Locked(buf); + + rc = memcpy_s(BufferGetBlock(buf->buf_id + 1), BLCKSZ, BufferGetBlock(nvmBuf->buf_id + 1), + BLCKSZ); + securec_check(rc, "\0", "\0"); + buf->tag = nvmBuf->tag; + buf->seg_fileno = nvmBuf->seg_fileno; + buf->seg_blockno = nvmBuf->seg_blockno; + buf->lsn_on_disk = nvmBuf->lsn_on_disk; + + // Assert nvmBuf is not dirty + nvm_buf_state &= ~(BM_TAG_VALID); + + UnlockBufHdr(nvmBuf, nvm_buf_state); + + UnpinBuffer(nvmBuf, true); + + pg_atomic_write_u32((volatile uint32 *)&entry->id, buf->buf_id); + UnSetBufferMigrateFlag(nvmBuf->buf_id + 1); + return buf; + } + } + } + } else { + LWLockRelease(new_partition_lock); + goto restart; + } + } + } else { + Assert(false); + } + } + + /* + * Didn't find it in the buffer pool. We'll have to initialize a new + * buffer. Remember to unlock the mapping lock while doing the work. + */ + LWLockRelease(new_partition_lock); + /* Loop here in case we have to try another victim buffer */ + for (;;) { + bool needGetLock = false; + /* + * Select a victim buffer. The buffer is returned with its header + * spinlock still held! + */ + pgstat_report_waitevent(WAIT_EVENT_BUF_STRATEGY_GET); + if (BypassNvm()) { + buf = (BufferDesc *)StrategyGetBuffer(strategy, &buf_state); + } else { + buf = (BufferDesc *)NvmStrategyGetBuffer(&buf_state); + } + pgstat_report_waitevent(WAIT_EVENT_END); + + Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 0); + + /* Must copy buffer flags while we still hold the spinlock */ + old_flags = buf_state & BUF_FLAG_MASK; + + /* Pin the buffer and then release the buffer spinlock */ + PinBuffer_Locked(buf); + + PageCheckIfCanEliminate(buf, &old_flags, &needGetLock); + /* + * If the buffer was dirty, try to write it out. There is a race + * condition here, in that someone might dirty it after we released it + * above, or even while we are writing it out (since our share-lock + * won't prevent hint-bit updates). We will recheck the dirty bit + * after re-locking the buffer header. + */ + if (old_flags & BM_DIRTY) { + /* backend should not flush dirty pages if working version less than DW_SUPPORT_NEW_SINGLE_FLUSH */ + if (!backend_can_flush_dirty_page()) { + UnpinBuffer(buf, true); + (void)sched_yield(); + continue; + } + + /* + * We need a share-lock on the buffer contents to write it out + * (else we might write invalid data, eg because someone else is + * compacting the page contents while we write). We must use a + * conditional lock acquisition here to avoid deadlock. 
Even + * though the buffer was not pinned (and therefore surely not + * locked) when StrategyGetBuffer returned it, someone else could + * have pinned and exclusive-locked it by the time we get here. If + * we try to get the lock unconditionally, we'd block waiting for + * them; if they later block waiting for us, deadlock ensues. + * (This has been observed to happen when two backends are both + * trying to split btree index pages, and the second one just + * happens to be trying to split the page the first one got from + * StrategyGetBuffer.) + */ + bool needDoFlush = false; + if (!needGetLock) { + needDoFlush = LWLockConditionalAcquire(buf->content_lock, LW_SHARED); + } else { + LWLockAcquire(buf->content_lock, LW_SHARED); + needDoFlush = true; + } + if (needDoFlush) { + /* + * If using a nondefault strategy, and writing the buffer + * would require a WAL flush, let the strategy decide whether + * to go ahead and write/reuse the buffer or to choose another + * victim. We need lock to inspect the page LSN, so this + * can't be done inside StrategyGetBuffer. + */ + if (strategy != NULL) { + XLogRecPtr lsn; + + /* Read the LSN while holding buffer header lock */ + buf_state = LockBufHdr(buf); + lsn = BufferGetLSN(buf); + UnlockBufHdr(buf, buf_state); + + if (XLogNeedsFlush(lsn) && StrategyRejectBuffer(strategy, buf)) { + /* Drop lock/pin and loop around for another buffer */ + LWLockRelease(buf->content_lock); + UnpinBuffer(buf, true); + continue; + } + } + + /* during initdb, not need flush dw file */ + if (dw_enabled() && pg_atomic_read_u32(&g_instance.ckpt_cxt_ctl->current_page_writer_count) > 0) { + if (!free_space_enough(buf->buf_id)) { + LWLockRelease(buf->content_lock); + UnpinBuffer(buf, true); + continue; + } + uint32 pos = 0; + pos = first_version_dw_single_flush(buf); + t_thrd.proc->dw_pos = pos; + FlushBuffer(buf, NULL); + g_instance.dw_single_cxt.single_flush_state[pos] = true; + t_thrd.proc->dw_pos = -1; + } else { + FlushBuffer(buf, NULL); + } + LWLockRelease(buf->content_lock); + + ScheduleBufferTagForWriteback(t_thrd.storage_cxt.BackendWritebackContext, &buf->tag); + } else { + /* + * Someone else has locked the buffer, so give it up and loop + * back to get another one. + */ + UnpinBuffer(buf, true); + continue; + } + } + + /* + * To change the association of a valid buffer, we'll need to have + * exclusive lock on both the old and new mapping partitions. + */ + if (old_flags & BM_TAG_VALID) { + /* + * Need to compute the old tag's hashcode and partition lock ID. + * XXX is it worth storing the hashcode in BufferDesc so we need + * not recompute it here? Probably not. + */ + old_tag = ((BufferDesc *)buf)->tag; + old_hash = BufTableHashCode(&old_tag); + old_partition_lock = BufMappingPartitionLock(old_hash); + /* + * Must lock the lower-numbered partition first to avoid + * deadlocks. + */ + LockTwoLWLock(new_partition_lock, old_partition_lock); + } else { + /* if it wasn't valid, we need only the new partition */ + (void)LWLockAcquire(new_partition_lock, LW_EXCLUSIVE); + /* these just keep the compiler quiet about uninit variables */ + old_hash = 0; + old_partition_lock = NULL; + } + + /* + * Try to make a hashtable entry for the buffer under its new tag. + * This could fail because while we were writing someone else + * allocated another buffer for the same block we want to read in. + * Note that we have not yet removed the hashtable entry for the old + * tag. + */ + buf_id = BufTableInsert(&new_tag, new_hash, buf->buf_id); + if (buf_id >= 0) { + /* + * Got a collision. 
Someone has already done what we were about to + * do. We'll just handle this as if it were found in the buffer + * pool in the first place. First, give up the buffer we were + * planning to use. + */ + UnpinBuffer(buf, true); + + /* Can give up that buffer's mapping partition lock now */ + if ((old_flags & BM_TAG_VALID) && old_partition_lock != new_partition_lock) + LWLockRelease(old_partition_lock); + + /* remaining code should match code at top of routine */ + buf = GetBufferDescriptor(buf_id); + + valid = PinBuffer(buf, strategy); + + /* Can release the mapping lock as soon as we've pinned it */ + LWLockRelease(new_partition_lock); + + *found = TRUE; + + if (!valid) { + /* + * We can only get here if (a) someone else is still reading + * in the page, or (b) a previous read attempt failed. We + * have to wait for any active read attempt to finish, and + * then set up our own read attempt if the page is still not + * BM_VALID. StartBufferIO does it all. + */ + if (StartBufferIO(buf, true)) { + /* + * If we get here, previous attempts to read the buffer + * must have failed ... but we shall bravely try again. + */ + *found = FALSE; + } + } + + return buf; + } + + /* + * Need to lock the buffer header too in order to change its tag. + */ + buf_state = LockBufHdr(buf); + + /* + * Somebody could have pinned or re-dirtied the buffer while we were + * doing the I/O and making the new hashtable entry. If so, we can't + * recycle this buffer; we must undo everything we've done and start + * over with a new victim buffer. + */ + old_flags = buf_state & BUF_FLAG_MASK; + if (BUF_STATE_GET_REFCOUNT(buf_state) == 1 && !(old_flags & BM_DIRTY) + && !(old_flags & BM_IS_META)) { + break; + } + + UnlockBufHdr(buf, buf_state); + BufTableDelete(&new_tag, new_hash); + if ((old_flags & BM_TAG_VALID) && old_partition_lock != new_partition_lock) { + LWLockRelease(old_partition_lock); + } + LWLockRelease(new_partition_lock); + UnpinBuffer(buf, true); + } + +#ifdef USE_ASSERT_CHECKING + PageCheckWhenChosedElimination(buf, old_flags); +#endif + + /* + * Okay, it's finally safe to rename the buffer. + * + * Clearing BM_VALID here is necessary, clearing the dirtybits is just + * paranoia. We also reset the usage_count since any recency of use of + * the old content is no longer relevant. (The usage_count starts out at + * 1 so that the buffer can survive one clock-sweep pass.) + * + * Make sure BM_PERMANENT is set for buffers that must be written at every + * checkpoint. Unlogged buffers only need to be written at shutdown + * checkpoints, except for their "init" forks, which need to be treated + * just like permanent relations. + */ + ((BufferDesc *)buf)->tag = new_tag; + buf_state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT | + BUF_USAGECOUNT_MASK); + if (relpersistence == RELPERSISTENCE_PERMANENT || fork_num == INIT_FORKNUM || + ((relpersistence == RELPERSISTENCE_TEMP) && STMT_RETRY_ENABLED)) { + buf_state |= BM_TAG_VALID | BM_PERMANENT | BUF_USAGECOUNT_ONE; + } else { + buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE; + } + + UnlockBufHdr(buf, buf_state); + + if (old_flags & BM_TAG_VALID) { + BufTableDelete(&old_tag, old_hash); + if (old_partition_lock != new_partition_lock) { + LWLockRelease(old_partition_lock); + } + } + + /* set Physical segment file. 
*/ + if (pblk != NULL) { + Assert(PhyBlockIsValid(*pblk)); + buf->seg_fileno = pblk->relNode; + buf->seg_blockno = pblk->block; + } else { + buf->seg_fileno = EXTENT_INVALID; + buf->seg_blockno = InvalidBlockNumber; + } + LWLockRelease(new_partition_lock); + + /* + * Buffer contents are currently invalid. Try to get the io_in_progress + * lock. If StartBufferIO returns false, then someone else managed to + * read it before we did, so there's nothing left for BufferAlloc() to do. + */ + if (StartBufferIO(buf, true)) { + *found = FALSE; + } else { + *found = TRUE; + } + + return buf; +} + +static uint32 next_victim_buffer = 0; + +static inline uint32 NvmClockSweepTick(void) +{ + uint32 victim = pg_atomic_fetch_add_u32(&next_victim_buffer, 1); + if (victim >= (uint32)NVM_BUFFER_NUM) { + victim = victim % NVM_BUFFER_NUM; + } + return victim; +} + +static BufferDesc* get_nvm_buf_from_candidate_list(uint32* buf_state) +{ + BufferDesc* buf = NULL; + uint32 local_buf_state; + int buf_id = 0; + int list_num = g_instance.ckpt_cxt_ctl->pgwr_procs.sub_num; + volatile PgBackendStatus* beentry = t_thrd.shemem_ptr_cxt.MyBEEntry; + int list_id = beentry->st_tid > 0 ? (beentry->st_tid % list_num) : (beentry->st_sessionid % list_num); + + for (int i = 0; i < list_num; i++) { + /* the pagewriter sub thread store normal buffer pool, sub thread starts from 1 */ + int thread_id = (list_id + i) % list_num + 1; + Assert(thread_id > 0 && thread_id <= list_num); + while (candidate_buf_pop(&g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[thread_id].nvm_list, &buf_id)) { + Assert(buf_id >= NvmBufferStartID && buf_id < SegmentBufferStartID); + buf = GetBufferDescriptor(buf_id); + local_buf_state = LockBufHdr(buf); + + if (g_instance.ckpt_cxt_ctl->candidate_free_map[buf_id]) { + g_instance.ckpt_cxt_ctl->candidate_free_map[buf_id] = false; + bool available_buffer = BUF_STATE_GET_REFCOUNT(local_buf_state) == 0 + && !(local_buf_state & BM_IS_META) + && !(local_buf_state & BM_DIRTY); + if (available_buffer) { + *buf_state = local_buf_state; + return buf; + } + } + UnlockBufHdr(buf, local_buf_state); + } + } + + wakeup_pagewriter_thread(); + return NULL; +} + +const int RETRY_COUNT = 3; +static BufferDesc *NvmStrategyGetBuffer(uint32* buf_state) +{ + BufferDesc *buf = NULL; + int try_counter = NVM_BUFFER_NUM * RETRY_COUNT; + uint32 local_buf_state = 0; /* to avoid repeated (de-)referencing */ + + /* Check the Candidate list */ + if (ENABLE_INCRE_CKPT && pg_atomic_read_u32(&g_instance.ckpt_cxt_ctl->current_page_writer_count) > 1) { + buf = get_nvm_buf_from_candidate_list(buf_state); + if (buf != NULL) { + (void)pg_atomic_fetch_add_u64(&g_instance.ckpt_cxt_ctl->nvm_get_buf_num_candidate_list, 1); + return buf; + } + } + + for (;;) { + int buf_id = BufferIdOfNvmBuffer(NvmClockSweepTick()); + buf = GetBufferDescriptor(buf_id); + + local_buf_state = LockBufHdr(buf); + + if (BUF_STATE_GET_REFCOUNT(local_buf_state) == 0 && !(local_buf_state & BM_IS_META) && + (backend_can_flush_dirty_page() || !(local_buf_state & BM_DIRTY))) { + *buf_state = local_buf_state; + (void)pg_atomic_fetch_add_u64(&g_instance.ckpt_cxt_ctl->nvm_get_buf_num_clock_sweep, 1); + return buf; + } else if (--try_counter == 0) { + UnlockBufHdr(buf, local_buf_state); + ereport(ERROR, (errcode(ERRCODE_INVALID_BUFFER), (errmsg("no unpinned buffers available")))); + } + UnlockBufHdr(buf, local_buf_state); + } + + /* not reached */ + return NULL; +} + diff --git a/src/gausskernel/storage/smgr/segment/segbuffer.cpp b/src/gausskernel/storage/smgr/segment/segbuffer.cpp 
index df84a6ba0..6abe5e4e4 100644 --- a/src/gausskernel/storage/smgr/segment/segbuffer.cpp +++ b/src/gausskernel/storage/smgr/segment/segbuffer.cpp @@ -31,7 +31,6 @@ #include "storage/smgr/segment.h" #include "storage/smgr/smgr.h" #include "utils/resowner.h" -#include "tsan_annotation.h" #include "pgstat.h" /* @@ -63,23 +62,6 @@ void AbortSegBufferIO(void) } } -static void WaitIO(BufferDesc *buf) -{ - while (true) { - uint32 buf_state; - - buf_state = LockBufHdr(buf); - UnlockBufHdr(buf, buf_state); - - if (!(buf_state & BM_IO_IN_PROGRESS)) { - break; - } - - LWLockAcquire(buf->io_in_progress_lock, LW_SHARED); - LWLockRelease(buf->io_in_progress_lock); - } -} - static bool SegStartBufferIO(BufferDesc *buf, bool forInput) { uint32 buf_state; @@ -157,32 +139,6 @@ static void SegTerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_f LWLockRelease(buf->io_in_progress_lock); } -static uint32 SegWaitBufHdrUnlocked(BufferDesc *buf) -{ -#ifndef ENABLE_THREAD_CHECK - SpinDelayStatus delayStatus = init_spin_delay(buf); -#endif - uint32 buf_state; - - buf_state = pg_atomic_read_u32(&buf->state); - - while (buf_state & BM_LOCKED) { -#ifndef ENABLE_THREAD_CHECK - perform_spin_delay(&delayStatus); -#endif - buf_state = pg_atomic_read_u32(&buf->state); - } - -#ifndef ENABLE_THREAD_CHECK - finish_spin_delay(&delayStatus); -#endif - - /* ENABLE_THREAD_CHECK only, acqurie semantic */ - TsAnnotateHappensAfter(&buf->state); - - return buf_state; -} - bool SegPinBuffer(BufferDesc *buf) { ereport(DEBUG5, (errmodule(MOD_SEGMENT_PAGE), @@ -198,7 +154,7 @@ bool SegPinBuffer(BufferDesc *buf) for (;;) { if (old_buf_state & BM_LOCKED) { - old_buf_state = SegWaitBufHdrUnlocked(buf); + old_buf_state = WaitBufHdrUnlocked(buf); } buf_state = old_buf_state; @@ -263,7 +219,7 @@ void SegUnpinBuffer(BufferDesc *buf) old_buf_state = pg_atomic_read_u32(&buf->state); for (;;) { if (old_buf_state & BM_LOCKED) { - old_buf_state = SegWaitBufHdrUnlocked(buf); + old_buf_state = WaitBufHdrUnlocked(buf); } buf_state = old_buf_state; @@ -459,20 +415,6 @@ Buffer ReadBufferFast(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum, Bloc return BufferDescriptorGetBuffer(bufHdr); } -void LockTwoLWLock(LWLock *new_partition_lock, LWLock *old_partition_lock) -{ - if (old_partition_lock < new_partition_lock) { - (void)LWLockAcquire(old_partition_lock, LW_EXCLUSIVE); - (void)LWLockAcquire(new_partition_lock, LW_EXCLUSIVE); - } else if (old_partition_lock > new_partition_lock) { - (void)LWLockAcquire(new_partition_lock, LW_EXCLUSIVE); - (void)LWLockAcquire(old_partition_lock, LW_EXCLUSIVE); - } else { - /* only one partition, only one lock */ - (void)LWLockAcquire(new_partition_lock, LW_EXCLUSIVE); - } -} - BufferDesc * FoundBufferInHashTable(int buf_id, LWLock *new_partition_lock, bool *foundPtr) { BufferDesc *buf = GetBufferDescriptor(buf_id); @@ -626,7 +568,7 @@ static BufferDesc* get_segbuf_from_candidate_list(uint32* buf_state) /* the pagewriter sub thread store normal buffer pool, sub thread starts from 1 */ int thread_id = (list_id + i) % list_num + 1; Assert(thread_id > 0 && thread_id <= list_num); - while (seg_candidate_buf_pop(&buf_id, thread_id)) { + while (candidate_buf_pop(&g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[thread_id].seg_list, &buf_id)) { buf = GetBufferDescriptor(buf_id); local_buf_state = LockBufHdr(buf); SegmentCheck(buf_id >= SegmentBufferStartID); diff --git a/src/include/knl/knl_guc/knl_instance_attr_storage.h b/src/include/knl/knl_guc/knl_instance_attr_storage.h index d52797c9b..4e3fda338 100755 --- 
a/src/include/knl/knl_guc/knl_instance_attr_storage.h +++ b/src/include/knl/knl_guc/knl_instance_attr_storage.h @@ -81,6 +81,14 @@ typedef struct knl_instance_attr_dcf { int dcf_mec_batch_size; } knl_instance_attr_dcf; +typedef struct knl_instance_attr_nvm { + bool enable_nvm; + char* nvm_file_path; + char *nvmBlocks; + double bypassDram; + double bypassNvm; +} knl_instance_attr_nvm; + typedef struct knl_instance_attr_storage { bool wal_log_hints; bool EnableHotStandby; @@ -101,6 +109,7 @@ typedef struct knl_instance_attr_storage { int WalReceiverBufSize; int DataQueueBufSize; int NBuffers; + int NNvmBuffers; int NSegBuffers; int cstore_buffers; int MaxSendSize; @@ -144,6 +153,7 @@ typedef struct knl_instance_attr_storage { int max_concurrent_autonomous_transactions; char* available_zone; knl_instance_attr_dcf dcf_attr; + knl_instance_attr_nvm nvm_attr; int num_internal_lock_partitions[LWLOCK_PART_KIND]; char* num_internal_lock_partitions_str; int wal_insert_status_entries_power; diff --git a/src/include/knl/knl_instance.h b/src/include/knl/knl_instance.h index 7a64d309b..fda260828 100755 --- a/src/include/knl/knl_instance.h +++ b/src/include/knl/knl_instance.h @@ -524,8 +524,10 @@ typedef struct knl_g_ckpt_context { /* pagewriter thread view information */ uint64 page_writer_actual_flush; volatile uint64 get_buf_num_candidate_list; + volatile uint64 nvm_get_buf_num_candidate_list; volatile uint64 seg_get_buf_num_candidate_list; volatile uint64 get_buf_num_clock_sweep; + volatile uint64 nvm_get_buf_num_clock_sweep; volatile uint64 seg_get_buf_num_clock_sweep; /* checkpoint view information */ diff --git a/src/include/knl/knl_thread.h b/src/include/knl/knl_thread.h index 69a5be22a..4ed9a0584 100755 --- a/src/include/knl/knl_thread.h +++ b/src/include/knl/knl_thread.h @@ -2511,6 +2511,7 @@ typedef struct knl_t_storage_context { struct VirtualTransactionId* proc_vxids; union BufferDescPadded* BufferDescriptors; char* BufferBlocks; + char* NvmBufferBlocks; struct WritebackContext* BackendWritebackContext; struct HTAB* SharedBufHash; struct HTAB* BufFreeListHash; diff --git a/src/include/postmaster/pagewriter.h b/src/include/postmaster/pagewriter.h index a807a9cec..9d159fbb0 100644 --- a/src/include/postmaster/pagewriter.h +++ b/src/include/postmaster/pagewriter.h @@ -31,6 +31,12 @@ #define ENABLE_INCRE_CKPT g_instance.attr.attr_storage.enableIncrementalCheckpoint #define NEED_CONSIDER_USECOUNT u_sess->attr.attr_storage.enable_candidate_buf_usage_count +#define INIT_CANDIDATE_LIST(L, list, size, xx_head, xx_tail) \ + ((L).cand_buf_list = (list), \ + (L).cand_list_size = (size), \ + (L).head = (xx_head), \ + (L).tail = (xx_tail)) + typedef struct PGPROC PGPROC; typedef struct BufferDesc BufferDesc; typedef struct CkptSortItem CkptSortItem; @@ -42,6 +48,21 @@ typedef struct ThrdDwCxt { bool is_new_relfilenode; } ThrdDwCxt; +typedef enum CandListType { + CAND_LIST_NORMAL, + CAND_LIST_NVM, + CAND_LIST_SEG +} CandListType; + +typedef struct CandidateList { + Buffer *cand_buf_list; + volatile int cand_list_size; + pg_atomic_uint64 head; + pg_atomic_uint64 tail; + volatile int buf_id_start; + int32 next_scan_loc; +} CandidateList; + typedef struct PageWriterProc { PGPROC* proc; /* dirty page queue */ @@ -56,23 +77,10 @@ typedef struct PageWriterProc { CkptSortItem *dirty_buf_list; uint32 dirty_list_size; - volatile int buf_id_start; /* buffer id start loc */ - int32 next_scan_normal_loc; - /* thread candidate list, main thread store the segment buffer information */ - Buffer *cand_buf_list; /* thread 
candidate buffer list */ - volatile int cand_list_size; /* thread candidate list max size, */ - pg_atomic_uint64 head; - pg_atomic_uint64 tail; - - /* thread seg buffer information */ - Buffer *seg_cand_buf_list; - volatile int seg_cand_list_size; - pg_atomic_uint64 seg_head; - pg_atomic_uint64 seg_tail; - - volatile int seg_id_start; /* buffer id start loc */ - int32 next_scan_seg_loc; + CandidateList normal_list; + CandidateList nvm_list; + CandidateList seg_list; } PageWriterProc; typedef struct PageWriterProcs { @@ -162,11 +170,10 @@ extern const incre_ckpt_view_col g_ckpt_view_col[INCRE_CKPT_VIEW_COL_NUM]; extern const incre_ckpt_view_col g_pagewriter_view_col[PAGEWRITER_VIEW_COL_NUM]; extern const incre_ckpt_view_col g_pagewirter_view_two_col[CANDIDATE_VIEW_COL_NUM]; -extern bool candidate_buf_pop(int *buf_id, int thread_id); -extern bool seg_candidate_buf_pop(int *buf_id, int thread_id); +extern bool candidate_buf_pop(CandidateList *list, int *buf_id); extern void candidate_buf_init(void); -extern uint32 get_curr_candidate_nums(bool segment); +extern uint32 get_curr_candidate_nums(CandListType type); extern void PgwrAbsorbFsyncRequests(void); extern Size PageWriterShmemSize(void); extern void PageWriterSyncShmemInit(void); diff --git a/src/include/storage/buf/buf_internals.h b/src/include/storage/buf/buf_internals.h index 3a50a8281..b3e8dfe78 100644 --- a/src/include/storage/buf/buf_internals.h +++ b/src/include/storage/buf/buf_internals.h @@ -39,7 +39,7 @@ * The definition of buffer state components is below. */ #define BUF_REFCOUNT_ONE 1 -#define BUF_REFCOUNT_MASK ((1U << 17) - 1) +#define BUF_REFCOUNT_MASK ((1U << 16) - 1) #define BUF_USAGECOUNT_MASK 0x003C0000U #define BUF_USAGECOUNT_ONE (1U << 18) #define BUF_USAGECOUNT_SHIFT 18 @@ -55,6 +55,7 @@ * Note: TAG_VALID essentially means that there is a buffer hashtable * entry associated with the buffer's tag. 
*/ +#define BM_IN_MIGRATE (1U << 16) /* buffer is migrating */ #define BM_IS_META (1U << 17) #define BM_LOCKED (1U << 22) /* buffer header is locked */ #define BM_DIRTY (1U << 23) /* data needs writing */ @@ -109,6 +110,11 @@ typedef struct buftagnohbkt { BlockNumber blockNum; /* blknum relative to begin of reln */ } BufferTagFirstVer; +/* entry for buffer lookup hashtable */ +typedef struct { + BufferTag key; /* Tag of a disk page */ + int id; /* Associated buffer ID */ +} BufferLookupEnt; #define CLEAR_BUFFERTAG(a) \ ((a).rnode.spcNode = InvalidOid, \ diff --git a/src/include/storage/buf/bufmgr.h b/src/include/storage/buf/bufmgr.h index a2d1f3f3c..0589604d7 100644 --- a/src/include/storage/buf/bufmgr.h +++ b/src/include/storage/buf/bufmgr.h @@ -22,13 +22,18 @@ #include "utils/relcache.h" #include "postmaster/pagerepair.h" +/* [ dram buffer | nvm buffer | segment buffer] */ +#define NVM_BUFFER_NUM (g_instance.attr.attr_storage.NNvmBuffers) #define SEGMENT_BUFFER_NUM (g_instance.attr.attr_storage.NSegBuffers) // 1GB -#define SegmentBufferStartID (g_instance.attr.attr_storage.NBuffers) -#define NORMAL_SHARED_BUFFER_NUM (SegmentBufferStartID) -#define TOTAL_BUFFER_NUM (SEGMENT_BUFFER_NUM + NORMAL_SHARED_BUFFER_NUM) -#define BufferIdOfSegmentBuffer(id) ((id) + SegmentBufferStartID) +#define NvmBufferStartID (g_instance.attr.attr_storage.NBuffers) +#define SegmentBufferStartID (g_instance.attr.attr_storage.NBuffers + g_instance.attr.attr_storage.NNvmBuffers) +#define NORMAL_SHARED_BUFFER_NUM (NvmBufferStartID) +#define TOTAL_BUFFER_NUM (SEGMENT_BUFFER_NUM + NORMAL_SHARED_BUFFER_NUM + NVM_BUFFER_NUM) +#define BufferIdOfSegmentBuffer(id) ((id) + SegmentBufferStartID) +#define BufferIdOfNvmBuffer(id) ((id) + NvmBufferStartID) #define IsSegmentBufferID(id) ((id) >= SegmentBufferStartID) -#define SharedBufferNumber (SegmentBufferStartID) +#define IsNvmBufferID(id) ((id) >= NvmBufferStartID && (id) < SegmentBufferStartID) +#define IsNormalBufferID(id) ((id) >= 0 && (id) < NvmBufferStartID) #define USE_CKPT_THREAD_SYNC (!g_instance.attr.attr_storage.enableIncrementalCheckpoint || \ IsBootstrapProcessingMode() || \ @@ -181,6 +186,8 @@ struct WritebackContext; */ #define BufferGetBlock(buffer) \ (BufferIsLocal(buffer) ? u_sess->storage_cxt.LocalBufferBlockPointers[-(buffer)-1] \ + : IsNvmBufferID((buffer-1)) ? (Block)(t_thrd.storage_cxt.NvmBufferBlocks + ((Size)((uint)(buffer)-1-NvmBufferStartID)) * BLCKSZ) \ + : IsSegmentBufferID(buffer-1) ? 
(Block)(t_thrd.storage_cxt.BufferBlocks + ((Size)((uint)(buffer)-NVM_BUFFER_NUM-1)) * BLCKSZ) \ : (Block)(t_thrd.storage_cxt.BufferBlocks + ((Size)((uint)(buffer)-1)) * BLCKSZ)) #define ADDR_IN_LOCAL_BUFFER_CONTENT(block) \ @@ -194,6 +201,11 @@ struct WritebackContext; (static_cast(block) >= static_cast(BufferGetBlock(1)) \ && static_cast(block) <= static_cast(BufferGetBlock(TOTAL_BUFFER_NUM)))) +#define ADDR_IN_NVM_BUFFER_CONTENT(block) \ + ( NVM_BUFFER_NUM > 0 && \ + (static_cast(block) >= static_cast(BufferGetBlock(NvmBufferStartID + 1)) \ + && static_cast(block) <= static_cast(BufferGetBlock(SegmentBufferStartID)))) + static inline Buffer BlockGetBuffer(const char *block) { if (ADDR_IN_LOCAL_BUFFER_CONTENT(block)) { @@ -201,7 +213,15 @@ static inline Buffer BlockGetBuffer(const char *block) } if (ADDR_IN_SHARED_BUFFER_CONTENT(block)) { - return 1 + ((block - (const char*)BufferGetBlock(1))/BLCKSZ); + Buffer buffer = 1 + ((block - (const char*)BufferGetBlock(1))/BLCKSZ); + if (buffer > NvmBufferStartID) { + buffer += NVM_BUFFER_NUM; + } + return buffer; + } + + if (ADDR_IN_NVM_BUFFER_CONTENT(block)) { + return 1 + ((block - (const char*)BufferGetBlock(NvmBufferStartID + 1))/BLCKSZ) + NvmBufferStartID; } return InvalidBuffer; @@ -227,7 +247,10 @@ static inline Buffer BlockGetBuffer(const char *block) #define BufferGetPage(buffer) ((Page)BufferGetBlock(buffer)) /* Note: these two macros only work on shared buffers, not local ones! */ -#define BufHdrGetBlock(bufHdr) ((Block)(t_thrd.storage_cxt.BufferBlocks + ((Size)(uint32)(bufHdr)->buf_id) * BLCKSZ)) +#define BufHdrGetBlock(bufHdr) \ + (IsNvmBufferID((bufHdr)->buf_id) ? ((Block)(t_thrd.storage_cxt.NvmBufferBlocks + ((Size)(uint32)(bufHdr)->buf_id - NvmBufferStartID) * BLCKSZ)) \ + : IsSegmentBufferID((bufHdr)->buf_id) ? ((Block)(t_thrd.storage_cxt.BufferBlocks + ((Size)(uint32)(bufHdr)->buf_id - NVM_BUFFER_NUM) * BLCKSZ)) \ + : ((Block)(t_thrd.storage_cxt.BufferBlocks + ((Size)(uint32)(bufHdr)->buf_id) * BLCKSZ))) #define BufferGetLSN(bufHdr) (PageGetLSN(BufHdrGetBlock(bufHdr))) /* Note: this macro only works on local buffers, not shared ones! */ @@ -260,7 +283,14 @@ extern void UnlockReleaseBuffer(Buffer buffer); extern void MarkBufferDirty(Buffer buffer); extern void IncrBufferRefCount(Buffer buffer); extern Buffer ReleaseAndReadBuffer(Buffer buffer, Relation relation, BlockNumber blockNum); +void PageCheckIfCanEliminate(BufferDesc *buf, uint32 *oldFlags, bool *needGetLock); +#ifdef USE_ASSERT_CHECKING +void PageCheckWhenChosedElimination(const BufferDesc *buf, uint32 oldFlags); +#endif +uint32 WaitBufHdrUnlocked(BufferDesc* buf); +void WaitIO(BufferDesc *buf); void InvalidateBuffer(BufferDesc *buf); +void LockTwoLWLock(LWLock *new_partition_lock, LWLock *old_partition_lock); extern void InitBufferPool(void); extern void InitBufferPoolAccess(void); diff --git a/src/include/storage/nvm/nvm.h b/src/include/storage/nvm/nvm.h new file mode 100644 index 000000000..5501081f0 --- /dev/null +++ b/src/include/storage/nvm/nvm.h @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2020 Huawei Technologies Co.,Ltd. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. 
+ * See the Mulan PSL v2 for more details. + * --------------------------------------------------------------------------------------- + * + * nvm.h + * + * + * IDENTIFICATION + * src/include/storage/nvm/nvm.h + * + * --------------------------------------------------------------------------------------- + */ +#ifndef NVM_H +#define NVM_H + +#include "storage/smgr/smgr.h" + +void nvm_init(void); + +BufferDesc *NvmBufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber fork_num, + BlockNumber block_num, BufferAccessStrategy strategy, bool *found, const XLogPhyBlock *pblk); + +#endif
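
A few standalone C sketches follow to restate mechanisms the patch introduces; everything in them (pool sizes, buffer IDs, helper names) is a placeholder, and the concurrency control is deliberately simplified. First, the buf_id layout from bufmgr.h: the pool is split into contiguous DRAM, NVM and segment ranges, and BufHdrGetBlock maps an ID into either BufferBlocks or NvmBufferBlocks accordingly. A minimal model, assuming illustrative pool sizes:

    #include <assert.h>
    #include <stddef.h>
    #include <stdio.h>

    #define BLCKSZ 8192

    /* Illustrative pool sizes; in the patch these come from NBuffers,
     * NNvmBuffers and NSegBuffers. */
    static const int n_buffers = 1000;     /* DRAM buffers */
    static const int n_nvm_buffers = 500;  /* NVM buffers  */

    /* ID layout mirrors bufmgr.h: [ dram | nvm | segment ] */
    static int nvm_start(void) { return n_buffers; }
    static int seg_start(void) { return n_buffers + n_nvm_buffers; }
    static int is_nvm(int id)  { return id >= nvm_start() && id < seg_start(); }
    static int is_seg(int id)  { return id >= seg_start(); }

    /* NVM buffers live in their own NvmBufferBlocks array, while DRAM and
     * segment buffers share BufferBlocks, so segment IDs are shifted back
     * by the NVM pool size (as in the new BufHdrGetBlock). */
    static char *block_of(char *buffer_blocks, char *nvm_blocks, int id)
    {
        if (is_nvm(id))
            return nvm_blocks + (size_t)(id - nvm_start()) * BLCKSZ;
        if (is_seg(id))
            return buffer_blocks + (size_t)(id - n_nvm_buffers) * BLCKSZ;
        return buffer_blocks + (size_t)id * BLCKSZ;
    }

    int main(void)
    {
        /* Plain arrays stand in for the shared-memory segments. */
        static char buffer_blocks[(1000 + 200) * BLCKSZ]; /* DRAM + segment */
        static char nvm_blocks[500 * BLCKSZ];

        /* The first NVM buffer maps to the start of NvmBufferBlocks... */
        assert(block_of(buffer_blocks, nvm_blocks, nvm_start()) == nvm_blocks);
        /* ...and the first segment buffer follows the DRAM range in BufferBlocks. */
        assert(block_of(buffer_blocks, nvm_blocks, seg_start()) ==
               buffer_blocks + (size_t)n_buffers * BLCKSZ);
        printf("buffer ID layout checks out\n");
        return 0;
    }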
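
The pagewriter side replaces the separate normal/segment candidate arrays with a single CandidateList type carrying atomic head and tail counters, instantiated once per purpose (normal_list, nvm_list, seg_list). The simplified, single-threaded model below shows the ring arithmetic implied by those counters; it does not reproduce the pagewriter's actual locking or the candidate_free_map bookkeeping:

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stdint.h>
    #include <stdio.h>

    /* Simplified stand-in for the patch's CandidateList: a fixed-size ring of
     * buffer IDs addressed by ever-increasing head/tail counters. */
    typedef struct {
        int *cand_buf_list;
        int cand_list_size;
        atomic_uint_fast64_t head;   /* next slot to pop  */
        atomic_uint_fast64_t tail;   /* next slot to push */
    } CandList;

    static bool cand_push(CandList *list, int buf_id)
    {
        uint64_t tail = atomic_load(&list->tail);
        uint64_t head = atomic_load(&list->head);
        if (tail - head >= (uint64_t)list->cand_list_size)
            return false;                                   /* ring full */
        list->cand_buf_list[tail % list->cand_list_size] = buf_id;
        atomic_store(&list->tail, tail + 1);
        return true;
    }

    static bool cand_pop(CandList *list, int *buf_id)
    {
        uint64_t head = atomic_load(&list->head);
        if (head == atomic_load(&list->tail))
            return false;                                   /* ring empty */
        *buf_id = list->cand_buf_list[head % list->cand_list_size];
        atomic_store(&list->head, head + 1);
        return true;
    }

    int main(void)
    {
        int slots[4];
        CandList list = { slots, 4, 0, 0 };
        for (int id = 100; id < 104; id++)
            cand_push(&list, id);

        int buf_id;
        while (cand_pop(&list, &buf_id))
            printf("candidate buffer %d\n", buf_id);
        return 0;
    }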
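
When the candidate lists are empty, NvmStrategyGetBuffer falls back to a clock sweep over the NVM range and raises "no unpinned buffers available" after NVM_BUFFER_NUM * RETRY_COUNT failed probes. Here is a minimal single-threaded model of that retry structure, with pin and dirty state simulated by plain arrays and the dirty-page flush path omitted:

    #include <stdbool.h>
    #include <stdio.h>

    #define NVM_BUFFERS 8
    #define RETRY_COUNT 3

    /* Simulated per-buffer state standing in for refcount/BM_DIRTY checks. */
    static bool pinned[NVM_BUFFERS] = { true, true, false, true, true, false, true, true };
    static bool dirty[NVM_BUFFERS]  = { false, false, true, false, false, false, false, false };

    static unsigned next_victim = 0;

    /* Advance the clock hand and wrap it into the NVM range. */
    static unsigned clock_sweep_tick(void)
    {
        return next_victim++ % NVM_BUFFERS;
    }

    /* Return a victim buffer, or -1 after NVM_BUFFERS * RETRY_COUNT failed
     * probes, mirroring the "no unpinned buffers available" error path. */
    static int nvm_get_victim(void)
    {
        int tries = NVM_BUFFERS * RETRY_COUNT;
        for (;;) {
            unsigned id = clock_sweep_tick();
            if (!pinned[id] && !dirty[id])
                return (int)id;
            if (--tries == 0)
                return -1;
        }
    }

    int main(void)
    {
        int victim = nvm_get_victim();
        if (victim >= 0)
            printf("evict NVM buffer %d\n", victim);
        else
            printf("no unpinned buffers available\n");
        return 0;
    }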
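
Finally, the migrate branch of NvmBufferAlloc: once a clean, unpinned NVM copy has been isolated, the page is copied into a DRAM victim with memcpy_s and the shared hash entry's id field is re-pointed at the DRAM buffer with an atomic write, so later lookups resolve to the migrated copy. The sketch below keeps only that copy-then-publish step; the pinning, header locking and BM_IN_MIGRATE handshake from the patch are omitted, and the buffer IDs are hypothetical:

    #include <stdatomic.h>
    #include <stdio.h>
    #include <string.h>

    #define BLCKSZ 8192

    /* Minimal stand-in for BufferLookupEnt: the id field is what lookups read. */
    typedef struct {
        atomic_int id;    /* buffer ID currently backing this tag */
    } LookupEnt;

    /* Copy the page out of NVM into DRAM, then publish the DRAM ID.  Lookups
     * that load ent->id after the store land on the DRAM copy; the NVM buffer
     * can then be unpinned and recycled. */
    static void migrate_page(LookupEnt *ent, char *nvm_page, char *dram_page, int dram_id)
    {
        memcpy(dram_page, nvm_page, BLCKSZ);   /* the patch uses memcpy_s */
        atomic_store(&ent->id, dram_id);       /* pg_atomic_write_u32 in the patch */
    }

    int main(void)
    {
        static char nvm_page[BLCKSZ] = "page payload";
        static char dram_page[BLCKSZ];
        LookupEnt ent = { 1500 };                    /* hypothetical NVM buffer ID */

        migrate_page(&ent, nvm_page, dram_page, 42); /* hypothetical DRAM buffer ID */
        printf("entry now points at buffer %d: %s\n", atomic_load(&ent.id), dram_page);
        return 0;
    }

Keeping the tag-to-id indirection in the hash entry is what lets a page change tiers without rehashing: the BufferTag, hash code and partition lock stay the same, and only the id the entry carries moves between the NVM and DRAM ranges.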