【资源池化】SPQ修复vector stream分发功能

This commit is contained in:
Mijamind
2024-01-30 19:15:03 +08:00
parent de25890e12
commit 6428a15e68

View File

@ -1376,30 +1376,16 @@ void StreamProducer::DispatchBatchRedistrFunctionByRedisType()
case PARALLEL_NONE:
#ifdef ENABLE_MULTIPLE_NODES
case REMOTE_DISTRIBUTE:
if (m_hasExprKey) {
m_channelCalFun = ((list_length(m_consumerNodes->nodeList) == 1) ?
&StreamProducer::redistributeTupleChannelWithExpr<REMOTE_DIRECT_DISTRIBUTE> :
&StreamProducer::redistributeTupleChannelWithExpr<REMOTE_DISTRIBUTE>);
} else {
m_channelCalFun = ((list_length(m_consumerNodes->nodeList) == 1) ?
&StreamProducer::redistributeTupleChannel<len, REMOTE_DIRECT_DISTRIBUTE> :
&StreamProducer::redistributeTupleChannel<len, REMOTE_DISTRIBUTE>);
}
m_channelCalVecFun = (list_length(m_consumerNodes->nodeList) == 1) ?
&StreamProducer::redistributeBatchChannel<len, REMOTE_DIRECT_DISTRIBUTE> :
&StreamProducer::redistributeBatchChannel<len, REMOTE_DISTRIBUTE>;
break;
case REMOTE_SPLIT_DISTRIBUTE:
if (m_hasExprKey) {
m_channelCalFun = &StreamProducer::redistributeTupleChannelWithExpr<REMOTE_SPLIT_DISTRIBUTE>;
} else {
m_channelCalFun = &StreamProducer::redistributeTupleChannel<len, REMOTE_SPLIT_DISTRIBUTE>;
}
m_channelCalVecFun = &StreamProducer::redistributeBatchChannel<len, REMOTE_SPLIT_DISTRIBUTE>;
break;
#endif
case LOCAL_DISTRIBUTE:
if (m_hasExprKey) {
m_channelCalFun = &StreamProducer::redistributeTupleChannelWithExpr<LOCAL_DISTRIBUTE>;
} else {
m_channelCalFun = &StreamProducer::redistributeTupleChannel<len, LOCAL_DISTRIBUTE>;
}
m_channelCalVecFun = &StreamProducer::redistributeBatchChannel<len, LOCAL_DISTRIBUTE>;
break;
default: