From fb4dc850bc0a368b3045ecf605da9920d8ef7479 Mon Sep 17 00:00:00 2001
From: Mijamind
Date: Sat, 21 Oct 2023 14:45:33 +0800
Subject: [PATCH] 1. Multi-node parallelism: support adaptive scan via a
 coordinating thread. 2. Initialize libcomm only when spqplugin is loaded and
 the resource-pooling scenario is enabled. 3. string_agg bugfix. 4. Add
 calculation of whether the corresponding spqscan runs with multiple dop.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 src/common/backend/nodes/copyfuncs.cpp | 1 +
 src/common/backend/nodes/outfuncs.cpp | 1 +
 src/common/backend/nodes/readfuncs.cpp | 1 +
 .../backend/pgxc_single/pool/execRemote.cpp | 470 +++++++++++++++++-
 src/gausskernel/cbb/communication/libcomm.cpp | 12 +-
 .../libcomm_utils/libcomm_adapter.cpp | 62 ++-
 .../libcomm_utils/libcomm_interface.cpp | 44 +-
 .../libcomm_utils/libcomm_message.h | 4 +
 .../libcomm_utils/libcomm_shakehands.cpp | 111 +++++
 .../libcomm_utils/libcomm_thread.cpp | 11 +-
 src/gausskernel/optimizer/plan/planner.cpp | 14 +-
 .../optimizer/plan/streamplan_single.cpp | 5 +-
 .../process/postmaster/postmaster.cpp | 22 +-
 src/gausskernel/process/tcop/postgres.cpp | 3 +
 .../process/threadpool/knl_instance.cpp | 23 +
 .../process/threadpool/knl_thread.cpp | 5 +
 src/gausskernel/runtime/executor/nodeAgg.cpp | 2 +-
 src/include/gs_thread.h | 3 +
 .../knl/knl_guc/knl_session_attr_spq.h | 44 +-
 src/include/knl/knl_instance.h | 10 +
 src/include/libcomm/libcomm.h | 27 +
 src/include/nodes/plannodes.h | 1 +
 src/include/pgxc/execRemote.h | 17 +
 23 files changed, 863 insertions(+), 30 deletions(-)

diff --git a/src/common/backend/nodes/copyfuncs.cpp b/src/common/backend/nodes/copyfuncs.cpp
index 2717b8532..2188b9dfc 100644
--- a/src/common/backend/nodes/copyfuncs.cpp
+++ b/src/common/backend/nodes/copyfuncs.cpp
@@ -186,6 +186,7 @@ static PlannedStmt* _copyPlannedStmt(const PlannedStmt* from)
 #ifdef USE_SPQ
     COPY_SCALAR_FIELD(spq_session_id);
     COPY_SCALAR_FIELD(current_id);
+    COPY_SCALAR_FIELD(enable_adaptive_scan);
     COPY_SCALAR_FIELD(is_spq_optmized);
 #endif
 /*
diff --git a/src/common/backend/nodes/outfuncs.cpp b/src/common/backend/nodes/outfuncs.cpp
index 568599c6a..2082ef61c 100755
--- a/src/common/backend/nodes/outfuncs.cpp
+++ b/src/common/backend/nodes/outfuncs.cpp
@@ -644,6 +644,7 @@ static void _outPlannedStmt(StringInfo str, PlannedStmt* node)
 #ifdef USE_SPQ
     WRITE_UINT64_FIELD(spq_session_id);
     WRITE_INT_FIELD(current_id);
+    WRITE_BOOL_FIELD(enable_adaptive_scan);
     WRITE_BOOL_FIELD(is_spq_optmized);
 #endif
 }
diff --git a/src/common/backend/nodes/readfuncs.cpp b/src/common/backend/nodes/readfuncs.cpp
index f3842a92f..b8eea6647 100755
--- a/src/common/backend/nodes/readfuncs.cpp
+++ b/src/common/backend/nodes/readfuncs.cpp
@@ -4711,6 +4711,7 @@ static PlannedStmt* _readPlannedStmt(void)
 #ifdef USE_SPQ
     READ_UINT64_FIELD(spq_session_id);
     READ_INT_FIELD(current_id);
+    READ_BOOL_FIELD(enable_adaptive_scan);
     READ_BOOL_FIELD(is_spq_optmized);
 #endif

diff --git a/src/common/backend/pgxc_single/pool/execRemote.cpp b/src/common/backend/pgxc_single/pool/execRemote.cpp
index 45732b90e..e6bb36df1 100755
--- a/src/common/backend/pgxc_single/pool/execRemote.cpp
+++ b/src/common/backend/pgxc_single/pool/execRemote.cpp
@@ -102,6 +102,7 @@ 
#include "executor/node/nodeModifyTable.h" #ifdef USE_SPQ #include "libpq/libpq-int.h" +#include "optimizer/planmem_walker.h" #endif #ifndef MIN @@ -480,7 +481,202 @@ static void HandleMaxCSN(RemoteQueryState* combiner, const char* msg, int msg_le combiner->maxCSN = ntohl64(*((CommitSeqNo *)msg)); combiner->hadrMainStandby = *(bool*)(msg + sizeof(int64)); } - + +static SpqAdpScanReqState *make_adps_state(SpqAdpScanPagesReq *req) +{ + SpqAdpScanReqState *paging_state = (SpqAdpScanReqState *)palloc(sizeof(SpqAdpScanReqState)); + if (!paging_state) + return NULL; + paging_state->plan_node_id = req->plan_node_id; + paging_state->direction = req->direction; + + if (req->direction == ForwardScanDirection) { /* forward */ + paging_state->current_num = 0; + } else { + paging_state->current_num = req->nblocks - 1; + } + paging_state->nblocks = req->nblocks; + + paging_state->cur_scan_iter_no = req->cur_scan_iter_no; + paging_state->node_num = 0; + return paging_state; +} + +static void init_adps_state_per_worker(SpqAdpScanReqState *p_state) +{ + p_state->this_round_finish = false; + p_state->batch_size = 512; + if (p_state->direction == ForwardScanDirection) { + p_state->scan_start = 0; + p_state->cur_page_num = 0; + p_state->scan_end = p_state->nblocks - 1; + if (p_state->scan_start > p_state->scan_end) + p_state->this_round_finish = true; + } else { /* Backward scan blocks */ + p_state->scan_start = p_state->nblocks - 1; + p_state->cur_page_num = p_state->scan_start; + p_state->scan_end = 0; + if (p_state->scan_start < p_state->scan_end) + p_state->this_round_finish = true; + } +} + +static bool adps_get_next_scan_unit(SpqAdpScanReqState *p_state, SpqAdpScanPagesRes *pRes) +{ + int64_t start = -1, end = -1; + if (p_state->this_round_finish) + return false; + /* Forward scan */ + if (p_state->direction == ForwardScanDirection) { + start = p_state->cur_page_num; + /* use small batch_size in sutiable range */ + if (start + p_state->batch_size >= p_state->scan_end) { + int small_batch = p_state->batch_size / 16; + if (small_batch < 1) + small_batch = 1; + end = p_state->cur_page_num + small_batch - 1; + if (end > p_state->scan_end) + end = p_state->scan_end; + p_state->cur_page_num += small_batch; + } else { + end = p_state->cur_page_num + p_state->batch_size - 1; + p_state->cur_page_num += p_state->batch_size; + } + if (p_state->cur_page_num > p_state->scan_end) + p_state->this_round_finish = true; + } else { /* Backward scan */ + start = p_state->cur_page_num; + if (p_state->cur_page_num == p_state->scan_start) { + end = (int64_t)((p_state->scan_start - p_state->scan_end) / p_state->batch_size) * p_state->batch_size; + p_state->cur_page_num = end - 1; + } else { + end = p_state->cur_page_num - p_state->batch_size + 1; + p_state->cur_page_num -= p_state->batch_size; + } + if (p_state->cur_page_num < p_state->scan_start) + p_state->this_round_finish = true; + } + pRes->page_start = start; + pRes->page_end = end; + return true; +} + +static bool check_match_and_update_state(SpqAdpScanReqState *p_state, SpqAdpScanPagesReq *seqReq, bool *has_finished) +{ + if (p_state->plan_node_id != seqReq->plan_node_id) + return false; + /* this round has finished */ + if (p_state->cur_scan_iter_no > seqReq->cur_scan_iter_no) { + *has_finished = true; + return true; + } + *has_finished = false; + /* upgrade to next round */ + if (p_state->cur_scan_iter_no < seqReq->cur_scan_iter_no) { + if (!p_state->this_round_finish) { + /* maybe error occur in paging */ + elog(ERROR, "block_iter: error: round %ld has unfinished 
page", p_state->cur_scan_iter_no); + } + p_state->cur_scan_iter_no++; + /* must be one round ahead */ + assert(p_state->cur_scan_iter_no == seqReq->cur_scan_iter_no); + /* reinit the paging state */ + init_adps_state_per_worker(p_state); + } + return true; +} + +static void adps_array_append(SpqScanAdpReqs *array, SpqAdpScanReqState *state) +{ + array->size += 1; + if (array->size > array->max) { + SpqAdpScanReqState **temp = array->req_states; + int size = array->max * sizeof(SpqAdpScanReqState *); + array->req_states = (SpqAdpScanReqState **)palloc(size * 2); + errno_t rc = memcpy_s(array->req_states, size * 2, temp, size); + securec_check(rc, "\0", "\0"); + pfree(temp); + } + array->req_states[array->size - 1] = state; +} + +SpqAdpScanPagesRes adps_get_response_block(SpqAdpScanPagesReq* seqReq) +{ + SpqAdpScanPagesRes seqRes; + SpqAdpScanReqState *p_state; + seqRes.page_start = InvalidBlockNumber; + seqRes.page_end = InvalidBlockNumber; + seqRes.success = 0; + if (seqReq->plan_node_id < 0) { + elog(ERROR, "adps_get_response_block: unrecognized node_id"); + return seqRes; + } + bool found = false; + /* + * Init seq_paging_array is 0, so at the beginning of searching, + * it will miss. + */ + for (int i = 0; i < t_thrd.spq_ctx.qc_ctx->seq_paging_array.size; i++) { + bool has_finished = false; + p_state = t_thrd.spq_ctx.qc_ctx->seq_paging_array.req_states[i]; + if (check_match_and_update_state(p_state, seqReq, &has_finished)) { + /* This round has consumed by other workers */ + if (has_finished) + return seqRes; + + found = true; + + /* Search all the nodes to find the next unit(or page) to read */ + if (adps_get_next_scan_unit(p_state, &seqRes)) { + BlockNumber page_count; + seqRes.success = 1; + page_count = Abs((int64_t)seqRes.page_end - (int64_t)seqRes.page_start) + 1; + } + break; + } + } + /* Can not find a task matches the request task, init a new task and record it */ + if (!found) { + p_state = make_adps_state(seqReq); + if (!p_state) { + elog(ERROR, "not enough memory when adps_get_response_block"); + return seqRes; + } + /* init node state */ + adps_array_append(&t_thrd.spq_ctx.qc_ctx->seq_paging_array, p_state); + init_adps_state_per_worker(p_state); + if (adps_get_next_scan_unit(p_state, &seqRes)) { + BlockNumber page_count; + seqRes.success = 1; + page_count = Abs((int64_t)seqRes.page_end - (int64_t)seqRes.page_start) + 1; + } + } + if (p_state == nullptr) { + elog(ERROR, "not enough memory when adps_get_response_block"); + return seqRes; + } + + /* Fix the result, rs_nblocks may be different between workers */ + if (seqRes.success) { + if (ForwardScanDirection == seqReq->direction) { + if (seqRes.page_start >= p_state->nblocks) + seqRes.success = false; + if (seqRes.page_end >= p_state->nblocks) + seqRes.page_end = p_state->scan_end - 1; + } else { + /* Move to the tail page */ + if (seqRes.page_start >= p_state->nblocks) + seqRes.page_start = p_state->scan_end - 1; + + /* Move to the head page */ + if (seqRes.page_end < 0) { + seqRes.page_end = p_state->scan_start; + } + } + } + return seqRes; +} + int spq_handle_response(PGXCNodeHandle* conn, RemoteQueryState* combiner, bool isdummy) { char* msg = NULL; @@ -714,7 +910,265 @@ static void ExecInitPlanState(PlanState* plan_state, EState* estate, RemoteQuery plan_state->ps_vec_TupFromTlist = false; ExecAssignResultTypeFromTL(&remotestate->ss.ps); } - + +bool adp_disconnect_walker(Node* plan, void* cxt) +{ + if (plan == nullptr) return false; + if (!IsA(plan, SpqSeqScan)) { + return plan_tree_walker(plan, 
(MethodWalker)adp_disconnect_walker, cxt); + } + QCConnKey key = { + .query_id = u_sess->debug_query_id, + .plan_node_id = ((Plan*)plan)->plan_node_id, + .node_id = 0, + .type = SPQ_QC_CONNECTION, + }; + bool found = false; + pthread_rwlock_wrlock(&g_instance.spq_cxt.adp_connects_lock); + hash_search(g_instance.spq_cxt.adp_connects, (void*)&key, HASH_REMOVE, &found); + pthread_rwlock_unlock(&g_instance.spq_cxt.adp_connects_lock); + return false; +} + +void disconnect_qc_conn(void* plan) +{ + if (!plan) return; + MethodPlanWalkerContext cxt; + adp_disconnect_walker((Node*)plan, &cxt); +} + +bool build_connections(Node* plan, void* cxt) +{ + if (plan == nullptr) return false; + if (!IsA(plan, SpqSeqScan)) { + return plan_tree_walker(plan, (MethodWalker)build_connections, cxt); + } + int error; + errno_t rc = EOK; + + RemoteQueryState *node = t_thrd.spq_ctx.qc_ctx->scanState; + PlannedStmt *planstmt = node->ss.ps.state->es_plannedstmt; + int num_nodes = planstmt->num_nodes; + NodeDefinition *nodesDef = planstmt->nodesDefinition; + libcommaddrinfo **addressArray = (libcommaddrinfo **)palloc(sizeof(libcommaddrinfo *) * num_nodes); + + for (int i = 0; i < num_nodes; ++i) { + int nodeNameLen = strlen(nodesDef[i].nodename.data); + int nodehostLen = strlen(nodesDef[i].nodehost.data); + addressArray[i] = (libcomm_addrinfo *)palloc0(sizeof(libcomm_addrinfo)); + addressArray[i]->host = (char *)palloc0(NAMEDATALEN); + addressArray[i]->ctrl_port = nodesDef[i].nodectlport; + addressArray[i]->listen_port = nodesDef[i].nodesctpport; + addressArray[i]->nodeIdx = nodesDef[i].nodeid; + rc = strncpy_s(addressArray[i]->host, NAMEDATALEN, nodesDef[i].nodehost.data, nodehostLen + 1); + securec_check(rc, "\0", "\0"); + rc = strncpy_s(addressArray[i]->nodename, NAMEDATALEN, nodesDef[i].nodename.data, nodeNameLen + 1); + securec_check(rc, "\0", "\0"); + /* set flag for parallel send mode */ + addressArray[i]->parallel_send_mode = false; + + addressArray[i]->streamKey.queryId = node->queryId; + addressArray[i]->streamKey.planNodeId = ((Plan*)plan)->plan_node_id; + addressArray[i]->streamKey.producerSmpId = -1; + addressArray[i]->streamKey.consumerSmpId = -1; + } + + error = gs_connect(addressArray, num_nodes, -1); + + if (error != 0) { + ereport(ERROR, (errmsg("connect failed, code : %d", error))); + } + + for (int i = 0; i < num_nodes; ++i) { + QCConnEntry* entry = (QCConnEntry*)palloc(sizeof(QCConnEntry)); + entry->key = { + .query_id = node->queryId, + .plan_node_id = ((Plan*)plan)->plan_node_id, + .node_id = addressArray[i]->gs_sock.idx, + .type = SPQ_QE_CONNECTION, + }; + entry->scannedPageNum = 0; + entry->forward = addressArray[i]->gs_sock; + entry->backward.idx = 0; + t_thrd.spq_ctx.qc_ctx->connects = lappend(t_thrd.spq_ctx.qc_ctx->connects, (void*)entry); + pfree(addressArray[i]); + } + pfree(addressArray); + return false; +} + +void spq_adps_initconns() +{ + MethodPlanWalkerContext cxt; + RemoteQueryState *node = t_thrd.spq_ctx.qc_ctx->scanState; + plan_tree_walker((Node*)node->ss.ps.plan->lefttree, (MethodWalker)build_connections, (void*)&cxt); +} + +void spq_adps_consumer() +{ + if (t_thrd.spq_ctx.qc_ctx->connects == nullptr) { + return; + } + + SpqAdpScanPagesReq req; + SpqAdpScanPagesRes res; + + ListCell *cell; + foreach(cell, t_thrd.spq_ctx.qc_ctx->connects) { + QCConnEntry* entry = (QCConnEntry*)lfirst(cell); + /* it means backward connection not build yet */ + if (entry->backward.idx == 0) { + bool found = false; + QCConnEntry* entry_in_hash; + 
pthread_rwlock_rdlock(&g_instance.spq_cxt.adp_connects_lock); + entry_in_hash = (QCConnEntry*)hash_search(g_instance.spq_cxt.adp_connects, (void*)(&(entry->key)), HASH_FIND, &found); + if (found && entry_in_hash->backward.idx != 0) { + entry->backward = entry_in_hash->backward; + } + pthread_rwlock_unlock(&g_instance.spq_cxt.adp_connects_lock); + if (entry->backward.idx != 0) { + pthread_rwlock_wrlock(&g_instance.spq_cxt.adp_connects_lock); + hash_search(g_instance.spq_cxt.adp_connects, (void*)(&(entry->key)), HASH_REMOVE, &found); + pthread_rwlock_unlock(&g_instance.spq_cxt.adp_connects_lock); + } + } else { + int rc = gs_recv(&entry->backward, (char *)&req, sizeof(SpqAdpScanPagesReq)); + + if (rc == 0 || errno == ECOMMTCPNODATA) { + continue; + } + + if (rc < 0) { + ereport(ERROR, (errmsg("spq adps thread recv data failed"))); + return; + } + + res = adps_get_response_block(&req); + + rc = gs_send(&entry->forward, (char *)&res, sizeof(SpqAdpScanPagesRes), -1, true); + if (rc <= 0) { + ereport(ERROR, (errmsg("spq adps thread send data failed"))); + } + if (res.success) { + entry->scannedPageNum += res.page_end - res.page_start + 1; + } + } + } +} + +void spq_adps_coordinator_thread_main() +{ + t_thrd.spq_ctx.spq_role = ROLE_QUERY_COORDINTOR; + ereport(LOG, (errmsg("spq thread started"))); + + while (!t_thrd.spq_ctx.qc_ctx->is_done) { + pthread_mutex_lock(&t_thrd.spq_ctx.qc_ctx->spq_pq_mutex); + if (t_thrd.spq_ctx.qc_ctx->scanState == NULL) { + pthread_cond_wait(&t_thrd.spq_ctx.qc_ctx->pq_wait_cv, &t_thrd.spq_ctx.qc_ctx->spq_pq_mutex); + pthread_mutex_unlock(&t_thrd.spq_ctx.qc_ctx->spq_pq_mutex); + } else { + spq_adps_consumer(); + pthread_mutex_unlock(&t_thrd.spq_ctx.qc_ctx->spq_pq_mutex); + } + } + + ereport(LOG, (errmsg("spq thread destroyed"))); + t_thrd.spq_ctx.qc_ctx->is_exited = true; +} + +void spq_createAdaptiveThread() +{ + if (!u_sess->attr.attr_spq.spq_enable_adaptive_scan) { + return; + } + + t_thrd.spq_ctx.adaptive_scan_setup = true; + t_thrd.spq_ctx.qc_ctx->scanState = NULL; + t_thrd.spq_ctx.qc_ctx->connects = NULL; + t_thrd.spq_ctx.qc_ctx->is_done = false; + t_thrd.spq_ctx.qc_ctx->is_exited = false; + t_thrd.spq_ctx.qc_ctx->pq_wait_cv = PTHREAD_COND_INITIALIZER; + if (pthread_mutex_init(&t_thrd.spq_ctx.qc_ctx->spq_pq_mutex, NULL)) { + elog(ERROR, "spq_pq_mutex init failed"); + } + + ThreadId threadId = initialize_util_thread(SPQ_COORDINATOR, (void *)t_thrd.spq_ctx.qc_ctx); + if (threadId == 0) { + pthread_cond_destroy(&t_thrd.spq_ctx.qc_ctx->pq_wait_cv); + pthread_mutex_destroy(&t_thrd.spq_ctx.qc_ctx->spq_pq_mutex); + ereport(FATAL, (errmsg("Cannot create coordinating thread."))); + } else { + ereport(LOG, (errmsg("Create Adaptive thread successfully, threadId:%lu.", threadId))); + } +} + +void spq_startQcThread(RemoteQueryState *node) +{ + /* If guc flag is closed */ + if (!u_sess->attr.attr_spq.spq_enable_adaptive_scan) { + return; + } + pthread_mutex_lock(&t_thrd.spq_ctx.qc_ctx->spq_pq_mutex); + + t_thrd.spq_ctx.qc_ctx->scanState = node; + t_thrd.spq_ctx.qc_ctx->seq_paging_array.size = 0; + t_thrd.spq_ctx.qc_ctx->seq_paging_array.max = PREALLOC_PAGE_ARRAY_SIZE; + t_thrd.spq_ctx.qc_ctx->seq_paging_array.req_states = + (SpqAdpScanReqState **)palloc(PREALLOC_PAGE_ARRAY_SIZE * sizeof(SpqAdpScanReqState *)); + spq_adps_initconns(); + + pthread_cond_signal(&t_thrd.spq_ctx.qc_ctx->pq_wait_cv); + pthread_mutex_unlock(&t_thrd.spq_ctx.qc_ctx->spq_pq_mutex); + ereport(DEBUG1, (errmsg("starting adaptive scan thread."))); +} + +void spq_finishQcThread(void) +{ + if 
(!u_sess->attr.attr_spq.spq_enable_adaptive_scan) { + return; + } + pthread_mutex_lock(&t_thrd.spq_ctx.qc_ctx->spq_pq_mutex); + t_thrd.spq_ctx.qc_ctx->scanState = nullptr; + /* free connections */ + if (t_thrd.spq_ctx.qc_ctx->connects != nullptr) { + ListCell *cell; + foreach(cell, t_thrd.spq_ctx.qc_ctx->connects) { + QCConnEntry* entry = (QCConnEntry*)lfirst(cell); + gs_close_gsocket(&entry->forward); + gs_close_gsocket(&entry->backward); + elog(DEBUG1, "adaptive scan end, query_id: %lu, plan_node_id: %u, node_id: %u, scanned page: %d", + entry->key.query_id, entry->key.plan_node_id, entry->key.node_id, entry->scannedPageNum); + } + list_free(t_thrd.spq_ctx.qc_ctx->connects); + t_thrd.spq_ctx.qc_ctx->connects = nullptr; + } + pthread_mutex_unlock(&t_thrd.spq_ctx.qc_ctx->spq_pq_mutex); + elog(DEBUG5, "pq_thread: stopping the background adaptive thread"); +} + +void spq_destroyQcThread(void) +{ + if (!u_sess->attr.attr_spq.spq_enable_adaptive_scan || !t_thrd.spq_ctx.qc_ctx || + (t_thrd.spq_ctx.qc_ctx && t_thrd.spq_ctx.qc_ctx->is_exited)) { + return; + } + + pthread_mutex_lock(&t_thrd.spq_ctx.qc_ctx->spq_pq_mutex); + t_thrd.spq_ctx.qc_ctx->is_done = true; + pthread_cond_signal(&t_thrd.spq_ctx.qc_ctx->pq_wait_cv); + pthread_mutex_unlock(&t_thrd.spq_ctx.qc_ctx->spq_pq_mutex); + + while (!t_thrd.spq_ctx.qc_ctx->is_exited) { + pthread_mutex_lock(&t_thrd.spq_ctx.qc_ctx->spq_pq_mutex); + pthread_cond_signal(&t_thrd.spq_ctx.qc_ctx->pq_wait_cv); + pthread_mutex_unlock(&t_thrd.spq_ctx.qc_ctx->spq_pq_mutex); + pg_usleep(1); + } + pthread_cond_destroy(&t_thrd.spq_ctx.qc_ctx->pq_wait_cv); + pthread_mutex_destroy(&t_thrd.spq_ctx.qc_ctx->spq_pq_mutex); + ereport(DEBUG3, (errmsg("destory adaptive scan thread"))); +} + RemoteQueryState* ExecInitSpqRemoteQuery(RemoteQuery* node, EState* estate, int eflags, bool row_plan) { RemoteQueryState* spqRemoteState = NULL; @@ -722,7 +1176,8 @@ RemoteQueryState* ExecInitSpqRemoteQuery(RemoteQuery* node, EState* estate, int /* RemoteQuery node is the leaf node in the plan tree, just like seqscan */ Assert(innerPlan(node) == NULL); //Assert(node->is_simple == false); - + + spq_createAdaptiveThread(); spqRemoteState = CreateResponseCombiner(0, node->combine_type); spqRemoteState->position = node->position; @@ -855,6 +1310,7 @@ PGXCNodeHandle** spq_get_exec_connections( pfree_ext(connections); connections = NULL; spq_release_conn(planstate); + spq_destroyQcThread(); ereport(ERROR, (errmsg("PQconnectdbParallel error: %s", err_msg))); } return; @@ -927,6 +1383,8 @@ void spq_do_query(RemoteQueryState* node) planstmt->current_id = step->streamID; node->queryId = generate_unique_id64(>_queryId); + spq_startQcThread(node); + connections = spq_get_exec_connections(node, step->exec_nodes, step->exec_type); Assert(node->spq_connections_info != NULL); @@ -1133,7 +1591,10 @@ void ExecEndSpqRemoteQuery(RemoteQueryState* node, bool pre_end) if (pre_end == false) { RowStoreReset(node->row_store); } - + + spq_finishQcThread(); + spq_destroyQcThread(); + /* Pack all un-completed connections together and recorrect node->conn_count */ if (node->conn_count > 0 && remote_query->sort != NULL) { node->conn_count = PackConnections(node); diff --git a/src/gausskernel/cbb/communication/libcomm.cpp b/src/gausskernel/cbb/communication/libcomm.cpp index aa724ac34..9eb402bd4 100755 --- a/src/gausskernel/cbb/communication/libcomm.cpp +++ b/src/gausskernel/cbb/communication/libcomm.cpp @@ -103,7 +103,7 @@ unsigned long LIBCOMM_BUFFER_SIZE = 1024 * 8; gsocket gs_invalid_gsock = {0, 0, 0, 0}; -static void 
gs_s_build_reply_conntion(libcommaddrinfo* addr_info, int remote_version); +void gs_s_build_reply_conntion(libcommaddrinfo* addr_info, int remote_version); extern GlobalNodeDefinition* global_node_definition; @@ -1630,7 +1630,7 @@ void gs_libcomm_handle_assert(bool condition, int nidx, int sidx, int node_role) * cn need to inital cmailbox to recv msgs from dn * arguments : fcmsgr: provides gs_sock */ -static void gs_s_build_reply_conntion(libcommaddrinfo* addr_info, int remote_version) +void gs_s_build_reply_conntion(libcommaddrinfo* addr_info, int remote_version) { int node_idx = addr_info->gs_sock.idx; int streamid = addr_info->gs_sock.sid; @@ -1738,7 +1738,7 @@ int gs_check_all_mailbox(libcommaddrinfo** libcomm_addrinfo, int addr_num, int r // before continue or goto clean_connection, we must release the sinfo_lock. LIBCOMM_PTHREAD_MUTEX_UNLOCK(&pmailbox->sinfo_lock); - if (IS_PGXC_COORDINATOR) { + if (IS_PGXC_COORDINATOR || IS_SPQ_COORDINATOR) { addr_info->gs_sock = GS_INVALID_GSOCK; continue; } else { @@ -1750,7 +1750,7 @@ int gs_check_all_mailbox(libcommaddrinfo** libcomm_addrinfo, int addr_num, int r } else { pmailbox->semaphore = NULL; // for cn initial cmailbox as well as the connection is duplex - if (IS_PGXC_COORDINATOR) { + if (IS_PGXC_COORDINATOR || IS_SPQ_COORDINATOR) { remote_version = pmailbox->remote_version; build_reply_conn = true; } @@ -1762,12 +1762,14 @@ int gs_check_all_mailbox(libcommaddrinfo** libcomm_addrinfo, int addr_num, int r LIBCOMM_PTHREAD_MUTEX_UNLOCK(&pmailbox->sinfo_lock); +#ifndef USE_SPQ // for cn initial cmailbox as well as the connection is duplex if (build_reply_conn) { // build cmailbox with the same version and remote_verion as pmailbox, // when this connection is duplex. gs_s_build_reply_conntion(addr_info, remote_version); } +#endif } // we wait on the last mailbox that state is not MAIL_READY @@ -1780,7 +1782,7 @@ int gs_check_all_mailbox(libcommaddrinfo** libcomm_addrinfo, int addr_num, int r re = gs_poll(timeout); if (re == ETIMEDOUT) { - if (IS_PGXC_COORDINATOR) { + if (IS_PGXC_COORDINATOR || IS_SPQ_COORDINATOR) { /* close all timeout connections */ gs_close_timeout_connections(libcomm_addrinfo, addr_num, node_idx, streamid); } else { diff --git a/src/gausskernel/cbb/communication/libcomm_utils/libcomm_adapter.cpp b/src/gausskernel/cbb/communication/libcomm_utils/libcomm_adapter.cpp index 034326d6c..79c28797f 100644 --- a/src/gausskernel/cbb/communication/libcomm_utils/libcomm_adapter.cpp +++ b/src/gausskernel/cbb/communication/libcomm_utils/libcomm_adapter.cpp @@ -78,6 +78,9 @@ static int libcomm_delay_no = 0; LibcommAdaptLayer g_libcomm_adapt; extern HTAB* g_htab_fd_id_node_idx; extern pthread_mutex_t g_htab_fd_id_node_idx_lock; +#ifdef USE_SPQ +extern void gs_s_build_reply_conntion(libcommaddrinfo* addr_info, int remote_version); +#endif static int gs_tcp_write_noblock(int node_idx, int sock, const char* msg, int msg_len, int *send_count); static int libcomm_build_tcp_connection(libcommaddrinfo* libcomm_addrinfo, int node_idx); @@ -680,7 +683,11 @@ void gs_accept_ctrl_conntion(struct sock_id* t_fd_id, struct FCMSG_T* fcmsgr) g_instance.comm_cxt.g_r_node_sock[idx].unlock(); /* send response to remote, thus ready control msg arrived after connection has established */ - if (IS_PGXC_COORDINATOR) { + if (IS_PGXC_COORDINATOR +#ifdef USE_SPQ + || fcmsgr->type == CTRL_QE_BACKWARD +#endif + ) { ack = 'o'; rc = mc_tcp_write_block(t_fd_id->fd, &ack, sizeof(ack)); // if tcp send failed, close tcp connction @@ -690,6 +697,51 @@ void 
gs_accept_ctrl_conntion(struct sock_id* t_fd_id, struct FCMSG_T* fcmsgr)
         return;
+#ifdef USE_SPQ
+    } else if (fcmsgr->type == CTRL_BACKWARD_REGIST) {
+        QCConnKey key = {
+            .query_id = fcmsgr->stream_key.queryId,
+            .plan_node_id = fcmsgr->stream_key.planNodeId,
+            .node_id = idx,
+            .type = SPQ_QE_CONNECTION,
+        };
+        bool found = false;
+        QCConnEntry* entry;
+        pthread_rwlock_wrlock(&g_instance.spq_cxt.adp_connects_lock);
+        entry = (QCConnEntry*)hash_search(g_instance.spq_cxt.adp_connects, (void*)&key, HASH_FIND, &found);
+        if (!found) { /* no matching forward channel to ack on, so reject by dropping the request */
+            pthread_rwlock_unlock(&g_instance.spq_cxt.adp_connects_lock);
+            return;
+        }
+        entry->backward = {
+            .idx = idx,
+            .sid = fcmsgr->streamid,
+            .ver = fcmsgr->version,
+            .type = GSOCK_CONSUMER,
+        };
+        libcommaddrinfo addrinfo;
+        addrinfo.gs_sock = entry->backward;
+        addrinfo.gs_sock.ver = entry->forward.ver;
+        addrinfo.streamKey = {
+            .queryId = entry->key.query_id,
+            .planNodeId = entry->key.plan_node_id,
+            .producerSmpId = 0,
+            .consumerSmpId = 0,
+        };
+        gs_s_build_reply_conntion(&addrinfo, entry->backward.ver);
+        ack = 'o';
+        pthread_rwlock_unlock(&g_instance.spq_cxt.adp_connects_lock);
+
+        rc = gs_send(&(entry->forward), &ack, sizeof(ack), -1, TRUE);
+        // if the send failed, close the logical connection
+        if (rc <= 0) {
+            LIBCOMM_ELOG(WARNING, "(r|flow ctrl)\tFailed to send dual channel back ack, error:%s.", mc_strerror(errno));
+            gs_close_gsocket(&entry->forward);
+            gs_close_gsocket(&entry->backward);
+            return;
+        }
+        return;
+#endif
     } else if (fcmsgr->type == CTRL_CONN_REGIST_CN) {
         if (g_instance.comm_cxt.g_ha_shm_data) {
             current_mode = g_instance.comm_cxt.g_ha_shm_data->current_mode;
@@ -1161,8 +1213,12 @@ int gs_s_build_tcp_ctrl_connection(libcommaddrinfo* libcomm_addrinfo, int node_i
 #endif
     // wait ack from remote node, reject when the state of remote node is incorrect, such as standby mode;
     struct FCMSG_T fcmsgs = {0x0};
-    if (IS_PGXC_COORDINATOR) {
+    if (IS_PGXC_COORDINATOR || IS_SPQ_COORDINATOR) {
         fcmsgs.type = CTRL_CONN_REGIST_CN;
+#ifdef USE_SPQ
+    } else if (is_reply) {
+        fcmsgs.type = CTRL_QE_BACKWARD;
+#endif
     } else {
         fcmsgs.type = CTRL_CONN_REGIST;
     }
@@ -1199,7 +1255,7 @@ int gs_s_build_tcp_ctrl_connection(libcommaddrinfo* libcomm_addrinfo, int node_i
     // cn need to ask the remote datanode status when make connection
     // 'r' is received when remote is standby or pending mode
     // for conn between dns, skip this step, ip is given by executor
-    if (IS_PGXC_COORDINATOR) {
+    if (IS_PGXC_COORDINATOR || IS_SPQ_COORDINATOR) {
         error = mc_tcp_read_block(tcp_sock, &ack, sizeof(char), 0);
         if (error < 0 || ack != 'o') {
 #ifdef USE_SSL
diff --git a/src/gausskernel/cbb/communication/libcomm_utils/libcomm_interface.cpp b/src/gausskernel/cbb/communication/libcomm_utils/libcomm_interface.cpp
index dc6873664..c7fea5124 100644
--- a/src/gausskernel/cbb/communication/libcomm_utils/libcomm_interface.cpp
+++ b/src/gausskernel/cbb/communication/libcomm_utils/libcomm_interface.cpp
@@ -737,7 +737,7 @@ static int gs_internal_connect(libcommaddrinfo* libcomm_addrinfo)
     struct FCMSG_T fcmsgs = {0x0};
 
     // for connect between cn and dn, channel is duplex
-    fcmsgs.type = (IS_PGXC_COORDINATOR) ? CTRL_CONN_DUAL : CTRL_CONN_REQUEST;
+    fcmsgs.type = (IS_PGXC_COORDINATOR || IS_SPQ_COORDINATOR) ? CTRL_CONN_DUAL : CTRL_CONN_REQUEST;
     fcmsgs.extra_info = (IS_PGXC_COORDINATOR && (u_sess != NULL)) ? 
(unsigned long)(unsigned)u_sess->pgxc_cxt.NumDataNodes : 0; @@ -762,6 +762,30 @@ static int gs_internal_connect(libcommaddrinfo* libcomm_addrinfo) fcmsgs.streamcap = ((unsigned long)(unsigned)g_instance.attr.attr_network.comm_control_port << 32) + (long)g_instance.attr.attr_network.comm_sctp_port; +#ifdef USE_SPQ + if (fcmsgs.type == CTRL_CONN_DUAL) { + bool found = false; + QCConnKey key = { + .query_id = libcomm_addrinfo->streamKey.queryId, + .plan_node_id = libcomm_addrinfo->streamKey.planNodeId, + .node_id = node_idx, + .type = SPQ_QE_CONNECTION, + }; + pthread_rwlock_wrlock(&g_instance.spq_cxt.adp_connects_lock); + QCConnEntry* entry = (QCConnEntry*)hash_search(g_instance.spq_cxt.adp_connects, (void*)&key, HASH_ENTER, &found); + if (!found) { + entry->forward = { + .idx = node_idx, + .sid = streamid, + .ver = version, + .type = GSOCK_PRODUCER, + }; + entry->backward.idx = 0; + } + pthread_rwlock_unlock(&g_instance.spq_cxt.adp_connects_lock); + } +#endif + LIBCOMM_PTHREAD_MUTEX_UNLOCK(&(pmailbox->sinfo_lock)); rc = gs_send_ctrl_msg(&g_instance.comm_cxt.g_s_node_sock[node_idx], &fcmsgs, ROLE_PRODUCER); if (rc <= 0) { @@ -772,6 +796,20 @@ static int gs_internal_connect(libcommaddrinfo* libcomm_addrinfo) mc_strerror(errno)); errno = ECOMMTCPTCPDISCONNECT; +#ifdef USE_SPQ + if (fcmsgs.type == CTRL_CONN_DUAL) { + QCConnKey key = { + .query_id = libcomm_addrinfo->streamKey.queryId, + .plan_node_id = libcomm_addrinfo->streamKey.planNodeId, + .node_id = node_idx, + .type = SPQ_QE_CONNECTION, + }; + bool found = false; + pthread_rwlock_wrlock(&g_instance.spq_cxt.adp_connects_lock); + hash_search(g_instance.spq_cxt.adp_connects, (void*)&key, HASH_REMOVE, &found); + pthread_rwlock_unlock(&g_instance.spq_cxt.adp_connects_lock); + } +#endif return -1; } @@ -781,7 +819,7 @@ static int gs_internal_connect(libcommaddrinfo* libcomm_addrinfo) libcomm_addrinfo->gs_sock.idx = node_idx; libcomm_addrinfo->gs_sock.sid = streamid; libcomm_addrinfo->gs_sock.ver = version; - libcomm_addrinfo->gs_sock.type = IS_PGXC_COORDINATOR ? GSOCK_DAUL_CHANNEL : GSOCK_PRODUCER; + libcomm_addrinfo->gs_sock.type = (IS_PGXC_COORDINATOR || IS_SPQ_COORDINATOR) ? 
GSOCK_DAUL_CHANNEL : GSOCK_PRODUCER;
 
     COMM_DEBUG_LOG("(s|send connect)\tConnect finish for node[%d]:%s.",
         node_idx,
@@ -1031,7 +1069,7 @@ int gs_connect(libcommaddrinfo** libcomm_addrinfo, int addr_num, int timeout)
         re = gs_internal_connect(addr_info);
         // errno set in gs_connect
         if (re < 0) {
-            if (IS_PGXC_COORDINATOR) {
+            if (IS_PGXC_COORDINATOR || IS_SPQ_COORDINATOR) {
                 continue;
             } else {
                 error_index = i;
diff --git a/src/gausskernel/cbb/communication/libcomm_utils/libcomm_message.h b/src/gausskernel/cbb/communication/libcomm_utils/libcomm_message.h
index 96b70803f..680284b01 100644
--- a/src/gausskernel/cbb/communication/libcomm_utils/libcomm_message.h
+++ b/src/gausskernel/cbb/communication/libcomm_utils/libcomm_message.h
@@ -42,6 +42,10 @@ typedef enum {
     CTRL_ASSERT_FAIL,
     CTRL_PEER_CHANGED,
     CTRL_STOP_QUERY,
+#ifdef USE_SPQ
+    CTRL_QE_BACKWARD,
+    CTRL_BACKWARD_REGIST,
+#endif
     CTRL_MAX_TYPE
 } CtrlMsgType;
 
diff --git a/src/gausskernel/cbb/communication/libcomm_utils/libcomm_shakehands.cpp b/src/gausskernel/cbb/communication/libcomm_utils/libcomm_shakehands.cpp
index 64c53c4b8..9cdc5c6b2 100644
--- a/src/gausskernel/cbb/communication/libcomm_utils/libcomm_shakehands.cpp
+++ b/src/gausskernel/cbb/communication/libcomm_utils/libcomm_shakehands.cpp
@@ -65,6 +65,9 @@
 #define static
 #endif
 
+#ifdef USE_SPQ
+extern void gs_s_build_reply_conntion(libcommaddrinfo* addr_info, int remote_version);
+#endif
 
 /*
  * function name : gs_r_build_reply_connection
@@ -78,13 +81,22 @@
  * return value : -1: error
  * : 0:Succeed
  */
+#ifdef USE_SPQ
+int gs_r_build_reply_connection(BackConnInfo* fcmsgr, int local_version, uint16 *sid)
+#else
 static int gs_r_build_reply_connection(FCMSG_T* fcmsgr, int local_version)
+#endif
 {
     errno_t ss_rc;
     uint32 cpylen;
 
     int node_idx = fcmsgr->node_idx;
+#ifdef USE_SPQ
+    int streamid = gs_get_stream_id(node_idx);
+    *sid = streamid;
+#else
     int streamid = fcmsgr->streamid;
+#endif
     int remote_version = fcmsgr->version;
 
     // get remote nodename and host from global variable
@@ -124,6 +136,12 @@ static int gs_r_build_reply_connection(FCMSG_T* fcmsgr, int local_version)
     ss_rc = strncpy_s(libcomm_addrinfo.nodename, NAMEDATALEN, remote_nodename, cpylen + 1);
     securec_check(ss_rc, "\0", "\0");
     libcomm_addrinfo.nodename[cpylen] = '\0';
+#ifdef USE_SPQ
+    libcomm_addrinfo.gs_sock.idx = fcmsgr->node_idx;
+    libcomm_addrinfo.gs_sock.sid = streamid;
+    libcomm_addrinfo.gs_sock.ver = local_version;
+    libcomm_addrinfo.streamKey = fcmsgr->stream_key;
+#endif /* USE_SPQ */
 
     COMM_DEBUG_LOG("(r|build reply conn)\tBuild TCP connect for node[%d]:%s.",
         node_idx,
@@ -163,6 +181,76 @@ static int gs_r_build_reply_connection(FCMSG_T* fcmsgr, int local_version)
         node_idx,
         REMOTE_NAME(g_instance.comm_cxt.g_s_node_sock, node_idx));
 
+#ifdef USE_SPQ
+    char* node_name = g_instance.comm_cxt.g_s_node_sock[node_idx].remote_nodename;
+    if (0 == strcmp(g_instance.comm_cxt.localinfo_cxt.g_self_nodename, node_name)) {
+        QCConnKey key = {
+            .query_id = fcmsgr->stream_key.queryId,
+            .plan_node_id = fcmsgr->stream_key.planNodeId,
+            .node_id = node_idx,
+            .type = SPQ_QE_CONNECTION,
+        };
+        bool found = false;
+        QCConnEntry* entry;
+        /* note: no lock is taken here, so callers must guarantee adp_connects is already locked */
+        entry = (QCConnEntry*)hash_search(g_instance.spq_cxt.adp_connects, (void*)&key, HASH_FIND, &found);
+        if (found) {
+            entry->backward = {
+                .idx = node_idx,
+                .sid = streamid,
+                .ver = remote_version,
+                .type = GSOCK_DAUL_CHANNEL,
+            };
+            gs_s_build_reply_conntion(&libcomm_addrinfo, local_version);
+        } else {
+            LIBCOMM_ELOG(WARNING, "(s|connect)\tFailed to build local connection to node[%d]:%s, detail:%s.", node_idx,
+                REMOTE_NAME(g_instance.comm_cxt.g_s_node_sock, node_idx), mc_strerror(errno));
+            errno = ECOMMTCPTCPDISCONNECT;
+            gs_close_gsocket(fcmsgr->backward);
+            return -1;
+        }
+    } else {
+        struct FCMSG_T fcmsgs = {0x0};
+
+        // register the backward half of the duplex QC connection
+        fcmsgs.type = CTRL_BACKWARD_REGIST;
+        fcmsgs.extra_info = 0;
+
+        fcmsgs.node_idx = node_idx;
+        fcmsgs.streamid = streamid;
+        // send pmailbox version to cmailbox,
+        // and cmailbox will save it as cmailbox->remote_version.
+        fcmsgs.version = remote_version;
+        fcmsgs.stream_key = libcomm_addrinfo.streamKey;
+        fcmsgs.query_id = fcmsgr->query_id;
+        cpylen = comm_get_cpylen(g_instance.comm_cxt.localinfo_cxt.g_self_nodename, NAMEDATALEN);
+        ss_rc = memset_s(fcmsgs.nodename, NAMEDATALEN, 0x0, NAMEDATALEN);
+        securec_check(ss_rc, "\0", "\0");
+        ss_rc = strncpy_s(fcmsgs.nodename, NAMEDATALEN, g_instance.comm_cxt.localinfo_cxt.g_self_nodename, cpylen + 1);
+        securec_check(ss_rc, "\0", "\0");
+        fcmsgs.nodename[cpylen] = '\0';
+        fcmsgs.streamcap = fcmsgr->streamcap;
+
+        int rc = gs_send_ctrl_msg(&g_instance.comm_cxt.g_s_node_sock[node_idx], &fcmsgs, ROLE_PRODUCER);
+        if (rc <= 0) {
+            LIBCOMM_ELOG(WARNING, "(s|connect)\tFailed to send ready msg to node[%d]:%s, detail:%s.", node_idx,
+                REMOTE_NAME(g_instance.comm_cxt.g_s_node_sock, node_idx), mc_strerror(errno));
+
+            errno = ECOMMTCPTCPDISCONNECT;
+            gs_close_gsocket(fcmsgr->backward);
+            return -1;
+        }
+        char ack;
+        do {
+            rc = gs_recv(fcmsgr->backward, &ack, sizeof(char));
+        } while (rc == 0 || errno == ECOMMTCPNODATA);
+        if (rc < 0 || ack != 'o') {
+            gs_close_gsocket(fcmsgr->backward);
+            return -1;
+        }
+    }
+#endif
+
     struct p_mailbox* pmailbox = &P_MAILBOX(node_idx, streamid);
 
     LIBCOMM_PTHREAD_MUTEX_LOCK(&pmailbox->sinfo_lock);
@@ -362,6 +450,7 @@ void gs_receivers_flow_handle_ready_request(FCMSG_T* fcmsgr)
             deal_time,
             u_sess->debug_query_id);
     }
+#ifndef USE_SPQ
     // for dual connection, we need to build an inverse logic conn with same gs_sock
     if (fcmsgr->type == CTRL_CONN_DUAL) {
         u_sess->pgxc_cxt.NumDataNodes = (int)(fcmsgr->extra_info);
@@ -374,12 +463,33 @@ void gs_receivers_flow_handle_ready_request(FCMSG_T* fcmsgr)
             goto accept_failed;
         }
     }
+#endif
 
     gs_sock.idx = node_idx;
     gs_sock.sid = streamid;
     gs_sock.ver = local_version;
 
     if (fcmsgr->type == CTRL_CONN_DUAL) {
+#ifdef USE_SPQ
+        bool found = false;
+        QCConnKey key = {
+            .query_id = fcmsgr->stream_key.queryId,
+            .plan_node_id = fcmsgr->stream_key.planNodeId,
+            .node_id = 0,
+            .type = SPQ_QC_CONNECTION,
+        };
+        gs_sock.type = GSOCK_CONSUMER;
+        pthread_rwlock_wrlock(&g_instance.spq_cxt.adp_connects_lock);
+        QCConnEntry* entry = (QCConnEntry*)hash_search(g_instance.spq_cxt.adp_connects, (void*)&key, HASH_ENTER, &found);
+        if (!found) {
+            entry->backward = gs_sock;
+            entry->forward.idx = 0;
+            entry->streamcap = fcmsgr->streamcap;
+        }
+        pthread_rwlock_unlock(&g_instance.spq_cxt.adp_connects_lock);
+        ereport(LOG, (errmsg("Received dual connect request from qc, queryid: %lu, plan_node_id: %u",
+            fcmsgr->stream_key.queryId, fcmsgr->stream_key.planNodeId)));
+#else
         // CN request this logic connection, is a dual channel
         gs_sock.type = GSOCK_DAUL_CHANNEL;
 
@@ -393,6 +503,7 @@ void gs_receivers_flow_handle_ready_request(FCMSG_T* fcmsgr)
             gs_close_gsocket(&gs_sock);
             goto accept_failed;
         }
+#endif
     }
 
     // reply MAIL_READY message to producer
diff --git a/src/gausskernel/cbb/communication/libcomm_utils/libcomm_thread.cpp 
b/src/gausskernel/cbb/communication/libcomm_utils/libcomm_thread.cpp index 0d37ff4dd..ca98da3f5 100644 --- a/src/gausskernel/cbb/communication/libcomm_utils/libcomm_thread.cpp +++ b/src/gausskernel/cbb/communication/libcomm_utils/libcomm_thread.cpp @@ -1437,6 +1437,10 @@ static void CommReceiverFlowerProcessMsg(struct sock_id* tFdId, struct FCMSG_T* break; case CTRL_CONN_REGIST: +#ifdef USE_SPQ + case CTRL_QE_BACKWARD: + case CTRL_BACKWARD_REGIST: +#endif case CTRL_CONN_REGIST_CN: gs_accept_ctrl_conntion(tFdId, fcmsgr); break; @@ -1482,8 +1486,11 @@ static void CommReceiverFlowerReceiveData(struct sock_id* tFdId) } LIBCOMM_PTHREAD_MUTEX_UNLOCK(&g_htab_fd_id_node_idx_lock); - if ((idx < 0) && !((fcmsgr.type == CTRL_CONN_REGIST || fcmsgr.type == CTRL_CONN_REGIST_CN) && - fcmsgr.extra_info == 0xEA)) { + if ((idx < 0) && !((fcmsgr.type == CTRL_CONN_REGIST || fcmsgr.type == CTRL_CONN_REGIST_CN +#ifdef USE_SPQ + || fcmsgr.type == CTRL_QE_BACKWARD +#endif + ) && fcmsgr.extra_info == 0xEA)) { fcmsgr.nodename[NAMEDATALEN - 1] = '\0'; LIBCOMM_ELOG(WARNING, "(r|flow ctrl)\tReveive fault message with socket[%d] for[%s], type[%d].", diff --git a/src/gausskernel/optimizer/plan/planner.cpp b/src/gausskernel/optimizer/plan/planner.cpp index 654ea5e04..09e188a86 100755 --- a/src/gausskernel/optimizer/plan/planner.cpp +++ b/src/gausskernel/optimizer/plan/planner.cpp @@ -3534,14 +3534,12 @@ static Plan* grouping_planner(PlannerInfo* root, double tuple_fraction) wflists, &needSecondLevelAgg, collectiveGroupExpr); -#if defined(ENABLE_MULTIPLE_NODES) || defined(USE_SPQ) - if (IS_SPQ_RUNNING) { - /* - * grouping_tlist was modified by build_groupingsets_plan, - * we have to change tlist at the same time. - */ - tlist = grouping_tlist; - } +#ifdef ENABLE_MULTIPLE_NODES + /* + * grouping_tlist was modified by build_groupingsets_plan, + * we have to change tlist at the same time. 
+ */ + tlist = grouping_tlist; #endif /* Delete eq class expr after grouping */ delete_eq_member(root, tlist, collectiveGroupExpr); diff --git a/src/gausskernel/optimizer/plan/streamplan_single.cpp b/src/gausskernel/optimizer/plan/streamplan_single.cpp index 2abfd1e75..0f6334e85 100644 --- a/src/gausskernel/optimizer/plan/streamplan_single.cpp +++ b/src/gausskernel/optimizer/plan/streamplan_single.cpp @@ -535,7 +535,10 @@ void SpqSerializePlan(Plan* node, PlannedStmt* planned_stmt, StringInfoData* str /* not ship planB */ ShipPlannedStmt->ng_num = planned_stmt->ng_num; ShipPlannedStmt->ng_queryMem = planned_stmt->ng_queryMem; - + +#ifdef USE_SPQ + ShipPlannedStmt->enable_adaptive_scan = planned_stmt->enable_adaptive_scan; +#endif appendStringInfoString(str, nodeToString(ShipPlannedStmt)); } #endif diff --git a/src/gausskernel/process/postmaster/postmaster.cpp b/src/gausskernel/process/postmaster/postmaster.cpp index 15550e97c..2982ca8b3 100644 --- a/src/gausskernel/process/postmaster/postmaster.cpp +++ b/src/gausskernel/process/postmaster/postmaster.cpp @@ -2954,7 +2954,7 @@ int PostmasterMain(int argc, char* argv[]) } #ifdef USE_SPQ - if (ENABLE_DSS) { + if (ENABLE_DSS && strstr(g_instance.attr.attr_common.shared_preload_libraries_string, "spqplugin")) { #else if ((!IS_SINGLE_NODE) && ((IS_PGXC_DATANODE && !dummyStandbyMode && !isRestoreMode) || @@ -13418,6 +13418,12 @@ void SetExtraThreadInfo(knl_thread_arg* arg) CompactionWorkerProcess::SetConsumerThreadLocal(arg); break; } +#endif +#ifdef USE_SPQ + case SPQ_COORDINATOR: { + t_thrd.spq_ctx.qc_ctx = (spq_qc_ctx *)arg->payload; + break; + } #endif case THREADPOOL_LISTENER: { t_thrd.threadpool_cxt.listener = (ThreadPoolListener*)arg->payload; @@ -13694,6 +13700,10 @@ int GaussDbAuxiliaryThreadMain(knl_thread_arg* arg) return 0; } +#ifdef USE_SPQ +void spq_adps_coordinator_thread_main(); +#endif + static void is_memory_backend_reserved(const knl_thread_arg* arg) { if (arg->role == WORKER) { @@ -13948,7 +13958,12 @@ int GaussDbThreadMain(knl_thread_arg* arg) GaussDbAuxiliaryThreadMain(arg); proc_exit(0); } break; - +#ifdef USE_SPQ + case SPQ_COORDINATOR: { + spq_adps_coordinator_thread_main(); + proc_exit(0); + } break; +#endif case LOGICAL_READ_RECORD: { t_thrd.proc_cxt.MyPMChildSlot = AssignPostmasterChildSlot(); if (t_thrd.proc_cxt.MyPMChildSlot == -1) { @@ -14480,6 +14495,9 @@ static ThreadMetaData GaussdbThreadGate[] = { { GaussDbThreadMain, APPLY_LAUNCHER, "applylauncher", "apply launcher" }, { GaussDbThreadMain, APPLY_WORKER, "applyworker", "apply worker" }, { GaussDbThreadMain, STACK_PERF_WORKER, "stack_perf", "stack perf worker" }, +#ifdef USE_SPQ + { GaussDbThreadMain, SPQ_COORDINATOR, "spqcoordinator", "QC node coordinating thread" }, +#endif { GaussDbThreadMain, DMS_AUXILIARY_THREAD, "dms_auxiliary", "maintenance xmin in dms" }, { GaussDbThreadMain, EXRTO_RECYCLER, "exrtorecycler", "exrto recycler" }, diff --git a/src/gausskernel/process/tcop/postgres.cpp b/src/gausskernel/process/tcop/postgres.cpp index 4ed813d8d..c1248ab88 100755 --- a/src/gausskernel/process/tcop/postgres.cpp +++ b/src/gausskernel/process/tcop/postgres.cpp @@ -8970,6 +8970,9 @@ int PostgresMain(int argc, char* argv[], const char* dbname, const char* usernam statement_init_metric_context(); exec_simple_plan(planstmt); + if (planstmt->enable_adaptive_scan) { + disconnect_qc_conn((void*)planstmt->planTree); + } MemoryContextSwitchTo(old_cxt); // After query done, producer container is not usable anymore. 
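
Not part of the patch: this patch only adds the QC side of the adaptive-scan page dispatcher (spq_adps_consumer/adps_get_response_block); the QE-side consumer lives in spqplugin and is not shown here. As a rough illustration of the intended round-trip, a hypothetical QE-side helper could look like the sketch below, reusing the SpqAdpScanPagesReq/SpqAdpScanPagesRes structs from execRemote.h and the same gs_send/gs_recv call pattern that spq_adps_consumer uses on the QC side. The function name and channel naming are assumptions for illustration only.

    /* Hypothetical QE-side request loop -- an illustration, not code from this
     * patch. req_chan is the QE->QC channel (the QC's "backward" socket) and
     * res_chan is the QC->QE channel (the QC's "forward" socket). */
    static bool qe_request_next_pages(gsocket* req_chan, gsocket* res_chan, int plan_node_id,
                                      uint32 nblocks, int64_t iter_no, SpqAdpScanPagesRes* res)
    {
        SpqAdpScanPagesReq req;
        req.plan_node_id = plan_node_id;
        req.direction = ForwardScanDirection;  /* a backward scan would fill this differently */
        req.nblocks = nblocks;                 /* this worker's view of rs_nblocks */
        req.cur_scan_iter_no = iter_no;

        /* ask the QC coordinating thread for the next page range */
        if (gs_send(req_chan, (char*)&req, sizeof(SpqAdpScanPagesReq), -1, true) <= 0)
            return false;

        /* wait for [page_start, page_end]; rc == 0 / ECOMMTCPNODATA means no data yet */
        int rc;
        do {
            rc = gs_recv(res_chan, (char*)res, sizeof(SpqAdpScanPagesRes));
        } while (rc == 0 || errno == ECOMMTCPNODATA);

        return rc > 0 && res->success == 1;
    }

A worker would call this each time it exhausts its current page range, so fast workers naturally claim more units than slow ones.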
diff --git a/src/gausskernel/process/threadpool/knl_instance.cpp b/src/gausskernel/process/threadpool/knl_instance.cpp index c543d44d9..a087a2ba1 100755 --- a/src/gausskernel/process/threadpool/knl_instance.cpp +++ b/src/gausskernel/process/threadpool/knl_instance.cpp @@ -934,6 +934,25 @@ void knl_plugin_vec_func_init(knl_g_plugin_vec_func_context* func_cxt) { } } +#ifdef USE_SPQ +void knl_g_spq_context_init(knl_g_spq_context* spq_context) +{ + HASHCTL hash_ctl; + errno_t rc = 0; + + spq_context->adp_connects_lock = PTHREAD_RWLOCK_INITIALIZER; + + rc = memset_s(&hash_ctl, sizeof(hash_ctl), 0, sizeof(hash_ctl)); + securec_check(rc, "\0", "\0"); + + hash_ctl.keysize = sizeof(QCConnKey); + hash_ctl.entrysize = sizeof(QCConnEntry); + hash_ctl.hash = tag_hash; + hash_ctl.hcxt = INSTANCE_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_EXECUTOR); + spq_context->adp_connects = hash_create("QC_CONNECTS", 4096, &hash_ctl, HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); +} +#endif + void knl_instance_init() { g_instance.binaryupgrade = false; @@ -1040,6 +1059,10 @@ void knl_instance_init() knl_g_datadir_init(&g_instance.datadir_cxt); knl_g_listen_sock_init(&g_instance.listen_cxt); + +#ifdef USE_SPQ + knl_g_spq_context_init(&g_instance.spq_cxt); +#endif } void add_numa_alloc_info(void* numaAddr, size_t length) diff --git a/src/gausskernel/process/threadpool/knl_thread.cpp b/src/gausskernel/process/threadpool/knl_thread.cpp index 7b5f3355e..95091eba5 100755 --- a/src/gausskernel/process/threadpool/knl_thread.cpp +++ b/src/gausskernel/process/threadpool/knl_thread.cpp @@ -1801,6 +1801,11 @@ void KnlLscContextInit(knl_t_lsc_context *lsc_cxt) static void knlTSPQCxtInit(knl_t_spq_context *spqCxt) { spqCxt->spq_role = ROLE_UTILITY; + spqCxt->qc_ctx = (spq_qc_ctx*)palloc(sizeof(spq_qc_ctx)); + spqCxt->qc_ctx->is_done = false; + spqCxt->qc_ctx->is_exited = false; + spqCxt->qc_ctx->query_id = 0; + spqCxt->qc_ctx->scanState = NULL; } #endif diff --git a/src/gausskernel/runtime/executor/nodeAgg.cpp b/src/gausskernel/runtime/executor/nodeAgg.cpp index 7d421bfb5..6c379be5d 100644 --- a/src/gausskernel/runtime/executor/nodeAgg.cpp +++ b/src/gausskernel/runtime/executor/nodeAgg.cpp @@ -3893,7 +3893,7 @@ static void exec_lookups_agg(AggState *aggstate, Agg *node, EState *estate) #ifdef USE_SPQ /* Final function only required if we're finalizing the aggregates */ if (t_thrd.spq_ctx.spq_role != ROLE_UTILITY) { - if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplittype)) + if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplittype) && peraggstate->aggref->aggfnoid != STRINGAGGFUNCOID) peraggstate->finalfn_oid = finalfn_oid = InvalidOid; else peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn; diff --git a/src/include/gs_thread.h b/src/include/gs_thread.h index 2a901a5e0..c64f0ce5b 100755 --- a/src/include/gs_thread.h +++ b/src/include/gs_thread.h @@ -123,6 +123,9 @@ typedef enum knl_thread_role { APPLY_LAUNCHER, APPLY_WORKER, STACK_PERF_WORKER, +#ifdef USE_SPQ + SPQ_COORDINATOR, +#endif DMS_AUXILIARY_THREAD, EXRTO_RECYCLER, BARRIER_PREPARSE, diff --git a/src/include/knl/knl_guc/knl_session_attr_spq.h b/src/include/knl/knl_guc/knl_session_attr_spq.h index 8d80ed5d9..697caac29 100644 --- a/src/include/knl/knl_guc/knl_session_attr_spq.h +++ b/src/include/knl/knl_guc/knl_session_attr_spq.h @@ -40,6 +40,8 @@ #include "knl/knl_guc/knl_guc_common.h" +#define PREALLOC_PAGE_ARRAY_SIZE 16 + struct NodeDefinition; typedef struct knl_session_attr_spq { @@ -117,6 +119,7 @@ typedef struct knl_session_attr_spq { bool 
spq_optimizer_enable_redistribute_nestloop_loj_inner_child; bool spq_optimizer_force_comprehensive_join_implementation; bool spq_optimizer_enable_replicated_table; + bool spq_optimizer_calc_multiple_dop; /* Optimizer plan enumeration related GUCs */ bool spq_optimizer_enumerate_plans; @@ -194,9 +197,44 @@ typedef struct knl_session_attr_spq { int spq_batch_size; int spq_mem_size; int spq_queue_size; + + bool spq_enable_adaptive_scan; } knl_session_attr_spq; + +struct RemoteQueryState; + +typedef struct SpqAdpScanReqState { + bool this_round_finish; + int plan_node_id; + int direction; + int node_num; + int64_t nblocks; + int64_t cur_scan_iter_no; + int64_t scan_start; + int64_t cur_page_num; + int64_t scan_end; + int64_t batch_size; + int64_t current_num; +} SpqAdpScanReqState; + +/* struct for paging state container */ +typedef struct SpqScanAdpReqs { + int size = 0; + int max; + SpqAdpScanReqState **req_states; +} SpqScanAdpReqs; + +typedef struct spq_qc_ctx { + pthread_mutex_t spq_pq_mutex; + pthread_cond_t pq_wait_cv; + bool is_done; + bool is_exited; + uint64 query_id; + RemoteQueryState* scanState; + List* connects; + SpqScanAdpReqs seq_paging_array; +} spq_qc_ctx; -/* TODO SPQ Thread Role*/ typedef struct knl_t_spq_context { SpqRole spq_role; uint64 spq_session_id; @@ -204,5 +242,9 @@ typedef struct knl_t_spq_context { bool skip_direct_distribute_result; int num_nodes; NodeDefinition* nodesDefinition; + + /* Spq coordinator thread */ + bool adaptive_scan_setup; + spq_qc_ctx* qc_ctx; } knl_t_spq_context; #endif /* SRC_INCLUDE_KNL_KNL_SESSION_ATTR_MPP_H_ */ diff --git a/src/include/knl/knl_instance.h b/src/include/knl/knl_instance.h index 2aaa2f461..62313672e 100755 --- a/src/include/knl/knl_instance.h +++ b/src/include/knl/knl_instance.h @@ -1246,6 +1246,13 @@ typedef struct knl_g_dms_context { ss_xmin_info_t SSXminInfo; } knl_g_dms_context; +#ifdef USE_SPQ +typedef struct knl_g_spq_context { + pthread_rwlock_t adp_connects_lock; + struct HTAB* adp_connects; +} knl_g_spq_context; +#endif + typedef struct knl_instance_context { knl_virtual_role role; volatile int status; @@ -1385,6 +1392,9 @@ typedef struct knl_instance_context { knl_g_listen_context listen_cxt; knl_g_datadir_context datadir_cxt; knl_g_dms_context dms_cxt; +#ifdef USE_SPQ + knl_g_spq_context spq_cxt; +#endif } knl_instance_context; extern long random(); diff --git a/src/include/libcomm/libcomm.h b/src/include/libcomm/libcomm.h index 15e082551..26e1941bb 100644 --- a/src/include/libcomm/libcomm.h +++ b/src/include/libcomm/libcomm.h @@ -720,4 +720,31 @@ extern void set_comm_fault_injection(int type); extern bool is_comm_fault_injection(LibcommFaultInjection type); #endif +#ifdef USE_SPQ +constexpr uint16 SPQ_QE_CONNECTION = 0; +constexpr uint16 SPQ_QC_CONNECTION = 1; +struct QCConnKey { + uint64 query_id; + uint32 plan_node_id; + uint16 node_id; + uint16 type; +}; +struct QCConnEntry { + QCConnKey key; + uint64 streamcap; + gsocket forward; + gsocket backward; + int scannedPageNum; +}; +struct BackConnInfo { + uint16 node_idx; + uint16 version; + uint64 streamcap; + uint64 query_id; + CommStreamKey stream_key; + gsocket *backward; +}; +extern int gs_r_build_reply_connection(BackConnInfo* fcmsgr, int local_version, uint16 *sid); +#endif + #endif //_GS_LIBCOMM_H_ diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 1fc9d4c3c..730e9f0b8 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -197,6 +197,7 @@ typedef struct PlannedStmt { #ifdef USE_SPQ uint64 
spq_session_id; int current_id; + bool enable_adaptive_scan; bool is_spq_optmized; #endif } PlannedStmt; diff --git a/src/include/pgxc/execRemote.h b/src/include/pgxc/execRemote.h index 8e1818db9..40465ebe8 100755 --- a/src/include/pgxc/execRemote.h +++ b/src/include/pgxc/execRemote.h @@ -133,6 +133,23 @@ typedef struct abort_callback_type { } abort_callback_type; #endif +#ifdef USE_SPQ +typedef struct SpqAdpScanPagesReq { + int plan_node_id; + int direction; + uint32 nblocks; + int64_t cur_scan_iter_no; +} SpqAdpScanPagesReq; + +typedef struct SpqAdpScanPagesRes { + int32 success; /* 1: success, 0: failed */ + BlockNumber page_start; /* where to start */ + BlockNumber page_end; /* where to end */ +} SpqAdpScanPagesRes; + +void disconnect_qc_conn(void* plan); +#endif + static inline char* GetIndexNameForStat(Oid indid, char* relname) { char* indname = get_rel_name(indid);
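
Not part of the patch: to make the unit-carving arithmetic in adps_get_next_scan_unit concrete, the forward-scan case is restated below as a self-contained program. The nblocks/batch values are hypothetical; the real code additionally handles backward scans, repeated scan rounds, and the per-worker nblocks fix-up in adps_get_response_block.

    #include <cstdint>
    #include <cstdio>

    /* Simplified forward-scan batching: hand out batch_size-page units,
     * dropping to batch_size/16 near the end of the relation so that the
     * last stragglers get small units and workers finish together. */
    int main()
    {
        const int64_t nblocks = 1100;  /* hypothetical relation size */
        const int64_t batch = 512;     /* p_state->batch_size */
        const int64_t scan_end = nblocks - 1;
        int64_t cur = 0;
        while (cur <= scan_end) {
            int64_t start = cur, end;
            if (start + batch >= scan_end) {  /* tail: small units */
                int64_t small = batch / 16 < 1 ? 1 : batch / 16;
                end = start + small - 1;
                if (end > scan_end)
                    end = scan_end;
                cur += small;
            } else {                          /* body: full units */
                end = start + batch - 1;
                cur += batch;
            }
            printf("dispatch pages [%lld, %lld]\n", (long long)start, (long long)end);
        }
        return 0;
    }

With these values the dispatcher hands out [0, 511] and [512, 1023] as full units, then 32-page tail units [1024, 1055] through [1088, 1099].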