diff --git a/src/gausskernel/process/stream/streamCore.cpp b/src/gausskernel/process/stream/streamCore.cpp index 25fbd65d3..e39dcebe4 100755 --- a/src/gausskernel/process/stream/streamCore.cpp +++ b/src/gausskernel/process/stream/streamCore.cpp @@ -74,6 +74,7 @@ MemoryContext StreamNodeGroup::m_memoryGlobalCxt = NULL; pthread_mutex_t StreamNodeGroup::m_streamNodeGroupLock; HTAB* StreamNodeGroup::m_streamNodeGroupTbl = NULL; HTAB* StreamNodeGroup::m_streamConnectSyncTbl = NULL; +HTAB* StreamNodeGroup::m_streamDescHashTbl = NULL; pthread_mutex_t StreamNodeGroup::m_streamConnectSyncLock; static void ConsumerNodeSyncUpMessage(RecursiveUnionController* controller, int step, StreamState* node); @@ -439,6 +440,15 @@ void StreamNodeGroup::StartUp() hash_create("stream connect sync hash", 256, &nodectl, HASH_ELEM | HASH_FUNCTION | HASH_SHRCTX); pthread_mutex_init(&m_streamConnectSyncLock, NULL); + rc = memset_s(&nodectl, sizeof(nodectl), 0, sizeof(nodectl)); + securec_check(rc, "\0", "\0"); + nodectl.keysize = sizeof(StreamKey); + nodectl.entrysize = sizeof(StreamDescElement); + nodectl.hash = tag_hash; + nodectl.hcxt = m_memoryGlobalCxt; + + m_streamDescHashTbl = + hash_create("stream desc hash", STREAM_DESC_HASH_NUMBER, &nodectl, HASH_ELEM | HASH_FUNCTION | HASH_SHRCTX); } /* @@ -1908,6 +1918,7 @@ void StreamNodeGroup::MarkRecursiveVfdInvalid() void StreamNodeGroup::BuildStreamDesc(const uint64& queryId, Plan* node) { StreamKey streamKey; + memset_s(&streamKey, sizeof(streamKey), 0, sizeof(streamKey)); streamKey.queryId = queryId; streamKey.planNodeId = node->plan_node_id; @@ -1927,31 +1938,34 @@ void StreamNodeGroup::BuildStreamDesc(const uint64& queryId, Plan* node) if (!parallelDesc) { return; } - - m_streamDesc.insert({streamKey, parallelDesc}); + bool found = false; + StreamDescElement* element = (StreamDescElement*)hash_search(m_streamDescHashTbl, &streamKey, HASH_ENTER, &found); + if (found != false) { + ereport(ERROR, (errcode(ERRCODE_SYSTEM_ERROR), errmsg("streamKey of stream nodegroup id is duplicated"))); + } + element->key = streamKey; + element->parallelDesc = (ParallelIndexScanDescData*)parallelDesc; } void StreamNodeGroup::DestroyStreamDesc(const uint64& queryId, Plan* node) { StreamKey streamKey; + memset_s(&streamKey, sizeof(streamKey), 0, sizeof(streamKey)); streamKey.queryId = queryId; streamKey.planNodeId = node->plan_node_id; - - std::unordered_map::iterator iter; + bool found = false; + StreamDescElement* element = NULL; switch (nodeTag(node)) { case T_IndexScan: - iter = m_streamDesc.find(streamKey); - if (m_streamDesc.end() == iter) { - return; - } - if (iter->second) { - if (((ParallelIndexScanDescData*)iter->second)->psBtpscan) { - delete ((ParallelIndexScanDescData*)iter->second)->psBtpscan; + element = (StreamDescElement*)hash_search(m_streamDescHashTbl, &streamKey, HASH_FIND, &found); + if (found == true) { + if (((ParallelIndexScanDescData*)element->parallelDesc)->psBtpscan) { + delete ((ParallelIndexScanDescData*)element->parallelDesc)->psBtpscan; } - pfree(iter->second); + pfree(element->parallelDesc); + (StreamDescElement*)hash_search(m_streamDescHashTbl, &streamKey, HASH_REMOVE, NULL); } - m_streamDesc.erase(streamKey); break; default: break; @@ -1961,10 +1975,16 @@ void StreamNodeGroup::DestroyStreamDesc(const uint64& queryId, Plan* node) void* StreamNodeGroup::GetParalleDesc(const uint64& queryId, const uint64& planNodeId) { StreamKey key; + memset_s(&key, sizeof(key), 0, sizeof(key)); key.queryId = queryId; key.planNodeId = planNodeId; - std::unordered_map::iterator iter = m_streamDesc.find(key); - return (m_streamDesc.end() == iter) ? NULL : iter->second; + bool found = false; + StreamDescElement* element = (StreamDescElement*)hash_search(m_streamDescHashTbl, &key, HASH_FIND, &found); + if (found == false) { + return NULL; + } else { + return element->parallelDesc; + } } #ifndef ENABLE_MULTIPLE_NODES diff --git a/src/include/distributelayer/streamCore.h b/src/include/distributelayer/streamCore.h index b4dacb7e4..d7f191780 100755 --- a/src/include/distributelayer/streamCore.h +++ b/src/include/distributelayer/streamCore.h @@ -28,7 +28,6 @@ #define SRC_INCLUDE_DISTRIBUTELAYER_STREAMCORE_H_ #include -#include #include "postgres.h" #include "knl/knl_variable.h" @@ -59,6 +58,8 @@ #define TupleVectorMaxSize 100 +#define STREAM_DESC_HASH_NUMBER 256 + #define IS_STREAM_PORTAL (!StreamThreadAmI() && portal->streamInfo.streamGroup != NULL) struct StreamState; @@ -104,6 +105,11 @@ typedef struct { uint64 key; } StreamConnectSyncElement; +typedef struct { + StreamKey key; + ParallelIndexScanDescData* parallelDesc; +} StreamDescElement; + enum StreamObjType { STREAM_PRODUCER, STREAM_CONSUMER, @@ -526,21 +532,7 @@ private: /* Mark Stream query quit status. */ StreamObjStatus m_quitStatus; #endif - struct KeyHash { - std::size_t operator()(const StreamKey& k) const - { - return std::hash()(k.queryId) ^ - (std::hash()(k.planNodeId) << 1); - } - }; - - struct KeyEqual { - bool operator()(const StreamKey& lhs, const StreamKey& rhs) const - { - return lhs.queryId == rhs.queryId && lhs.planNodeId == rhs.planNodeId; - } - }; - std::unordered_map m_streamDesc; + static HTAB* m_streamDescHashTbl; }; extern bool IsThreadProcessStreamRecursive(); diff --git a/src/include/executor/node/nodeBitmapHeapscan.h b/src/include/executor/node/nodeBitmapHeapscan.h index 9f6560123..0b0f1a411 100644 --- a/src/include/executor/node/nodeBitmapHeapscan.h +++ b/src/include/executor/node/nodeBitmapHeapscan.h @@ -29,6 +29,4 @@ extern int TableScanBitmapNextTargetRel(TableScanDesc scan, BitmapHeapScanState extern TupleTableSlot* ExecBitmapHeapScan(PlanState* state); extern void ExecInitPartitionForBitmapHeapScan(BitmapHeapScanState* scanstate, EState* estate); -#define BITMAP_PREFETCH_PAGE_RATIO 2 - #endif /* NODEBITMAPHEAPSCAN_H */