From b68fd12170e03d4d117235d692581f356e6fff4d Mon Sep 17 00:00:00 2001 From: Mijamind Date: Wed, 10 Jan 2024 10:44:30 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E3=80=90=E8=B5=84=E6=BA=90=E6=B1=A0?= =?UTF-8?q?=E5=8C=96=E3=80=91SPQ=E6=94=AF=E6=8C=81Direct=20Read=E7=89=B9?= =?UTF-8?q?=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/common/backend/nodes/copyfuncs.cpp | 1 + src/common/backend/nodes/outfuncs.cpp | 1 + src/common/backend/nodes/readfuncs.cpp | 1 + .../backend/pgxc_single/pool/execRemote.cpp | 79 +++++++++++++++++- src/gausskernel/process/tcop/postgres.cpp | 29 +++++++ .../process/threadpool/knl_session.cpp | 1 + .../storage/smgr/segment/data_file.cpp | 23 ++++++ src/gausskernel/storage/smgr/segstore.cpp | 80 +++++++++++++++++++ .../knl/knl_guc/knl_session_attr_spq.h | 2 + src/include/knl/knl_session.h | 7 ++ src/include/nodes/plannodes.h | 1 + src/include/storage/smgr/segment.h | 3 + src/include/storage/smgr/segment_internal.h | 1 + 13 files changed, 228 insertions(+), 1 deletion(-) diff --git a/src/common/backend/nodes/copyfuncs.cpp b/src/common/backend/nodes/copyfuncs.cpp index ca8ce120c..2eae76039 100644 --- a/src/common/backend/nodes/copyfuncs.cpp +++ b/src/common/backend/nodes/copyfuncs.cpp @@ -7462,6 +7462,7 @@ static SpqSeqScan* _copySpqSeqScan(const SpqSeqScan* from) newnode->isFullTableScan = from->isFullTableScan; newnode->isAdaptiveScan = from->isAdaptiveScan; newnode->isDirectRead = from->isDirectRead; + newnode->DirectReadBlkNum = from->DirectReadBlkNum; return newnode; } diff --git a/src/common/backend/nodes/outfuncs.cpp b/src/common/backend/nodes/outfuncs.cpp index a7072876c..e7b32dda7 100755 --- a/src/common/backend/nodes/outfuncs.cpp +++ b/src/common/backend/nodes/outfuncs.cpp @@ -1444,6 +1444,7 @@ static void _outSpqSeqScan(StringInfo str, SpqSeqScan* node) WRITE_BOOL_FIELD(isFullTableScan); WRITE_BOOL_FIELD(isAdaptiveScan); WRITE_BOOL_FIELD(isDirectRead); + WRITE_UINT_FIELD(DirectReadBlkNum); } static void _outAssertOp(StringInfo str, const AssertOp *node) diff --git a/src/common/backend/nodes/readfuncs.cpp b/src/common/backend/nodes/readfuncs.cpp index 242379419..f23375953 100755 --- a/src/common/backend/nodes/readfuncs.cpp +++ b/src/common/backend/nodes/readfuncs.cpp @@ -4782,6 +4782,7 @@ static SpqSeqScan* _readSpqSeqScan(void) READ_BOOL_FIELD(isFullTableScan); READ_BOOL_FIELD(isAdaptiveScan); READ_BOOL_FIELD(isDirectRead); + READ_UINT_FIELD(DirectReadBlkNum); READ_END(); } diff --git a/src/common/backend/pgxc_single/pool/execRemote.cpp b/src/common/backend/pgxc_single/pool/execRemote.cpp index 8c7352725..b2fa2cb3e 100755 --- a/src/common/backend/pgxc_single/pool/execRemote.cpp +++ b/src/common/backend/pgxc_single/pool/execRemote.cpp @@ -481,6 +481,39 @@ static void HandleLibcommPort(PGXCNodeHandle *conn, const char *msg_body, size_t conn->listenPort = ntohs(n16); elog(DEBUG1, "spq HandleLibcommPort get [%d:%d]", conn->tcpCtlPort, conn->listenPort); } +static void HandleDirectRead(PGXCNodeHandle *conn, const char *msg_body, size_t len) +{ + Assert(msg_body != NULL); + Assert(len >= 4); + errno_t rc = 0; + uint32 n32; + int readlen = 0; + rc = memcpy_s(&n32, sizeof(uint32), msg_body, sizeof(uint32)); + securec_check(rc, "", ""); + int oidcount = ntohl(n32); + readlen += sizeof(uint32); + int checkLen = sizeof(uint32) + (sizeof(uint32) + sizeof(uint32)) * oidcount; + if (len != checkLen) { + elog(LOG, "HandleDirectRead len not match [%d:%d]", len, checkLen); + return; + } + ListCell *cell; + foreach (cell, u_sess->spq_cxt.direct_read_map) { + rc = memcpy_s(&n32, sizeof(uint32), msg_body + readlen, sizeof(uint32)); + securec_check(rc, "", ""); + Oid oid = ntohl(n32); + readlen += sizeof(uint32); + + rc = memcpy_s(&n32, sizeof(uint32), msg_body + readlen, sizeof(uint32)); + securec_check(rc, "", ""); + uint32 ReadBlkNum = ntohl(n32); + readlen += sizeof(uint32); + SpqDirectReadEntry *entry = (SpqDirectReadEntry *)lfirst(cell); + if (entry->rel_id == oid) { + entry->nums = ReadBlkNum; + } + } +} static void HandleLocalCsnMin(PGXCNodeHandle* conn, const char* msg_body, size_t len) { Assert(msg_body != NULL); @@ -1009,6 +1042,9 @@ int spq_handle_response(PGXCNodeHandle* conn, RemoteQueryState* combiner, bool i case 'l': /* libcomm port infomation */ HandleLibcommPort(conn, msg, msg_len); break; + case 'i': /* Direct Read */ + HandleDirectRead(conn, msg, msg_len); + break; case 'g': /* DN top xid */ HandleDatanodeGxid(conn, msg, msg_len); break; @@ -1583,6 +1619,27 @@ void spq_cancel_query(void) } } +static void spq_do_direct_read() +{ + if (u_sess->spq_cxt.direct_read_map != NIL) { + ListCell *cell; + foreach (cell, u_sess->spq_cxt.direct_read_map) { + SpqDirectReadEntry *entry = (SpqDirectReadEntry *)lfirst(cell); + if (entry->nums != InvalidBlockNumber) { + ListCell *nodecell; + foreach (nodecell, entry->spq_seq_scan_node_list) { + SpqSeqScan *node = (SpqSeqScan *)lfirst(nodecell); + node->isDirectRead = true; + node->DirectReadBlkNum = entry->nums; + } + list_free(entry->spq_seq_scan_node_list); + } + } + list_free(u_sess->spq_cxt.direct_read_map); + u_sess->spq_cxt.direct_read_map = NIL; + } +} + void spq_do_query(RemoteQueryState* node) { RemoteQuery* step = (RemoteQuery*)node->ss.ps.plan; @@ -1621,6 +1678,9 @@ void spq_do_query(RemoteQueryState* node) * Current it should be RO transaction */ pgxc_node_send_queryid_with_sync(connections, regular_conn_count, node->queryId); + if (u_sess->attr.attr_spq.spq_enable_direct_read) { + spq_do_direct_read(); + } update_spq_port(connections, planstmt, regular_conn_count); spq_startQcThread(node); @@ -5834,8 +5894,9 @@ static void pgxc_node_send_queryid_with_sync(PGXCNodeHandle** connections, int c Assert(queryId != 0); + int countOID = list_length(u_sess->spq_cxt.direct_read_map); for (i = 0; i < conn_count; i++) { - int msglen = 12; + int msglen = sizeof(int) + sizeof(uint64) + sizeof(int) + sizeof(Oid) * countOID; if (temp_connections[i]->state == DN_CONNECTION_STATE_QUERY) BufferConnection(temp_connections[i]); @@ -5865,6 +5926,22 @@ static void pgxc_node_send_queryid_with_sync(PGXCNodeHandle** connections, int c securec_check(ss_rc, "\0", "\0"); temp_connections[i]->outEnd += sizeof(uint64); + ss_rc = memcpy_s(temp_connections[i]->outBuffer + temp_connections[i]->outEnd, + temp_connections[i]->outSize - temp_connections[i]->outEnd, + &countOID, + sizeof(int)); + securec_check(ss_rc, "\0", "\0"); + temp_connections[i]->outEnd += sizeof(int); + ListCell *cell; + foreach (cell, u_sess->spq_cxt.direct_read_map) { + SpqDirectReadEntry *entry = (SpqDirectReadEntry *)lfirst(cell); + ss_rc = memcpy_s(temp_connections[i]->outBuffer + temp_connections[i]->outEnd, + temp_connections[i]->outSize - temp_connections[i]->outEnd, + &(entry->rel_id), sizeof(Oid)); + securec_check(ss_rc, "\0", "\0"); + temp_connections[i]->outEnd += sizeof(Oid); + } + if (pgxc_node_flush(temp_connections[i]) != 0) { temp_connections[i]->state = DN_CONNECTION_STATE_ERROR_FATAL; ereport(ERROR, diff --git a/src/gausskernel/process/tcop/postgres.cpp b/src/gausskernel/process/tcop/postgres.cpp index 51446cfb4..069f38f82 100755 --- a/src/gausskernel/process/tcop/postgres.cpp +++ b/src/gausskernel/process/tcop/postgres.cpp @@ -9965,6 +9965,8 @@ int PostgresMain(int argc, char* argv[], const char* dbname, const char* usernam /* Set top consumer at the very beginning. */ StreamTopConsumerIam(); + List* oidlist = NIL; + int oidcount = 0; /* Set the query id we were passed down */ errno_t rc = memcpy_s(&u_sess->debug_query_id, @@ -9973,6 +9975,17 @@ int PostgresMain(int argc, char* argv[], const char* dbname, const char* usernam sizeof(uint64)); securec_check(rc, "\0", "\0"); ereport(DEBUG1, (errmsg("Received new query id %lu", u_sess->debug_query_id))); + rc = memcpy_s(&oidcount, sizeof(int), + pq_getmsgbytes(&input_message, sizeof(int)), sizeof(int)); + securec_check(rc,"\0","\0"); + for (int i = 0; i < oidcount; i++) { + Oid tmp = InvalidOid; + rc = memcpy_s(&tmp, sizeof(Oid), + pq_getmsgbytes(&input_message, sizeof(Oid)), sizeof(Oid)); + securec_check(rc,"\0","\0"); + oidlist = lappend_oid(oidlist, tmp); + } + pq_getmsgend(&input_message); /* @@ -9988,6 +10001,22 @@ int PostgresMain(int argc, char* argv[], const char* dbname, const char* usernam pq_sendint16(&buf, g_instance.attr.attr_network.comm_control_port); pq_sendint16(&buf, g_instance.attr.attr_network.comm_sctp_port); pq_endmessage(&buf); + if (SS_PRIMARY_MODE && oidcount > 0) { + StringInfoData bufoid; + pq_beginmessage(&bufoid, 'i'); + pq_sendint(&bufoid, oidcount, 4); + ListCell *oid; + foreach (oid, oidlist) { + Oid relOid = lfirst_oid(oid); + Relation rel = heap_open(relOid, AccessShareLock); + BlockNumber ReadBlkNum = RelationGetNumberOfBlocks(rel); + heap_sync(rel); + heap_close(rel, AccessShareLock); + pq_sendint(&bufoid, (int)relOid, 4); + pq_sendint(&bufoid, (int)ReadBlkNum, 4); + } + pq_endmessage(&bufoid); + } pq_putemptymessage('O'); /* PlanIdComplete */ pq_flush(); } break; diff --git a/src/gausskernel/process/threadpool/knl_session.cpp b/src/gausskernel/process/threadpool/knl_session.cpp index 689d46da3..e8d5ea9f2 100755 --- a/src/gausskernel/process/threadpool/knl_session.cpp +++ b/src/gausskernel/process/threadpool/knl_session.cpp @@ -1432,6 +1432,7 @@ static void knl_u_spq_init(knl_u_spq_context* spq_cxt) spq_cxt->mdcache_invalidation_counter_registered = false; spq_cxt->mdcache_invalidation_counter = 0; spq_cxt->last_mdcache_invalidation_counter = 0; + spq_cxt->direct_read_map = NIL; } #endif diff --git a/src/gausskernel/storage/smgr/segment/data_file.cpp b/src/gausskernel/storage/smgr/segment/data_file.cpp index 4d88ed328..f400d2626 100644 --- a/src/gausskernel/storage/smgr/segment/data_file.cpp +++ b/src/gausskernel/storage/smgr/segment/data_file.cpp @@ -657,6 +657,29 @@ void df_pread_block(SegLogicFile *sf, char *buffer, BlockNumber blocknum) } } +void df_direct_pread_block(SegLogicFile *sf, char *buffer, BlockNumber blocknum, BlockNumber *blocknums) +{ + off_t offset = ((off_t)blocknum) * BLCKSZ; + int sliceno = DF_OFFSET_TO_SLICENO(offset); + off_t roffset = DF_OFFSET_TO_SLICE_OFFSET(offset); + + pgstat_report_waitevent(WAIT_EVENT_DATA_FILE_READ); + SegPhysicalFile spf = df_get_physical_file(sf, sliceno, blocknum); + int nbytes = pread(spf.fd, buffer, (*blocknums) * BLCKSZ, roffset); + pgstat_report_waitevent(WAIT_EVENT_END); + if (nbytes > ((uint64)(*blocknums) * BLCKSZ) || nbytes < 0) { + ereport(ERROR, + (errcode(MOD_SEGMENT_PAGE), + errcode_for_file_access(), + errmsg("could not direct read segment block %d to block %d in file %s and read size = %d", + blocknum, blocknum + (*blocknums), sf->filename, nbytes), + errdetail("errno: %d", errno))); + } else if (nbytes < ((uint64)(*blocknums) * BLCKSZ)) { + ereport(DEBUG1, (errmsg("Direct read has been cut off from %d to %d", *blocknums, nbytes / BLCKSZ))); + *blocknums = nbytes / BLCKSZ; + } +} + void df_pwrite_block(SegLogicFile *sf, const char *buffer, BlockNumber blocknum) { off_t offset = ((off_t)blocknum) * BLCKSZ; diff --git a/src/gausskernel/storage/smgr/segstore.cpp b/src/gausskernel/storage/smgr/segstore.cpp index 0c8bef948..811421e5e 100755 --- a/src/gausskernel/storage/smgr/segstore.cpp +++ b/src/gausskernel/storage/smgr/segstore.cpp @@ -1965,3 +1965,83 @@ bool repair_check_physical_type(uint32 spcNode, uint32 dbNode, int32 forkNum, ui return false; } +/* + * to get direct read info + */ +BlockNumber seg_direct_read_get_range(BlockNumber logic_id) +{ + uint32 offset; + ExtentSize extent_size; + if (logic_id < EXT_SIZE_8_TOTAL_PAGES) { + extent_size = EXT_SIZE_8; + offset = logic_id % EXT_SIZE_8; + } else if (logic_id < EXT_SIZE_128_TOTAL_PAGES) { + extent_size = EXT_SIZE_128; + logic_id -= EXT_SIZE_8_TOTAL_PAGES; + offset = logic_id % EXT_SIZE_128; + } else if (logic_id < EXT_SIZE_1024_TOTAL_PAGES) { + extent_size = EXT_SIZE_1024; + logic_id -= EXT_SIZE_128_TOTAL_PAGES; + offset = logic_id % EXT_SIZE_1024; + } else { + extent_size = EXT_SIZE_8192; + logic_id -= EXT_SIZE_1024_TOTAL_PAGES; + offset = logic_id % EXT_SIZE_8192; + } + return extent_size - offset; +} + +/* + * IMPORTANT: + * pages during (blocknum) to (blocknum + blocknums) should be in one extent + * pages should be checked by PageIsVerified in future works + */ +void seg_direct_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, BlockNumber *blocknums, char *buffer, + BlockNumber *locBlock) +{ + LOG_SMGR_API(reln->smgr_rnode, forknum, blocknum, "seg_direct_read"); + + Buffer seg_buffer = read_head_buffer(reln, forknum, false); + if (ENABLE_DMS) { + LockBuffer(seg_buffer, BUFFER_LOCK_SHARE); + } + SegmentCheck(BufferIsValid(seg_buffer)); + SegmentHead *seg_head = (SegmentHead *)PageGetContents(BufferGetBlock(seg_buffer)); + SegmentCheck(IsNormalSegmentHead(seg_head)); + + if (seg_head->nblocks < blocknum + (*blocknums)) { + if (ENABLE_DMS) { + LockBuffer(seg_buffer, BUFFER_LOCK_UNLOCK); + } + SegReleaseBuffer(seg_buffer); + ereport(ERROR, (errmodule(MOD_SEGMENT_PAGE), errcode(ERRCODE_DATA_CORRUPTED), + errmsg("seg_direct_read blocknum exceeds segment size"), + errdetail("segment %s, head: %u, read block %u, read nums %u, but nblocks is %u", + relpathperm(reln->smgr_rnode.node, forknum), reln->seg_desc[forknum]->head_blocknum, + blocknum, *blocknums, seg_head->nblocks))); + } + + if (ENABLE_DMS) { + LockBuffer(seg_buffer, BUFFER_LOCK_UNLOCK); + } + + LockSegmentHeadPartition(reln->seg_space->spcNode, reln->seg_space->dbNode, reln->seg_desc[forknum]->head_blocknum, + LW_SHARED); + + SegPageLocation loc = seg_logic_to_physic_mapping(reln, seg_head, blocknum); + SegmentCheck(loc.blocknum != InvalidBlockNumber); + + SegSpace *spc = reln->seg_space; + int egid = EXTENT_TYPE_TO_GROUPID(EXTENT_GROUP_RNODE(spc, loc.extent_size).relNode); + SegExtentGroup *seg = &spc->extent_group[egid][forknum]; + *locBlock = loc.blocknum; + df_direct_pread_block(seg->segfile, buffer, loc.blocknum, blocknums); + + UnlockSegmentHeadPartition(reln->seg_space->spcNode, reln->seg_space->dbNode, + reln->seg_desc[forknum]->head_blocknum); + + /* extent_size == 1 means segment metadata, which should not be used as heap data */ + SegmentCheck(loc.extent_size != 1); + + SegReleaseBuffer(seg_buffer); +} \ No newline at end of file diff --git a/src/include/knl/knl_guc/knl_session_attr_spq.h b/src/include/knl/knl_guc/knl_session_attr_spq.h index fbebbcf6d..a063be893 100644 --- a/src/include/knl/knl_guc/knl_session_attr_spq.h +++ b/src/include/knl/knl_guc/knl_session_attr_spq.h @@ -203,6 +203,8 @@ typedef struct knl_session_attr_spq { int spq_scan_unit_size; int spq_scan_unit_bit; char *gauss_cluster_map; + double spq_small_table_threshold; + bool spq_enable_direct_read; /* enable spq btbuild */ bool spq_enable_btbuild; diff --git a/src/include/knl/knl_session.h b/src/include/knl/knl_session.h index 1eccc530c..a8cb3e45b 100644 --- a/src/include/knl/knl_session.h +++ b/src/include/knl/knl_session.h @@ -2756,6 +2756,12 @@ namespace spqopt { class CXformFactory; } +typedef struct SpqDirectReadEntry { + Oid rel_id; + BlockNumber nums; + List *spq_seq_scan_node_list; +} SpqDirectReadEntry; + typedef struct knl_u_spq_context { /* dxl information */ spqdxl::CDXLMemoryManager* dxl_memory_manager; @@ -2793,6 +2799,7 @@ typedef struct knl_u_spq_context { List *remoteQuerys; List *adp_connections; struct SnapshotData* snapshot; + List *direct_read_map; } knl_u_spq_context; #endif diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 3dd1cc596..077c77c2d 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -662,6 +662,7 @@ typedef struct SpqSeqScan { bool isFullTableScan; bool isAdaptiveScan; bool isDirectRead; + uint32 DirectReadBlkNum; } SpqSeqScan; #endif /* diff --git a/src/include/storage/smgr/segment.h b/src/include/storage/smgr/segment.h index adc02f33e..68351e5ae 100644 --- a/src/include/storage/smgr/segment.h +++ b/src/include/storage/smgr/segment.h @@ -49,6 +49,9 @@ void seg_async_read(SMgrRelation reln, ForkNumber forknum, AioDispatchDesc_t **d void seg_async_write(SMgrRelation reln, ForkNumber forknum, AioDispatchDesc_t **dList, int32 dn); void seg_move_buckets(const RelFileNodeBackend &dest, const RelFileNodeBackend &src, List *bucketList); bool seg_fork_exists(SegSpace *spc, SMgrRelation reln, ForkNumber forknum, const XLogPhyBlock *pblk); +void seg_direct_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, BlockNumber *blocknums, char *buffer, + BlockNumber *locBlock); +BlockNumber seg_direct_read_get_range(BlockNumber logic_id); /* Read/write by physical block number; used for segment meta data */ void seg_physical_read(SegSpace *spc, RelFileNode &rNode, ForkNumber forknum, BlockNumber blocknum, char *buffer); diff --git a/src/include/storage/smgr/segment_internal.h b/src/include/storage/smgr/segment_internal.h index 4861caaaa..a29a415c5 100644 --- a/src/include/storage/smgr/segment_internal.h +++ b/src/include/storage/smgr/segment_internal.h @@ -96,6 +96,7 @@ void df_ctrl_init(SegLogicFile *sf, RelFileNode relNode, ForkNumber forknum); void df_open_files(SegLogicFile *sf); void df_extend(SegLogicFile *sf, BlockNumber target_blocks); void df_pread_block(SegLogicFile *sf, char *buffer, BlockNumber blocknum); +void df_direct_pread_block(SegLogicFile *sf, char *buffer, BlockNumber blocknum, BlockNumber *blocknums); void df_pwrite_block(SegLogicFile *sf, const char *buffer, BlockNumber blocknum); void df_fsync(SegLogicFile *sf); void df_unlink(SegLogicFile *sf); From a08439c65547c4e3a08339a4418c40ef7fc3dee7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=98=99=E9=B8=A3=E5=81=A5?= Date: Thu, 11 Jan 2024 01:47:59 +0000 Subject: [PATCH 2/2] update src/gausskernel/storage/smgr/segstore.cpp. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 阙鸣健 --- src/gausskernel/storage/smgr/segstore.cpp | 19 +++---------------- 1 file changed, 3 insertions(+), 16 deletions(-) diff --git a/src/gausskernel/storage/smgr/segstore.cpp b/src/gausskernel/storage/smgr/segstore.cpp index 811421e5e..7ff5c9893 100755 --- a/src/gausskernel/storage/smgr/segstore.cpp +++ b/src/gausskernel/storage/smgr/segstore.cpp @@ -1970,24 +1970,11 @@ bool repair_check_physical_type(uint32 spcNode, uint32 dbNode, int32 forkNum, ui */ BlockNumber seg_direct_read_get_range(BlockNumber logic_id) { + uint32 extent_id; uint32 offset; ExtentSize extent_size; - if (logic_id < EXT_SIZE_8_TOTAL_PAGES) { - extent_size = EXT_SIZE_8; - offset = logic_id % EXT_SIZE_8; - } else if (logic_id < EXT_SIZE_128_TOTAL_PAGES) { - extent_size = EXT_SIZE_128; - logic_id -= EXT_SIZE_8_TOTAL_PAGES; - offset = logic_id % EXT_SIZE_128; - } else if (logic_id < EXT_SIZE_1024_TOTAL_PAGES) { - extent_size = EXT_SIZE_1024; - logic_id -= EXT_SIZE_128_TOTAL_PAGES; - offset = logic_id % EXT_SIZE_1024; - } else { - extent_size = EXT_SIZE_8192; - logic_id -= EXT_SIZE_1024_TOTAL_PAGES; - offset = logic_id % EXT_SIZE_8192; - } + + SegLogicPageIdToExtentId(logic_id, &extent_id, &offset, &extent_size); return extent_size - offset; }