streamDesc use normal hash table

This commit is contained in:
jin.zhao
2024-08-15 18:01:19 +08:00
committed by zhaosen
parent cf050ccb7b
commit ee0023f8a6
3 changed files with 43 additions and 33 deletions

View File

@ -74,6 +74,7 @@ MemoryContext StreamNodeGroup::m_memoryGlobalCxt = NULL;
pthread_mutex_t StreamNodeGroup::m_streamNodeGroupLock;
HTAB* StreamNodeGroup::m_streamNodeGroupTbl = NULL;
HTAB* StreamNodeGroup::m_streamConnectSyncTbl = NULL;
HTAB* StreamNodeGroup::m_streamDescHashTbl = NULL;
pthread_mutex_t StreamNodeGroup::m_streamConnectSyncLock;
static void ConsumerNodeSyncUpMessage(RecursiveUnionController* controller, int step, StreamState* node);
@ -439,6 +440,15 @@ void StreamNodeGroup::StartUp()
hash_create("stream connect sync hash", 256, &nodectl, HASH_ELEM | HASH_FUNCTION | HASH_SHRCTX);
pthread_mutex_init(&m_streamConnectSyncLock, NULL);
rc = memset_s(&nodectl, sizeof(nodectl), 0, sizeof(nodectl));
securec_check(rc, "\0", "\0");
nodectl.keysize = sizeof(StreamKey);
nodectl.entrysize = sizeof(StreamDescElement);
nodectl.hash = tag_hash;
nodectl.hcxt = m_memoryGlobalCxt;
m_streamDescHashTbl =
hash_create("stream desc hash", STREAM_DESC_HASH_NUMBER, &nodectl, HASH_ELEM | HASH_FUNCTION | HASH_SHRCTX);
}
/*
@ -1908,6 +1918,7 @@ void StreamNodeGroup::MarkRecursiveVfdInvalid()
void StreamNodeGroup::BuildStreamDesc(const uint64& queryId, Plan* node)
{
StreamKey streamKey;
memset_s(&streamKey, sizeof(streamKey), 0, sizeof(streamKey));
streamKey.queryId = queryId;
streamKey.planNodeId = node->plan_node_id;
@ -1927,31 +1938,34 @@ void StreamNodeGroup::BuildStreamDesc(const uint64& queryId, Plan* node)
if (!parallelDesc) {
return;
}
m_streamDesc.insert({streamKey, parallelDesc});
bool found = false;
StreamDescElement* element = (StreamDescElement*)hash_search(m_streamDescHashTbl, &streamKey, HASH_ENTER, &found);
if (found != false) {
ereport(ERROR, (errcode(ERRCODE_SYSTEM_ERROR), errmsg("streamKey of stream nodegroup id is duplicated")));
}
element->key = streamKey;
element->parallelDesc = (ParallelIndexScanDescData*)parallelDesc;
}
void StreamNodeGroup::DestroyStreamDesc(const uint64& queryId, Plan* node)
{
StreamKey streamKey;
memset_s(&streamKey, sizeof(streamKey), 0, sizeof(streamKey));
streamKey.queryId = queryId;
streamKey.planNodeId = node->plan_node_id;
std::unordered_map<StreamKey, void*, KeyHash, KeyEqual>::iterator iter;
bool found = false;
StreamDescElement* element = NULL;
switch (nodeTag(node)) {
case T_IndexScan:
iter = m_streamDesc.find(streamKey);
if (m_streamDesc.end() == iter) {
return;
}
if (iter->second) {
if (((ParallelIndexScanDescData*)iter->second)->psBtpscan) {
delete ((ParallelIndexScanDescData*)iter->second)->psBtpscan;
element = (StreamDescElement*)hash_search(m_streamDescHashTbl, &streamKey, HASH_FIND, &found);
if (found == true) {
if (((ParallelIndexScanDescData*)element->parallelDesc)->psBtpscan) {
delete ((ParallelIndexScanDescData*)element->parallelDesc)->psBtpscan;
}
pfree(iter->second);
pfree(element->parallelDesc);
(StreamDescElement*)hash_search(m_streamDescHashTbl, &streamKey, HASH_REMOVE, NULL);
}
m_streamDesc.erase(streamKey);
break;
default:
break;
@ -1961,10 +1975,16 @@ void StreamNodeGroup::DestroyStreamDesc(const uint64& queryId, Plan* node)
void* StreamNodeGroup::GetParalleDesc(const uint64& queryId, const uint64& planNodeId)
{
StreamKey key;
memset_s(&key, sizeof(key), 0, sizeof(key));
key.queryId = queryId;
key.planNodeId = planNodeId;
std::unordered_map<StreamKey, void*, KeyHash, KeyEqual>::iterator iter = m_streamDesc.find(key);
return (m_streamDesc.end() == iter) ? NULL : iter->second;
bool found = false;
StreamDescElement* element = (StreamDescElement*)hash_search(m_streamDescHashTbl, &key, HASH_FIND, &found);
if (found == false) {
return NULL;
} else {
return element->parallelDesc;
}
}
#ifndef ENABLE_MULTIPLE_NODES

View File

@ -28,7 +28,6 @@
#define SRC_INCLUDE_DISTRIBUTELAYER_STREAMCORE_H_
#include <signal.h>
#include <unordered_map>
#include "postgres.h"
#include "knl/knl_variable.h"
@ -59,6 +58,8 @@
#define TupleVectorMaxSize 100
#define STREAM_DESC_HASH_NUMBER 256
#define IS_STREAM_PORTAL (!StreamThreadAmI() && portal->streamInfo.streamGroup != NULL)
struct StreamState;
@ -104,6 +105,11 @@ typedef struct {
uint64 key;
} StreamConnectSyncElement;
typedef struct {
StreamKey key;
ParallelIndexScanDescData* parallelDesc;
} StreamDescElement;
enum StreamObjType {
STREAM_PRODUCER,
STREAM_CONSUMER,
@ -526,21 +532,7 @@ private:
/* Mark Stream query quit status. */
StreamObjStatus m_quitStatus;
#endif
struct KeyHash {
std::size_t operator()(const StreamKey& k) const
{
return std::hash<uint>()(k.queryId) ^
(std::hash<uint>()(k.planNodeId) << 1);
}
};
struct KeyEqual {
bool operator()(const StreamKey& lhs, const StreamKey& rhs) const
{
return lhs.queryId == rhs.queryId && lhs.planNodeId == rhs.planNodeId;
}
};
std::unordered_map<StreamKey, void*, KeyHash, KeyEqual> m_streamDesc;
static HTAB* m_streamDescHashTbl;
};
extern bool IsThreadProcessStreamRecursive();

View File

@ -29,6 +29,4 @@ extern int TableScanBitmapNextTargetRel(TableScanDesc scan, BitmapHeapScanState
extern TupleTableSlot* ExecBitmapHeapScan(PlanState* state);
extern void ExecInitPartitionForBitmapHeapScan(BitmapHeapScanState* scanstate, EState* estate);
#define BITMAP_PREFETCH_PAGE_RATIO 2
#endif /* NODEBITMAPHEAPSCAN_H */