#include #include #include #include #include #include "access/dfs/dfs_common.h" #include "access/dfs/dfs_query.h" #ifdef ENABLE_MULTIPLE_NODES #include "access/dfs/carbondata_index_reader.h" #endif #include "access/dfs/dfs_stream.h" #include "access/dfs/dfs_stream_factory.h" #include "hdfs_fdw.h" #include "scheduler.h" #include "access/hash.h" #include "access/relscan.h" #include "catalog/pgxc_node.h" #include "catalog/pg_partition_fn.h" #include "commands/defrem.h" #include "foreign/foreign.h" #include "nodes/nodes.h" #include "optimizer/cost.h" #include "optimizer/predtest.h" #include "pgxc/pgxcnode.h" #include "pgxc/pgxc.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "access/heapam.h" #include "utils/syscache.h" #include "dfs_adaptor.h" typedef struct PartitionInfoCacheKey { Oid relOid; uint32 scanClauseHashValue; } PartitionInfoCacheKey; typedef struct PartitionInfoCacheEntry { PartitionInfoCacheKey key; int64 lastModifyTime; List* splitList; List* prunningResult; } PartitionInfoCacheEntry; typedef struct partition_context { /*It is used to store the restirction on partition column. */ List* partClauseList; /*It is used to store varattno list of partition columns. */ List* partColList; } partition_context; typedef struct partition_string_context { /* It is used to store built partition path for partition restriction. */ List* partColStrList; /* * It is used to store varattno of partition columns, each listcell of * partColStrList and partColNoList is one by one correspondence. */ List* partColNoList; Oid foreignTblOid; } partition_string_context; extern char* tcp_link_addr; static void AssignReplicaNode(HTAB* htab, const Oid* dnOids, const uint32_t nodeNum, const List* fileList); static bool AssignRemoteNode(HTAB* htab, int nodeNum, Oid* dnOids, SplitInfo* currentFile, bool isAnalyze); static dnWork* AssignRequestFilesToDn(HTAB* htab, List* fileList, int filesNum, dfs::DFSConnector* conn); static char* parseMultiFileNames(char** fileNames, bool checkRootDir, char delimiter); static int getAnalyzeFilesNum(int dataNodeNum, int totalFilesNum); static bool isNodeLocalToFile(Form_pgxc_node nodeForm, const char* blLocation); static List* GetAllFiles(dfs::DFSConnector* conn, Oid foreignTableId, ServerTypeOption srvType, List* columnList = NIL, List* scanClauseList = NIL); static List* GetObsAllFiles(dfs::DFSConnector* conn, Oid foreignTableId, List* columnList, List*& prunningResult, List*& partList, List* scanClauses); static List* GetHdfsAllFiles(dfs::DFSConnector* conn, Oid foreignTableId, List* columnList, List*& prunningResult, List*& partList, List* scanClauses); static List* GetSubFiles(dfs::DFSConnector* conn, SplitInfo* split, int colNo); static List* DigFiles(dfs::DFSConnector* conn, SplitInfo* split); static List* PartitionPruneProcess(dfs::DFSConnector* conn, List* partitionRelatedList, List* scanClauses, Oid foreignTableId, List*& prunningResult, List*& partList, ServerTypeOption srvType); static void CheckPartitionColNumber( dfs::DFSConnector* conn, List* partList, List* fileList, Oid foreignTableId, ServerTypeOption srvType); static SplitInfo* CheckOneSubSplit(dfs::DFSConnector* conn, SplitInfo* split, bool isLastPartition, Oid foreignTableId); static bool PartitionFilterClause(SplitInfo* split, List* scanClauses, Var* value, Expr* equalExpr); static void CollectPartPruneInfo(List*& prunningResult, int sum, int notprunning, int colno, Oid relOid); static List* DrillDown(dfs::DFSConnector* conn, List* fileList); static bool AssignLocalNode( HTAB* htab, 
uint64* Locations, const char* pChached, uint32 LocationSize, SplitInfo* currentFile, bool isAnalyze); static int GetDnIpAddrByOid(Oid* DnOid, uint32 OidSize, uint64* OidIp, uint32 OidIpSize); static int StatDn(uint64* dnInfo, uint32 dnCnt, dnInfoStat* statDnInfo, uint32 statDnCnt); static int CompareByLowerInt32(const void* Elem1, const void* Elem2); static int CompareByIp(const void* Elem1, const void* Elem2); static Value* getPartitionValue(dfs::DFSConnector* conn, char* partitionStr, char* ObjectStr); static void obsFileScheduling(HTAB* htab, List* FileList, Oid* dnOids, int numOfNodes, char locatorType); static void hdfsFileScheduling( dfs::DFSConnector* conn, HTAB* htab, List* FileList, Oid* dnOids, int numOfNodes, char locatorType, bool isAnalyze); static void SpillToDisk(Index relId, List* allTask, dfs::DFSConnector* conn); static char* getPrefixPath(dfs::DFSConnector* conn); static void flushToRemote(SplitMap* dnTask, const char* buffer, dfs::DFSConnector* conn); static void loadDiskSplits(SplitMap* dnTask, dfs::DFSConnector* conn); void scan_expression_tree_walker(Node* node, bool (*walker)(), void* context); void getPartitionClause(Node* node, partition_context* context); void getPartitionString(Node* node, partition_string_context* context); bool isEquivalentExpression(Oid opno); extern List* CNSchedulingForDistOBSFt(Oid foreignTableId); #ifdef ENABLE_MULTIPLE_NODES static List* ExtractNonParamRestriction(List* opExpressionList); List* CarbonDataFile(dfs::DFSConnector* conn, List* fileList, List* allColumnList, List* restrictColumnList, List* scanClauses, int16 attrNum); #endif uint32 best_effort_use_cahce = 0; // not use List* CNScheduling(Oid foreignTableId, Index relId, List* columnList, List* scanClauses, List*& prunningResult, List*& partList, char locatorType, bool isAnalyze, List* allColumnList, int16 attrNum, int64* fileNum) { int numOfNodes = 0; Oid* dnOids = NULL; HTAB* HTab = NULL; HASHCTL HashCtl; List* FileList = NIL; List* PartitionRelatedList = NIL; errno_t rc = EOK; ServerTypeOption srvType = T_INVALID; if (IS_PGXC_DATANODE) { QUERY_NOT_SUPPORT(foreignTableId, "Query on datanode is not " "supported currently for the foreign table:%s."); } numOfNodes = get_pgxc_classnodes(foreignTableId, &dnOids); Assert(NULL != dnOids && numOfNodes > 0); srvType = getServerType(foreignTableId); /* initialize the hash table which is used for storing dn's assigned files */ rc = memset_s(&HashCtl, sizeof(HashCtl), 0, sizeof(HashCtl)); securec_check(rc, "\0", "\0"); HashCtl.keysize = sizeof(Oid); HashCtl.entrysize = sizeof(dnWork); HashCtl.hash = oid_hash; HashCtl.hcxt = CurrentMemoryContext; HTab = hash_create("SchedulerHashTable", 128, &HashCtl, HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); dfs::DFSConnector* conn = dfs::createConnector(CurrentMemoryContext, foreignTableId); /* get file list */ switch (srvType) { case T_OBS_SERVER: { /* get all obs files need to be schedule */ FileList = GetObsAllFiles(conn, foreignTableId, columnList, prunningResult, partList, scanClauses); break; } case T_HDFS_SERVER: { /* get all hdfs files need to be schedule */ FileList = GetHdfsAllFiles(conn, foreignTableId, columnList, prunningResult, partList, scanClauses); break; } default: { Assert(0); break; } } if (0 == list_length(FileList)) { delete (conn); conn = NULL; return NIL; } /* Start to process partition info */ PartitionRelatedList = list_make2(FileList, columnList); /* Process the diretories of each layer of partition in order. 
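 * For each partition layer the directory names (e.g. "b=123") are checked against the
 * scanClauses via predicate refutation, so pruned directories are never listed or scheduled;
 * the surviving paths are then drilled down (HDFS only) and assigned to datanodes.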
*/ FileList = PartitionPruneProcess( conn, PartitionRelatedList, scanClauses, foreignTableId, prunningResult, partList, srvType); #ifdef ENABLE_MULTIPLE_NODES char* format = HdfsGetOptionValue(foreignTableId, OPTION_NAME_FORMAT); /* check data format is carbondata, if is carbondata, analysis and filter */ if (0 == pg_strcasecmp(format, DFS_FORMAT_CARBONDATA)) { FileList = CarbonDataFile(conn, FileList, allColumnList, columnList, scanClauses, attrNum); } #endif if (0 == list_length(FileList)) { delete (conn); conn = NULL; return NIL; } if (NULL != fileNum) { *fileNum = list_length(FileList); } /* file schedule */ switch (srvType) { case T_OBS_SERVER: { /* Check if the file list is empty again after the partition prunning. */ if (0 == list_length(FileList)) { delete (conn); conn = NULL; return NIL; } obsFileScheduling(HTab, FileList, dnOids, numOfNodes, locatorType); break; } case T_HDFS_SERVER: { /* * Sometimes the hive partition layers is more than ours partition defination, so here we need to dig * down to find all the files. */ FileList = DrillDown(conn, FileList); /* Check if the file list is empty again after the partition prunning and drilling. */ if (0 == list_length(FileList)) { delete (conn); conn = NULL; return NIL; } hdfsFileScheduling(conn, HTab, FileList, dnOids, numOfNodes, locatorType, isAnalyze); break; } default: { Assert(0); break; } } List* allTask = NIL; dnWork* Item = NULL; HASH_SEQ_STATUS ScanStatus; hash_seq_init(&ScanStatus, HTab); ereport(LOG, (errmodule(MOD_HDFS), errmsg("Total %d files, %d datanodes", list_length(FileList), numOfNodes))); while ((Item = (dnWork*)hash_seq_search(&ScanStatus)) != NULL) { ListCell* lc = NULL; SplitMap* dnTask = makeNode(SplitMap); dnTask->nodeId = PGXCNodeGetNodeId(Item->nodeOid, PGXC_NODE_DATANODE); dnTask->locatorType = locatorType; dnTask->splits = Item->toDoList; dnTask->fileNums = list_length(dnTask->splits); foreach (lc, dnTask->splits) { SplitInfo* split = (SplitInfo*)lfirst(lc); dnTask->totalSize += split->ObjectSize; } allTask = lappend(allTask, dnTask); ereport(DEBUG1, (errmodule(MOD_HDFS), errmsg( "Datanode %s, assigned %d files", get_pgxc_nodename(Item->nodeOid), list_length(Item->toDoList)))); } hash_destroy(HTab); HTab = NULL; Assert(allTask != NIL); /* check the allTask, If the size of splits */ if ((!t_thrd.postgres_cxt.mark_explain_only && !isAnalyze) && list_length(FileList) >= u_sess->attr.attr_sql.schedule_splits_threshold) { SpillToDisk(relId, allTask, conn); } delete (conn); conn = NULL; /* free early will have problem */ list_free(FileList); FileList = NIL; return allTask; } /** * @Description: scheduler OBS objects for datanodes. * @in htab, the hash table. * @in FileList, the objects to be scheduled. * @in dnOids, the datanode arrary. * @in numOfNodes, the datanode number. # @in locatorType, distribute type of the given table . * @return node. 
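 * Unless the table is replicated, objects are assigned round-robin: object i goes to
 * dnOids[i % MIN(list_length(FileList), numOfNodes)]. For example, five objects on three
 * datanodes are assigned as DN0, DN1, DN2, DN0, DN1.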
*/ void obsFileScheduling(HTAB* htab, List* FileList, Oid* dnOids, int numOfNodes, char locatorType) { if (LOCATOR_TYPE_REPLICATED == locatorType) { AssignReplicaNode(htab, dnOids, numOfNodes, FileList); } else { ListCell* FileCell = NULL; int num_processed = 0; int taskCnt = 0; int fileCnt = list_length(FileList); taskCnt = MIN(fileCnt, numOfNodes); if (taskCnt == 0) taskCnt = 1; foreach (FileCell, FileList) { // filter object size is zero SplitInfo* splitinfo = (SplitInfo*)lfirst(FileCell); bool found = false; CHECK_FOR_INTERRUPTS(); Oid nodeOid = dnOids[num_processed % taskCnt]; dnWork* item = (dnWork*)hash_search(htab, &nodeOid, HASH_ENTER, &found); if (!found) { item->toDoList = NIL; } item->toDoList = lappend(item->toDoList, splitinfo); num_processed++; ereport(DEBUG1, (errmodule(MOD_OBS), errmsg("Assign object %s to datanode:%s", ((SplitInfo*)lfirst(FileCell))->filePath, get_pgxc_nodename(nodeOid)))); } } } /** * @Description: scheduler hdfs files for datanodes. * @in htab, the hash table. * @in FileList, the objects to be scheduled. * @in dnOids, the datanode arrary. * @in numOfNodes, the datanode number. # @in locatorType, distribute type of the given table . * @in isAnalyze, if the isAnalyze is true, we are executing an analyze command. * @return node. */ static void hdfsFileScheduling( dfs::DFSConnector* conn, HTAB* htab, List* FileList, Oid* dnOids, int numOfNodes, char locatorType, bool isAnalyze) { ListCell* FileCell = NULL; SplitInfo* Split = NULL; char szIp[32] = {0}; uint64* dnInfo = NULL; dnInfoStat* statDnInfo = NULL; /*get dn nodes ip and combo with oid*/ dnInfo = (uint64*)palloc0(numOfNodes * sizeof(uint64)); statDnInfo = (dnInfoStat*)palloc0(numOfNodes * sizeof(dnInfoStat)); int dnCnt = GetDnIpAddrByOid(dnOids, numOfNodes, dnInfo, numOfNodes); /*sorted by lower int32 for using bsearch*/ ::qsort(dnInfo, dnCnt, sizeof(uint64), CompareByLowerInt32); /*stat by ip*/ int statDnCnt = StatDn(dnInfo, dnCnt, statDnInfo, numOfNodes); /* used for generate a random start position in AssignLocalNode */ ::srand((unsigned)time(NULL)); if (LOCATOR_TYPE_REPLICATED == locatorType) { AssignReplicaNode(htab, dnOids, numOfNodes, FileList); } else { uint64* pLocal = (uint64*)palloc0(sizeof(uint64) * MAX_ROUNDROBIN_AVAILABLE_DN_NUM); char* pCached = (char*)palloc0(sizeof(char) * MAX_ROUNDROBIN_AVAILABLE_DN_NUM); bool needPredicate = true; int fileCount = 0; int localFileCount = 0; errno_t Ret = EOK; Ret = memset_s(pCached, MAX_ROUNDROBIN_AVAILABLE_DN_NUM, 0, MAX_ROUNDROBIN_AVAILABLE_DN_NUM); securec_check(Ret, "", ""); foreach (FileCell, FileList) { CHECK_FOR_INTERRUPTS(); Split = (SplitInfo*)lfirst(FileCell); char* CurrentFileName = Split->filePath; uint32 LocalCnt = 0; if (needPredicate) { dfs::DFSBlockInfo* BlInf = conn->getBlockLocations(CurrentFileName); Assert(BlInf != NULL); int ReplNum = BlInf->getNumOfReplica(); const char* pName = NULL; for (int Loop = 0; Loop < ReplNum; Loop++) { pName = BlInf->getNames(0, Loop); Assert(pName != NULL); Ret = strcpy_s(szIp, (sizeof(szIp) - 1), pName); securec_check(Ret, "", ""); /*remove port info*/ char* pStr = ::strrchr(szIp, ':'); if (pStr != NULL) { *pStr = '\0'; } uint64 TmpVal = (uint64)inet_addr(szIp); dnInfoStat* pdnInfoStat = (dnInfoStat*)::bsearch(&TmpVal, statDnInfo, statDnCnt, sizeof(dnInfoStat), CompareByIp); if (pdnInfoStat != NULL) /*is in dn*/ { /* save all local */ Ret = memcpy_s(&pLocal[LocalCnt], ((MAX_ROUNDROBIN_AVAILABLE_DN_NUM - LocalCnt) * sizeof(uint64)), &dnInfo[pdnInfoStat->Start], pdnInfoStat->Cnt * sizeof(uint64)); 
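                        /*
                         * Every datanode that shares this replica's IP (pdnInfoStat->Cnt entries starting at
                         * pdnInfoStat->Start in the sorted dnInfo array) has just been recorded as a local candidate.
                         */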
securec_check(Ret, "", ""); LocalCnt += pdnInfoStat->Cnt; if (BlInf->isCached(0, Loop)) { Ret = memset_s(&pCached[LocalCnt], pdnInfoStat->Cnt, 1, pdnInfoStat->Cnt); securec_check(Ret, "", ""); } } } delete (BlInf); BlInf = NULL; } /*check whether all locations are remote*/ if (LocalCnt != 0) { (void)AssignLocalNode(htab, pLocal, pCached, LocalCnt, Split, isAnalyze); localFileCount++; } else { /* choose one mpp dn to handle the split */ if (!AssignRemoteNode(htab, numOfNodes, dnOids, Split, isAnalyze)) { delete (conn); conn = NULL; ereport(ERROR, (errcode(ERRCODE_NO_DATA_FOUND), errmodule(MOD_HDFS), errmsg("No datanode is assigned for this split: %s", CurrentFileName))); } } fileCount++; /* If the local ratio is less than 1/10 in the first 256 files, we take all the others as remote scan. */ if (fileCount == (int)MAX_ROUNDROBIN_AVAILABLE_DN_NUM && localFileCount < (int)(MAX_ROUNDROBIN_AVAILABLE_DN_NUM / 10)) { needPredicate = false; } } pfree_ext(pLocal); pfree_ext(pCached); } pfree_ext(dnInfo); pfree_ext(statDnInfo); } static void SpillToDisk(Index relId, List* allTask, dfs::DFSConnector* conn) { ListCell* lc = NULL; foreach (lc, allTask) { SplitMap* dnTask = (SplitMap*)lfirst(lc); /* when the number of files is less than u_sess->attr.attr_sql.schedule_splits_threshold / 1000, don't spill to * disk. */ if (list_length(dnTask->splits) <= (int)(u_sess->attr.attr_sql.schedule_splits_threshold / 1000)) continue; /* serialize the splits */ char* tmpBuffer = nodeToString(dnTask->splits); dnTask->lengths = lappend_int(dnTask->lengths, strlen(tmpBuffer)); /* construct the tmp file name */ StringInfoData tmpFileName; initStringInfo(&tmpFileName); char* prefix = getPrefixPath(conn); uint32 queryHashID = ((t_thrd.postgres_cxt.debug_query_string == NULL) ? 0 : hash_any((unsigned char*)t_thrd.postgres_cxt.debug_query_string, strlen(t_thrd.postgres_cxt.debug_query_string))); appendStringInfo(&tmpFileName, "%s/.%lu_%u_%u_%ld_%d_%ld", prefix, gs_thread_self(), queryHashID, relId, (int64)allTask, dnTask->nodeId, gs_random()); dnTask->downDiskFilePath = tmpFileName.data; /* flush the serial buffer to share file system. 
*/ flushToRemote(dnTask, tmpBuffer, conn); ereport(DEBUG1, (errmodule(MOD_HDFS), errmsg("Coordinate %s, spill %d splits to dfs %s.", get_pgxc_nodename(dnTask->nodeId), list_length(dnTask->splits), dnTask->downDiskFilePath))); /* clean tmp memory */ pfree_ext(tmpBuffer); if (prefix != NULL) pfree_ext(prefix); list_free_deep(dnTask->splits); dnTask->splits = NIL; } } static char* getPrefixPath(dfs::DFSConnector* conn) { if (conn->getType() == HDFS_CONNECTOR) { /* hdfs */ StringInfoData prefix; initStringInfo(&prefix); appendStringInfo(&prefix, "/tmp"); return prefix.data; } else { /* obs */ const char* bucket = conn->getValue("bucket", NULL); StringInfoData prefix; initStringInfo(&prefix); appendStringInfo(&prefix, "%s", bucket); return prefix.data; } } static void flushToRemote(SplitMap* dnTask, const char* buffer, dfs::DFSConnector* conn) { if (conn->openFile(dnTask->downDiskFilePath, O_WRONLY) == -1 || conn->writeCurrentFile(buffer, linitial_int(dnTask->lengths)) == -1 || conn->flushCurrentFile() == -1) { delete (conn); conn = NULL; ereport(ERROR, (errcode(ERRCODE_FDW_ERROR), errmodule(MOD_HDFS), errmsg("Failed to flush the splits into disk when the count is" "too much, detail can be found in log of %s", g_instance.attr.attr_common.PGXCNodeName))); } } static void loadDiskSplits(SplitMap* dnTask, dfs::DFSConnector* conn) { MemoryContext oldCtx = MemoryContextSwitchTo(u_sess->cache_mem_cxt); int length = linitial_int(dnTask->lengths); char* buffer = (char*)palloc0(length + 1); if (conn->openFile(dnTask->downDiskFilePath, O_RDONLY) == -1 || conn->readCurrentFileFully(buffer, length, 0) == -1) { delete (conn); conn = NULL; (void)MemoryContextSwitchTo(oldCtx); ereport(ERROR, (errcode(ERRCODE_FDW_ERROR), errmodule(MOD_HDFS), errmsg("Failed to load the splits from disk when the count is" "too much, detail can be found in log of %s", g_instance.attr.attr_common.PGXCNodeName))); } ereport(DEBUG1, (errmodule(MOD_HDFS), errmsg("Datanode %s, load %d splits from dfs %s.", get_pgxc_nodename(dnTask->nodeId), list_length(dnTask->splits), dnTask->downDiskFilePath))); /* deserialize the splits */ dnTask->splits = (List*)stringToNode(buffer); pfree_ext(buffer); (void)MemoryContextSwitchTo(oldCtx); /* clear the down disk objects */ conn->closeCurrentFile(); (void)conn->deleteFile(dnTask->downDiskFilePath, 0); pfree(dnTask->downDiskFilePath); dnTask->downDiskFilePath = NULL; list_free(dnTask->lengths); dnTask->lengths = NIL; } static int GetDnIpAddrByOid(Oid* DnOid, uint32 OidSize, uint64* OidIp, uint32 OidIpSize) { uint32 Loop = 0; uint32 OidIpIdx = 0; HeapTuple tuple = NULL; Assert(OidIpSize >= OidSize); for (Loop = 0; (Loop < OidSize) && (OidIpIdx < OidIpSize); Loop++) { tuple = SearchSysCache1(PGXCNODEOID, ObjectIdGetDatum(DnOid[Loop])); if (!HeapTupleIsValid(tuple)) ereport(ERROR, (errcode(ERRCODE_CACHE_LOOKUP_FAILED), errmodule(MOD_HDFS), errmsg("cache lookup failed for node %u", DnOid[Loop]))); Form_pgxc_node NodeForm = (Form_pgxc_node)GETSTRUCT(tuple); in_addr_t TmpVal = 0; /*handle local host and local ip*/ if ((strncmp(NodeForm->node_host.data, LOCAL_IP, ::strlen(LOCAL_IP)) == 0) || (strncmp(NodeForm->node_host.data, LOCAL_HOST, ::strlen(LOCAL_HOST)) == 0)) { TmpVal = inet_addr(tcp_link_addr); } else { TmpVal = inet_addr(NodeForm->node_host.data); } OidIp[OidIpIdx] = (((uint64)DnOid[Loop]) << 32) + ((uint64)TmpVal); OidIpIdx += 1; ReleaseSysCache(tuple); } return OidIpIdx; } static int StatDn(uint64* dnInfo, uint32 dnCnt, dnInfoStat* statDnInfo, uint32 statDnCnt) { uint64 Tmp = (dnCnt > 0) ? 
dnInfo[0] : 0; uint32 Cnt = (dnCnt > 0) ? 1 : 0; uint32 Start = 0; uint32 Loop = (dnCnt > 0) ? 1 : 0; uint32 dnInfoStatIdx = 0; Assert((dnCnt <= statDnCnt) && (dnInfo != NULL) && (statDnInfo != NULL)); for (; (Loop < dnCnt) && (dnInfoStatIdx < statDnCnt); Loop++) { if (GETIP(Tmp) == GETIP(dnInfo[Loop])) { Cnt += 1; } else { statDnInfo[dnInfoStatIdx].ipAddr = GETIP(Tmp); statDnInfo[dnInfoStatIdx].Start = Start; statDnInfo[dnInfoStatIdx].Cnt = Cnt; dnInfoStatIdx += 1; Tmp = dnInfo[Loop]; Start = Loop; Cnt = 1; } } if (Cnt > 0) { statDnInfo[dnInfoStatIdx].ipAddr = GETIP(Tmp); statDnInfo[dnInfoStatIdx].Start = Start; statDnInfo[dnInfoStatIdx].Cnt = Cnt; } return int(dnInfoStatIdx + 1); } /*quick sort & bsearch*/ static int CompareByLowerInt32(const void* Elem1, const void* Elem2) { int Ret = 0; const uint64* P1 = (const uint64*)Elem1; const uint64* P2 = (const uint64*)Elem2; uint32 Ip1 = GETIP(*P1); uint32 Ip2 = GETIP(*P2); Ret = (Ip1 > Ip2) ? 1 : ((Ip1 < Ip2) ? -1 : 0); return Ret; } static int CompareByIp(const void* Elem1, const void* Elem2) { int Ret = 0; const uint64* P1 = (const uint64*)Elem1; uint32 Ip1 = GETIP(*P1); const dnInfoStat* P2 = (dnInfoStat*)Elem2; uint32 Ip2 = P2->ipAddr; Ret = (Ip1 > Ip2) ? 1 : ((Ip1 < Ip2) ? -1 : 0); return Ret; } List* CNSchedulingForAnalyze(unsigned int* totalFilesNum, unsigned int* numOfDns, Oid foreignTableId, bool isglbstats) { errno_t rc = EOK; int filesToRead = 0; HTAB* htab = NULL; HASHCTL hash_ctl; dnWork* item = NULL; List* fileList = NIL; HASH_SEQ_STATUS scan_status; List* partitionRelatedList = NIL; List* columnList = NIL; Relation relation = RelationIdGetRelation(foreignTableId); TupleDesc tupleDescriptor = NULL; List* prunningResult = NIL; List* partList = NIL; RelationLocInfo* rel_loc_info = GetRelationLocInfo(foreignTableId); List* allTask = NIL; ServerTypeOption srvType = T_INVALID; if (!RelationIsValid(relation)) { ereport(ERROR, (errcode(ERRCODE_CACHE_LOOKUP_FAILED), errmodule(MOD_HDFS), errmsg("could not open relation with OID %u", foreignTableId))); } srvType = getServerType(foreignTableId); tupleDescriptor = RelationGetDescr(relation); columnList = CreateColList((Form_pg_attribute*)tupleDescriptor->attrs, tupleDescriptor->natts); RelationClose(relation); *numOfDns = get_pgxc_classnodes(foreignTableId, NULL); Assert(*numOfDns > 0); /* we should get all dn task for global stats. 
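 * For OBS CSV/TXT foreign tables the work is delegated to CNSchedulingForDistOBSFt();
 * otherwise the regular CNScheduling() path is reused with isAnalyze set to true.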
*/ if (isglbstats) { if (IS_OBS_CSV_TXT_FOREIGN_TABLE(foreignTableId)) { /* for dist obs foreign table.*/ #ifndef ENABLE_LITE_MODE allTask = CNSchedulingForDistOBSFt(foreignTableId); #else FEATURE_ON_LITE_MODE_NOT_SUPPORTED(); #endif } else { if (rel_loc_info == NULL) { ereport(ERROR, (errcode(ERRCODE_CACHE_LOOKUP_FAILED), errmodule(MOD_HDFS), errmsg("could not get locator information for relation with OID %u", foreignTableId))); } allTask = CNScheduling(foreignTableId, 0, columnList, NULL, prunningResult, partList, rel_loc_info->locatorType, true, columnList, tupleDescriptor->natts, NULL); } pfree_ext(rel_loc_info); return allTask; } /* used for generate a random start position in AssignLocalNode */ srand((unsigned)time(NULL)); dfs::DFSConnector* conn = dfs::createConnector(CurrentMemoryContext, foreignTableId); rc = memset_s(&hash_ctl, sizeof(hash_ctl), 0, sizeof(hash_ctl)); securec_check(rc, "\0", "\0"); hash_ctl.keysize = sizeof(Oid); hash_ctl.entrysize = sizeof(dnWork); hash_ctl.hash = oid_hash; hash_ctl.hcxt = CurrentMemoryContext; htab = hash_create("SchedulerHashTable", 128, &hash_ctl, HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); /* Get the string of file names into allFiles */ fileList = GetAllFiles(conn, foreignTableId, srvType); if (0 == list_length(fileList)) { delete (conn); conn = NULL; list_free_deep(columnList); return NIL; } /* Start to process partition info */ partitionRelatedList = list_make2(fileList, columnList); /* Process the diretories of each layer of partition in order. */ fileList = PartitionPruneProcess(conn, partitionRelatedList, NIL, foreignTableId, prunningResult, partList, srvType); /* * Sometimes the hive partition layers is more than ours partition defination, so here we need to dig * down to find all the files. */ fileList = DrillDown(conn, fileList); /* Check if the file list is empty again after the partition prunning and drilling. */ if (0 == list_length(fileList)) { delete (conn); conn = NULL; return NIL; } *totalFilesNum = list_length(fileList); /* * acquire the num of files needs to analyze * at least analyze one file */ if (IsLocatorReplicated(GetLocatorType(foreignTableId))) { filesToRead = *totalFilesNum; } else { filesToRead = getAnalyzeFilesNum(*numOfDns, *totalFilesNum); } Assert(filesToRead > 0); /* * find the suitable dn which has the request files number. 
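     * (filesToRead is roughly totalFilesNum / numOfDns as computed by getAnalyzeFilesNum(),
     * and is never less than one file)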
* we will choose local read files priorityly * if none of the dns has the enough local read files,fill the others with remote read files */ item = AssignRequestFilesToDn(htab, fileList, filesToRead, conn); Assert(NULL != item); if (item == NULL) { ereport(ERROR, (errcode(ERRCODE_CACHE_LOOKUP_FAILED), errmodule(MOD_HDFS), errmsg("could not assign request files to dn"))); } delete (conn); conn = NULL; SplitMap* taskMap = makeNode(SplitMap); taskMap->nodeId = PGXCNodeGetNodeId(item->nodeOid, PGXC_NODE_DATANODE); taskMap->locatorType = LOCATOR_TYPE_NONE; taskMap->splits = list_copy(item->toDoList); allTask = lappend(allTask, taskMap); hash_seq_init(&scan_status, htab); while ((item = (dnWork*)hash_seq_search(&scan_status)) != NULL) { list_free(item->toDoList); item->toDoList = NIL; } hash_destroy(htab); list_free(fileList); fileList = NIL; return allTask; } void AssignSplits(List* splitToDnMap, dfs::reader::ReaderState* readerState, dfs::DFSConnector* conn) { ListCell* splitToDnMapCell = NULL; List* fileList = NIL; foreach (splitToDnMapCell, splitToDnMap) { SplitMap* dnTask = (SplitMap*)lfirst(splitToDnMapCell); if (u_sess->pgxc_cxt.PGXCNodeId == dnTask->nodeId || LOCATOR_TYPE_REPLICATED == dnTask->locatorType) { /* If the splits is spilled to disk ,then load it here. */ if ((dnTask->splits == NULL && dnTask->downDiskFilePath != NULL) && conn != NULL) { loadDiskSplits(dnTask, conn); } if (NIL != dnTask->splits) { fileList = (List*)copyObject(dnTask->splits); break; } } } readerState->splitList = fileList; } /* * Find a dn which has the enough local-read files to fulfill the request files number * if there's not enough files, take some remote-read files */ static dnWork* AssignRequestFilesToDn(HTAB* htab, List* fileList, int filesNum, dfs::DFSConnector* conn) { HeapTuple tuple = NULL; int maxFilesAssigned = 0; Form_pgxc_node dataNodeForm; dnWork* recordDn = NULL; bool found = false; ListCell* fileCell = NULL; dnWork* item = NULL; int dnIdx = 0; Oid* dnOids = NULL; int totalDnNum; int numOfDns; int replIdx; int startNode; PgxcNodeGetOids(NULL, &dnOids, NULL, &numOfDns, false); Assert(numOfDns > 0 && NULL != dnOids); totalDnNum = numOfDns; startNode = (int)gs_random() % totalDnNum; dnIdx = startNode; /* traversal all the dn to assign files,and return the first dn that satifies. 
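     * The scan starts at a random datanode and wraps around; if no datanode can collect filesNum
     * local files, the datanode with the most local files is topped up with remote files afterwards.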
*/ while (numOfDns--) { bool assigned = false; Oid nodeOid = dnOids[dnIdx]; tuple = SearchSysCache1(PGXCNODEOID, ObjectIdGetDatum(nodeOid)); Form_pgxc_node nodeForm = (Form_pgxc_node)GETSTRUCT(tuple); /* initialize the number of files that is assigned to zero */ int filesAssgnied = 0; /* traversal all the files */ foreach (fileCell, fileList) { SplitInfo* split = (SplitInfo*)lfirst(fileCell); char* currentFileName = split->filePath; int numOfReplica; dfs::DFSBlockInfo* bl = conn->getBlockLocations(currentFileName); numOfReplica = bl->getNumOfReplica(); /* traversal the replication ,if any of the replication satify local read to current datanode, add it to * splitmap */ for (replIdx = 0; replIdx < numOfReplica; replIdx++) { if (isNodeLocalToFile(nodeForm, bl->getNames(0, replIdx))) { if (!assigned) { assigned = true; item = (dnWork*)hash_search(htab, &nodeOid, HASH_ENTER, &found); item->toDoList = NIL; Assert(!found); } filesAssgnied++; item->toDoList = lappend(item->toDoList, split); if (filesAssgnied == filesNum) { ReleaseSysCache(tuple); delete (bl); bl = NULL; return item; } break; } } delete (bl); bl = NULL; } /* record the dn which has the most local read files */ if (filesAssgnied > maxFilesAssigned) { maxFilesAssigned = filesAssgnied; recordDn = item; } ReleaseSysCache(tuple); dnIdx++; if (dnIdx == totalDnNum) dnIdx = 0; } /* after traversal all files,we don't get enough local files, * so choose the dn which has the most local files ,and add some remote files to it */ if (NULL == recordDn) { recordDn = (dnWork*)hash_search(htab, &dnOids[startNode], HASH_ENTER, &found); recordDn->toDoList = NIL; } tuple = SearchSysCache1(PGXCNODEOID, ObjectIdGetDatum(recordDn->nodeOid)); dataNodeForm = (Form_pgxc_node)GETSTRUCT(tuple); foreach (fileCell, fileList) { SplitInfo* split = (SplitInfo*)lfirst(fileCell); char* currentFileName = split->filePath; int numOfReplica; dfs::DFSBlockInfo* bl = conn->getBlockLocations(currentFileName); numOfReplica = bl->getNumOfReplica(); for (replIdx = 0; replIdx < numOfReplica; replIdx++) { if (isNodeLocalToFile(dataNodeForm, bl->getNames(0, replIdx))) { /* it has already in the toDo list */ break; } } delete (bl); bl = NULL; if (replIdx >= numOfReplica) { maxFilesAssigned++; recordDn->toDoList = lappend(recordDn->toDoList, split); if (maxFilesAssigned == filesNum) { ReleaseSysCache(tuple); return recordDn; } } } ReleaseSysCache(tuple); if (dnOids != NULL) pfree_ext(dnOids); return NULL; } static int getAnalyzeFilesNum(int dataNodeNum, int totalFilesNum) { double accurateAssignedFiles; double rawAssignedFiles; int filesToRead; if (dataNodeNum == 0) dataNodeNum = 1; accurateAssignedFiles = (double)totalFilesNum / dataNodeNum; rawAssignedFiles = (double)totalFilesNum / dataNodeNum; if (totalFilesNum <= dataNodeNum) filesToRead = 1; else if (accurateAssignedFiles - rawAssignedFiles > 0.5) filesToRead = (int)rawAssignedFiles + 1; else filesToRead = (int)rawAssignedFiles; Assert(filesToRead >= 1 && filesToRead <= totalFilesNum / dataNodeNum + 1 && filesToRead >= totalFilesNum / dataNodeNum); return filesToRead; } static bool isNodeLocalToFile(Form_pgxc_node nodeForm, const char* blLocation) { /* * if node ip is not local address, compare directly */ if (strncmp(nodeForm->node_host.data, blLocation, strlen(nodeForm->node_host.data)) == 0) { return true; } /* * if node ip is local address, compare sctp_link_addr */ if (((strncmp(nodeForm->node_host.data, LOCAL_IP, strlen(LOCAL_IP)) == 0) || (strncmp(nodeForm->node_host.data, LOCAL_HOST, strlen(LOCAL_HOST)) == 0)) && 
(strncmp(tcp_link_addr, blLocation, strlen(blLocation)) == 0)) { return true; } return false; } static void AssignReplicaNode(HTAB* htab, const Oid* dnOids, const uint32_t nodeNum, const List* fileList) { bool found = false; for (uint32_t dnIdx = 0; dnIdx < nodeNum; dnIdx++) { Oid nodeOid = dnOids[dnIdx]; dnWork* item = (dnWork*)hash_search(htab, &nodeOid, HASH_ENTER, &found); if (0 == dnIdx) { item->toDoList = (List*)copyObject(fileList); } else { item->toDoList = NIL; } } } /* * select a lowest workload dn in (cached&local) or (uncached&local) dns */ static bool AssignLocalNode( HTAB* htab, uint64* Locations, const char* pChached, uint32 LocationSize, SplitInfo* currentFile, bool isAnalyze) { bool bFound = false; Oid NodeOid = 0; uint32 nodeTaskLength = 0; uint64 TempU64 = 0; uint32 MinWl = MAX_UINT32; uint32 NextIdx = 0; uint32 UseCachedFactor = 1; uint32 Sel; Assert((Locations != NULL) && (LocationSize > 0)); dnWork* pMinWorkload[MAX_ROUNDROBIN_AVAILABLE_DN_NUM] = {NULL}; for (uint32 Loop = 0; Loop < LocationSize; Loop++) { TempU64 = Locations[Loop]; UseCachedFactor = ((pChached[Loop] != 0) && (best_effort_use_cahce == 1)) ? 2 : 1; NodeOid = GETOID(TempU64); /* use the oid of each node in the pgxc_node is efficient and is coveient to get information * with oid */ dnWork* Item = (dnWork*)hash_search(htab, &NodeOid, HASH_ENTER, &bFound); if (bFound) { nodeTaskLength = (list_length(Item->toDoList) + (UseCachedFactor - 1)) / UseCachedFactor; } else /* indicate the mpp datanode did not has any work */ { nodeTaskLength = 0; Item->toDoList = NIL; } if (nodeTaskLength < MinWl) { pMinWorkload[0] = Item; MinWl = nodeTaskLength; NextIdx = 1; } else if (nodeTaskLength == MinWl) { pMinWorkload[NextIdx] = Item; NextIdx += 1; } else { /*nothing to do*/ } } Assert((NextIdx > 0) && (NextIdx < MAX_ROUNDROBIN_AVAILABLE_DN_NUM)); if (NextIdx == 0) NextIdx = 1; if (!isAnalyze) { /* select one randomly from all low workload dns */ Sel = gs_random() % NextIdx; } else /* get a determinded dn to get single stats for global stats. */ Sel = (NextIdx - 1) % MAX_ROUNDROBIN_AVAILABLE_DN_NUM; pMinWorkload[Sel]->toDoList = lappend(pMinWorkload[Sel]->toDoList, currentFile); return true; } static bool AssignRemoteNode(HTAB* htab, int nodeNum, Oid* dnOids, SplitInfo* currentFile, bool isAnalyze) { HeapTuple tuple = NULL; int nodeTaskLength = 0; bool found = false; dnWork* recordDn = NULL; Oid nodeOid; int dnIdx; int totalDnNum = nodeNum; if (!isAnalyze) dnIdx = (int)gs_random() % totalDnNum; else /* the dn id often changed for anlyze of global stats, result in the totalRowCnts is error, so we should set the dn id is 0. 
*/ dnIdx = 0; while (nodeNum-- > 0) { nodeOid = dnOids[dnIdx]; tuple = SearchSysCache1(PGXCNODEOID, ObjectIdGetDatum(nodeOid)); if (!HeapTupleIsValid(tuple)) ereport(ERROR, (errcode(ERRCODE_CACHE_LOOKUP_FAILED), errmodule(MOD_HDFS), errmsg("cache lookup failed for node %u", nodeOid))); Form_pgxc_node nodeForm = (Form_pgxc_node)GETSTRUCT(tuple); /* Take definition for given node type */ if (nodeForm->node_type != PGXC_NODE_COORDINATOR) { /* * use the oid of each node in the pgxc_node is efficient and is coveient to get information * with oid */ dnWork* item = (dnWork*)hash_search(htab, &nodeOid, HASH_ENTER, &found); // and it's not in the map yet if (!found || 0 == list_length(item->toDoList)) { item->toDoList = NIL; item->toDoList = lappend(item->toDoList, currentFile); ReleaseSysCache(tuple); return true; } else { Assert(list_length(item->toDoList) > 0); if (0 == nodeTaskLength || nodeTaskLength > list_length(item->toDoList)) { nodeTaskLength = list_length(item->toDoList); recordDn = item; } } } ReleaseSysCache(tuple); dnIdx++; if (dnIdx == totalDnNum) dnIdx = 0; } if (0 != nodeTaskLength) { recordDn->toDoList = lappend(recordDn->toDoList, currentFile); return true; } return false; } static char* parseMultiFileNames(char** fileNames, bool checkRootDir, char delimiter) { char* currentFileName = NULL; char* semicolon = strchr(*fileNames, delimiter); if (semicolon == NULL) { /* NOT FOUND */ char* tmp = *fileNames; char* fileNameBegin = NULL; char* fileNameEnd = NULL; /* detele ' ' before path */ while (' ' == *tmp) tmp++; fileNameBegin = tmp; /* detele ' ' after path */ tmp++; while (' ' != *tmp && '\0' != *tmp) tmp++; fileNameEnd = tmp; int indexOfSemicolon = (int)(fileNameEnd - fileNameBegin); currentFileName = (char*)palloc0(indexOfSemicolon + 1); errno_t rc = memcpy_s(currentFileName, (indexOfSemicolon + 1), fileNameBegin, indexOfSemicolon); securec_check(rc, "", ""); currentFileName[indexOfSemicolon] = '\0'; *fileNames = NULL; /* reset to NULL as an end scan indicator */ } else { /* delete ' ' */ char* tmp = *fileNames; char* fileNameBegin = 0; char* fileNameEnd = 0; /* detele ' ' before path */ while (' ' == *tmp) tmp++; fileNameBegin = tmp; /* detele ' ' after path */ tmp++; while (' ' != *tmp && tmp < semicolon) tmp++; fileNameEnd = tmp; int indexOfSemicolon = (int)(fileNameEnd - fileNameBegin); int indexOfFile = (int)(semicolon - *fileNames); currentFileName = (char*)palloc0(indexOfSemicolon + 1); errno_t rc = memcpy_s(currentFileName, (indexOfSemicolon + 1), fileNameBegin, indexOfSemicolon); securec_check(rc, "", ""); currentFileName[indexOfSemicolon] = '\0'; *fileNames += (long)indexOfFile + 1; } if (checkRootDir && currentFileName[0] != '/') { ereport(ERROR, (errcode(ERRCODE_FDW_INVALID_STRING_FORMAT), errmodule(MOD_HDFS), errmsg("file path need to start with root '/', but it is: %s", currentFileName))); } return currentFileName; } /** * @Description: traverse the scan expression tree. It is a base function. * when we search specified expression(for example,var, partColumnRestiction), need use it. * @in node, the given expression. * @in walker, the implementation function. * @out context, the stateless struct, the caller build this structer. * @return none. */ void scan_expression_tree_walker(Node* node, void (*walker)(), void* context) { bool (*p2walker)(void*, void*) = (bool (*)(void*, void*))walker; /* Guard against stack overflow due to overly complex expressions. 
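     * BoolExpr nodes are expanded into their argument list, List nodes are walked element by
     * element, and OpExpr/NullTest nodes are handed to the supplied walker callback.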
*/ check_stack_depth(); if (NULL == node) { return; } switch (nodeTag(node)) { case T_BoolExpr: { BoolExpr* expr = (BoolExpr*)node; scan_expression_tree_walker((Node*)expr->args, walker, context); break; } case T_OpExpr: case T_NullTest: { p2walker(node, context); break; } case T_List: { ListCell* temp = NULL; foreach (temp, (List*)node) { scan_expression_tree_walker((Node*)lfirst(temp), walker, context); } break; } default: { break; } } } /** * @Description: get the restrictions about the given partition column. * @in node, we get the restriction from this node, which is a restriction * expression list. * @in/out partition_context, the element partColList of context store * all partition column number. The element partClauseList will store * restriction clause. * @return */ static void GetPartitionClauseOpExpr(Node* node, partition_context* context) { OpExpr* op_clause = (OpExpr*)node; Node* leftop = NULL; Node* rightop = NULL; Var* var = NULL; if (list_length(op_clause->args) != 2) { return; } leftop = get_leftop((Expr*)op_clause); rightop = get_rightop((Expr*)op_clause); Assert(NULL != rightop); Assert(NULL != leftop); if (rightop == NULL) { ereport(ERROR, (errcode(ERRCODE_FDW_ERROR), errmodule(MOD_HDFS), errmsg("The right operate expression of partition column cannot be NULL."))); return; } if (leftop == NULL) { ereport(ERROR, (errcode(ERRCODE_FDW_ERROR), errmodule(MOD_HDFS), errmsg("The left operate expression of partition column cannot be NULL."))); return; } if (IsVarNode(rightop) && IsA(leftop, Const)) { if (IsA(rightop, RelabelType)) { rightop = (Node*)((RelabelType*)rightop)->arg; } var = (Var*)rightop; } else if (IsVarNode(leftop) && IsA(rightop, Const)) { if (IsA(leftop, RelabelType)) { leftop = (Node*)((RelabelType*)leftop)->arg; } var = (Var*)leftop; } if (NULL != var) { ListCell* cell = NULL; foreach (cell, context->partColList) { if (lfirst_int(cell) == var->varattno) { /* we find one partition restriction calues and put it into partColList. */ context->partClauseList = lappend(context->partClauseList, node); break; } } } } static void GetPartitionClauseNullTest(Node* node, partition_context* context) { NullTest* nullExpr = (NullTest*)node; if (IS_NOT_NULL == nullExpr->nulltesttype) { return; } if (!IsA(nullExpr->arg, Var)) { return; } Var* var = (Var*)nullExpr->arg; if (NULL != var) { ListCell* cell = NULL; foreach (cell, context->partColList) { if (lfirst_int(cell) == var->varattno) { context->partClauseList = lappend(context->partClauseList, node); break; } } } } void getPartitionClause(Node* node, partition_context* context) { if (NULL == node) { return; } switch (nodeTag(node)) { case T_BoolExpr: { ereport( ERROR, (errcode(ERRCODE_FDW_INVALID_DATA_TYPE), errmodule(MOD_HDFS), errmsg("can not reach here."))); break; } case T_OpExpr: { GetPartitionClauseOpExpr(node, context); break; } case T_NullTest: { /* Only optimize the "column Is NULL" NullTest expression. */ GetPartitionClauseNullTest(node, context); break; } default: { scan_expression_tree_walker(node, (void (*)())getPartitionClause, context); break; } } } /** * @Description: build the partition dirctory for partition restriction. * for example, /b=123/c=456/. * @in node, the restriction. * @in partition_string_context, it include the partition column. * the partition restirction to be obtained will store partColStrList * of context. * @return none. 
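 * For instance, an equality restriction on partition column b with the constant 123 contributes
 * the segment "b=123/", and "c IS NULL" contributes "c=__HIVE_DEFAULT_PARTITION__/".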
*/ static void getPartitionStringOpExpr(Node* node, partition_string_context* context) { OpExpr* op_clause = (OpExpr*)node; bool equalExpr = isEquivalentExpression(op_clause->opno); Var* var = NULL; Const* constant = NULL; Node* leftop = get_leftop((Expr*)op_clause); Node* rightop = get_rightop((Expr*)op_clause); if (equalExpr) { if (rightop && IsVarNode(rightop) && leftop && IsA(leftop, Const)) { if (IsA(rightop, RelabelType)) { rightop = (Node*)((RelabelType*)rightop)->arg; } var = (Var*)rightop; constant = (Const*)leftop; } else if (leftop && IsVarNode(leftop) && rightop && IsA(rightop, Const)) { if (IsA(leftop, RelabelType)) { leftop = (Node*)((RelabelType*)leftop)->arg; } var = (Var*)leftop; constant = (Const*)rightop; } if (NULL == var) { return; } char* relName = get_relid_attribute_name(context->foreignTblOid, var->varattno); StringInfo partitionDir = makeStringInfo(); appendStringInfo(partitionDir, "%s=", relName); GetStringFromDatum(constant->consttype, constant->consttypmod, constant->constvalue, partitionDir); appendStringInfo(partitionDir, "/"); context->partColNoList = lappend_int(context->partColNoList, var->varattno); context->partColStrList = lappend(context->partColStrList, partitionDir); } } static void getPartitionStringNullTest(Node* node, partition_string_context* context) { NullTest* nullExpr = (NullTest*)node; if (IS_NOT_NULL == nullExpr->nulltesttype) { return; } if (!IsA(nullExpr->arg, Var)) { return; } Var* var = (Var*)nullExpr->arg; if (NULL != var) { char* relName = get_relid_attribute_name(context->foreignTblOid, var->varattno); StringInfo partitionDir = makeStringInfo(); appendStringInfo(partitionDir, "%s=%s", relName, DEFAULT_HIVE_NULL); appendStringInfo(partitionDir, "/"); context->partColNoList = lappend_int(context->partColNoList, var->varattno); context->partColStrList = lappend(context->partColStrList, partitionDir); } } void getPartitionString(Node* node, partition_string_context* context) { if (NULL == node) { return; } switch (nodeTag(node)) { case T_OpExpr: { getPartitionStringOpExpr(node, context); break; } case T_NullTest: { getPartitionStringNullTest(node, context); break; } default: { scan_expression_tree_walker(node, (void (*)())getPartitionString, context); break; } } } /** * @Description: whether or check the give expression include bool expression. * @in node, the given expression. * @return return true, if find the bool expression, otherwise return false. */ bool hasBoolExpr(Node* node) { check_stack_depth(); bool returnValue = false; if (node == NULL) return false; switch (nodeTag(node)) { case T_BoolExpr: { returnValue = true; break; } case T_OpExpr: { ListCell* temp = NULL; OpExpr* expr = (OpExpr*)node; foreach (temp, expr->args) { if (hasBoolExpr((Node*)lfirst(temp))) { returnValue = true; break; } } break; } case T_List: { ListCell* temp = NULL; foreach (temp, (List*)node) { if (hasBoolExpr((Node*)lfirst(temp))) { returnValue = true; break; } } break; } default: { break; } } return returnValue; } /** * @Description: add patition dirctory path from the given restriction. * @in prefix, the foldername option value. * @in foreignTableId, the given foreign table oid. * @in scanClauseList, the given restriction. * @return return the modified prefix. 
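 * For example (hypothetical path), with prefix "/user/tab" and the clauses "b = 123" and "c = 7",
 * the returned list holds "/user/tab/b=123/c=7/", which is then listed instead of the whole prefix.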
*/ List* addPartitionPath(Oid foreignTableId, List* scanClauseList, char* prefix) { List* partList = GetPartitionList(foreignTableId); bool hasBExpr = false; List* partStrPathList = NIL; partition_string_context context; StringInfo str = NULL; hasBExpr = hasBoolExpr((Node*)scanClauseList); /*now, only optimize the opExpr. */ if (hasBExpr) { return NIL; } context.partColNoList = NIL; context.partColStrList = NIL; context.foreignTblOid = foreignTableId; partition_context part_context; part_context.partClauseList = NIL; part_context.partColList = partList; getPartitionClause((Node*)scanClauseList, &part_context); /* get the partition restriction. */ getPartitionString((Node*)part_context.partClauseList, &context); /* Bulid the partition path. */ for (int i = 0; i < list_length(partList); i++) { AttrNumber varattno = list_nth_int(partList, i); bool findPartCol = false; if (hasBExpr && i >= 1) { break; } for (int partIndex = 0; partIndex < list_length(context.partColNoList); partIndex++) { int partColNo = list_nth_int(context.partColNoList, partIndex); if (partColNo == varattno) { findPartCol = true; StringInfo partStringDir = (StringInfo)list_nth(context.partColStrList, partIndex); /* we need palloc memory. */ if (hasBExpr) { str = makeStringInfo(); appendStringInfo(str, "%s", prefix); appendStringInfo(str, "%s", partStringDir->data); partStrPathList = lappend(partStrPathList, str); ereport(LOG, (errmodule(MOD_DFS), errmsg("active pruning remain file: %s", str->data))); /*As for boolExpr, more then one restriction is existed for one column. */ continue; } else { if (0 == i) { str = makeStringInfo(); appendStringInfo(str, "%s", prefix); } appendStringInfo(str, "%s", partStringDir->data); ereport(LOG, (errmodule(MOD_DFS), errmsg("active pruning remain file: %s", str->data))); /* only one restriction for one column. */ break; } } } if (!findPartCol) { break; } } if (!hasBExpr && NULL != str) { partStrPathList = lappend(partStrPathList, str); } return partStrPathList; } static List* GetAllFiles( dfs::DFSConnector* conn, Oid foreignTableId, ServerTypeOption srvType, List* columnList, List* scanClauseList) { List* fileList = NIL; char* currentFile = NULL; List* entryList = NIL; switch (srvType) { case T_OBS_SERVER: { char* multiBucketsFolder = HdfsGetOptionValue(foreignTableId, OPTION_NAME_FOLDERNAME); char* multiBucketsRegion = HdfsGetOptionValue(foreignTableId, OPTION_NAME_LOCATION); while (NULL != multiBucketsFolder || NULL != multiBucketsRegion) { if (NULL != multiBucketsFolder) { currentFile = parseMultiFileNames(&multiBucketsFolder, false, ','); } else { /* As for region option, each region path will stat from "obs://". So we add strlen(obs:/) length * for multiBuckets. 
*/ multiBucketsRegion = multiBucketsRegion + strlen("obs:/"); currentFile = parseMultiFileNames(&multiBucketsRegion, false, '|'); } List* fixedPathList = NIL; if (u_sess->attr.attr_sql.enable_valuepartition_pruning) { fixedPathList = addPartitionPath(foreignTableId, scanClauseList, currentFile); } if (0 == list_length(fixedPathList)) { fileList = list_concat(fileList, conn->listObjectsStat(currentFile, currentFile)); } else { ListCell* cell = NULL; foreach (cell, fixedPathList) { StringInfo fixedPath = (StringInfo)lfirst(cell); fileList = list_concat(fileList, conn->listObjectsStat(fixedPath->data, currentFile)); } } list_free_ext(fixedPathList); } break; } case T_HDFS_SERVER: { HdfsFdwOptions* options = HdfsGetOptions(foreignTableId); if (options->foldername) { /* * If the foldername is a file path, the funtion hdfsListDirectory do not validity check, * so calling IsHdfsFile to judge whether the foldername is not a file path. */ if (conn->isDfsFile(options->foldername)) { delete (conn); conn = NULL; ereport(ERROR, (errcode(ERRCODE_FDW_INVALID_OPTION_DATA), errmodule(MOD_HDFS), errmsg("The foldername option cannot be a file path."))); } fileList = conn->listObjectsStat(options->foldername); } else { while (NULL != options->filename) { currentFile = parseMultiFileNames(&options->filename, true, ','); /* If the option use filenames, then all the entries defined must be file. */ if (!conn->isDfsFile(currentFile)) { delete (conn); conn = NULL; ereport(ERROR, (errcode(ERRCODE_FDW_INVALID_OPTION_DATA), errmodule(MOD_HDFS), errmsg("The entries in the options fileNames must be file!"))); } fileList = list_concat(fileList, conn->listObjectsStat(currentFile)); } } break; } default: { Assert(0); break; } } list_free(entryList); entryList = NIL; return fileList; } static List* GetObsAllFiles(dfs::DFSConnector* conn, Oid foreignTableId, List* columnList, List*& prunningResult, List*& partList, List* scanClauses) { List* fileList = NIL; char* currentFile = NULL; char* multiBucketsFolder = HdfsGetOptionValue(foreignTableId, OPTION_NAME_FOLDERNAME); char* multiBucketsRegion = HdfsGetOptionValue(foreignTableId, OPTION_NAME_LOCATION); while (NULL != multiBucketsFolder || NULL != multiBucketsRegion) { if (NULL != multiBucketsFolder) { currentFile = parseMultiFileNames(&multiBucketsFolder, false, ','); } else { /* As for region option, each region path will stat from "obs://". So we add strlen(obs:/) length for * multiBuckets. 
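                 * Skipping strlen("obs:/") characters leaves a string that still begins with '/', e.g.
                 * "obs://bucket/path" becomes "/bucket/path" before it is split on the '|' delimiter.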
*/ multiBucketsRegion = multiBucketsRegion + strlen("obs:/"); currentFile = parseMultiFileNames(&multiBucketsRegion, false, '|'); } List* fixedPathList = NIL; if (u_sess->attr.attr_sql.enable_valuepartition_pruning) { fixedPathList = addPartitionPath(foreignTableId, scanClauses, currentFile); } if (0 == list_length(fixedPathList)) { fileList = list_concat(fileList, conn->listObjectsStat(currentFile, currentFile)); } else { ListCell* cell = NULL; foreach (cell, fixedPathList) { StringInfo fixedPath = (StringInfo)lfirst(cell); fileList = list_concat(fileList, conn->listObjectsStat(fixedPath->data, currentFile)); } } list_free_ext(fixedPathList); } if (0 == list_length(fileList)) { return NIL; } return fileList; } static List* GetHdfsAllFiles(dfs::DFSConnector* conn, Oid foreignTableId, List* columnList, List*& prunningResult, List*& partList, List* scanClauses) { List* fileList = NIL; char* currentFile = NULL; HdfsFdwOptions* options = HdfsGetOptions(foreignTableId); if (options->foldername) { /* * If the foldername is a file path, the funtion hdfsListDirectory do not validity check, * so calling IsHdfsFile to judge whether the foldername is not a file path. */ if (conn->isDfsFile(options->foldername)) { delete (conn); conn = NULL; ereport(ERROR, (errcode(ERRCODE_FDW_INVALID_OPTION_DATA), errmodule(MOD_HDFS), errmsg("The foldername option cannot be a file path."))); } fileList = conn->listObjectsStat(options->foldername); } else { while (NULL != options->filename) { currentFile = parseMultiFileNames(&options->filename, true, ','); /* If the option use filenames, then all the entries defined must be file. */ if (!conn->isDfsFile(currentFile)) { delete (conn); conn = NULL; ereport(ERROR, (errcode(ERRCODE_FDW_INVALID_OPTION_DATA), errmodule(MOD_HDFS), errmsg("The entries in the options fileNames must be file!"))); } fileList = list_concat(fileList, conn->listObjectsStat(currentFile)); } } if (0 == list_length(fileList)) { return NIL; } return fileList; } static Value* getPartitionValue(dfs::DFSConnector* conn, char* partitionStr, char* ObjectStr) { const char* partContent = strchr(partitionStr, '='); if (NULL == partContent) { delete (conn); conn = NULL; ereport(ERROR, (errcode(ERRCODE_FDW_ERROR), errmodule(MOD_HDFS), errmsg("Something wrong with the partition directory name of file %s.", ObjectStr))); } Value* partValue = makeString(UriDecode(partContent + 1)); return partValue; } /* * Get all the files in the current directory and store the partition column value in the new split if need. * This is only used for partition table and is different from DigFiles although both two functions search sub * files for a given path. * * @_in param conn: the handler of hdfs connect. * @_in param split: The split from whose path we get sub files. * @_in param colNo: the partition column index of the current split. * @return Return a list of sub files, or null for a empty directory. 
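 * The current directory name (split->fileName, e.g. "b=123") is decoded by getPartitionValue()
 * and appended to the partContentList carried by every returned sub split.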
*/ static List* GetSubFiles(dfs::DFSConnector* conn, SplitInfo* split, int colNo) { List* fileList = NIL; char* fileName = split->fileName; char* folderName = split->filePath; List* partContentList = split->partContentList; List* entryList = NIL; SplitInfo* newsplit = NULL; entryList = conn->listObjectsStat(folderName); if (entryList == NIL) { return NIL; } List* newPartContentList = list_copy(partContentList); Value* partValue = getPartitionValue(conn, fileName, folderName); newPartContentList = lappend(newPartContentList, partValue); for (int i = 0; i < list_length(entryList); i++) { SplitInfo* splitInfo = (SplitInfo*)list_nth(entryList, i); newsplit = InitFolderSplit(splitInfo->filePath, newPartContentList, splitInfo->ObjectSize); fileList = lappend(fileList, newsplit); } pfree_ext(split->fileName); pfree_ext(split->filePath); list_free(newPartContentList); list_free(entryList); entryList = NIL; return fileList; } /* * Dig the file split input. This is a general function to get list of file/directories for a given file path. * (GetSubFiles is only for partition prunning.) If the path is a file, add itself to the list and return; * else if the file is a directory and no file is found under it, return NIL; else if the directory has sub * files/directories then add all the sub ones into the list and return. * * @_in param conn: The hdfs connect handle. * @_in param split: The split file/directory to dig in. * @return Return a list of one or more files. NIL means it is a empty directory. */ static List* DigFiles(dfs::DFSConnector* conn, SplitInfo* split) { List* fileList = NIL; char* filePath = split->filePath; SplitInfo* newsplit = NULL; List* partContent = split->partContentList; List* entryList = conn->listObjectsStat(filePath); if (entryList == NIL) { return NIL; } for (int i = 0; i < list_length(entryList); i++) { SplitInfo* splitInfo = (SplitInfo*)list_nth(entryList, i); newsplit = InitFolderSplit(splitInfo->filePath, partContent, splitInfo->ObjectSize); fileList = lappend(fileList, newsplit); } list_free(entryList); entryList = NIL; return fileList; } List* GetPartitionList(Oid relid) { HeapTuple partTuple = NULL; int2vector* partVec = NULL; List* partList = NULL; Datum datum; bool isnull = false; /* Check if the current foreign table is partitioned. */ if (!isPartitionedObject(relid, RELKIND_FOREIGN_TABLE, true)) return NIL; /* Search the tuple related to the current foreign table in pg_partition. */ partTuple = searchPgPartitionByParentIdCopy(PART_OBJ_TYPE_PARTED_TABLE, relid); if (!HeapTupleIsValid(partTuple)) ereport(ERROR, (errcode(ERRCODE_CACHE_LOOKUP_FAILED), errmodule(MOD_HDFS), errmsg("cache lookup failed for relid %u", relid))); datum = SysCacheGetAttr(PARTRELID, partTuple, Anum_pg_partition_partkey, &isnull); if (isnull) { ereport(ERROR, (errcode(ERRCODE_CACHE_LOOKUP_FAILED), errmodule(MOD_HDFS), errmsg("Error happens when search the record in pg_partition for a partition table. "))); } else { partVec = (int2vector*)DatumGetPointer(datum); } /* Build the partition list from the partVec stored in tuple. */ for (int i = 0; i < partVec->dim1; i++) { partList = lappend_int(partList, partVec->values[i]); } heap_freetuple(partTuple); partTuple = NULL; return partList; } /** * @Description: fill the partition value into partContentList in order to * read it by setPartinfoAndDesc function. * @in conn, the DFS connextor. * @in splitObject, store the partition value, we get partition value from it. * @return none. 
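 * @in partColNum, the path segment between the partColNum-th and the (partColNum + 1)-th '/'
 * of splitObject->filePath is parsed as the "column=value" pair.
 */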
*/ void fillPartitionValueInSplitInfo(dfs::DFSConnector* conn, SplitInfo* splitObject, int partColNum) { int ibegin = find_Nth(splitObject->filePath, partColNum, "/"); int iend = find_Nth(splitObject->filePath, partColNum + 1, "/"); char* partitionStr = (char*)palloc0(iend - ibegin); error_t rc = EOK; rc = memcpy_s(partitionStr, iend - ibegin, splitObject->filePath + ibegin + 1, iend - ibegin - 1); securec_check(rc, "\0", "\0"); splitObject->fileName = partitionStr; Value* partitionValue = getPartitionValue(conn, partitionStr, splitObject->filePath); splitObject->partContentList = lappend(splitObject->partContentList, partitionValue); } /* * The function handle the whole process of partition prunning based on the partition column list, * scanClauses and so on. * * @_in param conn: The handler of hdfs connect. * @_in param partitionRelatedList: Includes partition list, file list and column list. * @_in param scanClauses: The expression clauses of foreign scan for the prunning; * @_in param foreignTableId: the relation oid of the current foreign table. * @_out param prunningResult: Statistic of the partition prunning information for each layer. * @_out param partList: The list of the partition column. * @return Return the file list after the partition prunning. */ static List* PartitionPruneProcess(dfs::DFSConnector* conn, List* partitionRelatedList, List* scanClauses, Oid foreignTableId, List*& prunningResult, List*& partList, ServerTypeOption srvType) { partList = GetPartitionList(foreignTableId); ListCell* fileCell = NULL; SplitInfo* split = NULL; bool partitionSkipped = false; List* fileList = (List*)linitial(partitionRelatedList); List* columnList = (List*)lsecond(partitionRelatedList); CheckPartitionColNumber(conn, partList, fileList, foreignTableId, srvType); for (int i = 0; i < list_length(partList); i++) { List* newFileList = NIL; int partCol = list_nth_int(partList, i); int sum = list_length(fileList); /* * If the fileList is empty, then all the files has been pruned and return immediately. * In this case , the list of part info in the split is not complete, but it is ok because * if we return NIL here, there will no task to scheduler and it will return in the begining * of scan(don't read any orc file). */ if (0 == sum) return NIL; int notprunning = 0; /* Fetch the current partition column var includes type and no. */ Var* value = GetVarFromColumnList(columnList, partCol); /* The flag to control if we need to call PartitionFilterClause, true means not. */ bool skipPartitionFilter = ((NULL == value) || (0 == list_length(scanClauses))); /* * If the partition column is not required(value ==null) or the i exceed the max number of * partition layers we can prune or the scanClauses is null, we will just fetch the partition * column value without prunning. 
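         * For HDFS the children of every surviving directory are expanded with GetSubFiles(); for OBS
         * the partition value is extracted in place with fillPartitionValueInSplitInfo() and the split
         * itself is kept.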
*/ if (skipPartitionFilter) { /* process all the directories in the fileList(they must be directory other than file) */ foreach (fileCell, fileList) { split = (SplitInfo*)lfirst(fileCell); if (T_HDFS_SERVER == srvType) { newFileList = list_concat(newFileList, GetSubFiles(conn, split, partCol)); } else { fillPartitionValueInSplitInfo(conn, split, i + split->prefixSlashNum); newFileList = lappend(newFileList, split); } } } else { Expr* equalExpr = (Expr*)MakeOperatorExpression(value, BTEqualStrategyNumber); /* process all the directories in the fileList(they must be directory other than file) */ ListCell* prev = NULL; ListCell* next = NULL; for (fileCell = list_head(fileList); fileCell != NULL; fileCell = next) { CHECK_FOR_INTERRUPTS(); split = (SplitInfo*)lfirst(fileCell); /* next cell */ next = lnext(fileCell); if (T_OBS_SERVER == srvType) { fillPartitionValueInSplitInfo(conn, split, i + split->prefixSlashNum); } /* Partition pruning by scanClauses. */ partitionSkipped = false; partitionSkipped = PartitionFilterClause(split, scanClauses, value, equalExpr); /* * If we can not skip the current one, we need to Add the files under the directory to * new file List which will be the fileList when we arrive to the next partition layer. */ if (!partitionSkipped) { if (T_HDFS_SERVER == srvType) { newFileList = list_concat(newFileList, GetSubFiles(conn, split, partCol)); } else { newFileList = lappend(newFileList, split); } notprunning++; /* prev cell */ prev = fileCell; } else { if (T_OBS_SERVER == srvType) { pfree_ext(split->fileName); pfree_ext(split->filePath); pfree_ext(split); /* remove from fileList */ fileList = list_delete_cell(fileList, fileCell, prev); } else { /* prev cell */ prev = fileCell; } } } /* collect the partition prunning statistic for the current partition layer. */ CollectPartPruneInfo(prunningResult, sum, notprunning, partCol, foreignTableId); } /* Clean the list if needed. */ if (T_HDFS_SERVER == srvType) { list_free_deep(fileList); } else { list_free(fileList); } fileList = newFileList; } return fileList; } /* * Check if the number of partition column is larger than in hdfs. The number of * partition column defined in MPPDB can be smaller, but can never be larger. * * .e.g * If we create a foreign table like: create foreign table hdfs_tab ~ partitioned by * (c1, c2), then we will check if all the file paths include c1 and c2. Because we must * ensure that the tree consisted of paths is absolutely balanceable which * means all the paths have the same length. But there can be no files in the last * partition directory, so we need to process this condition specially. * * @_in param conn: The handler of hdfs connect. * @_in param partList: The list of partition columns. * @_in param fileList: The file list defined when we create the foreign table. * @_in param foreignTableId: The oid of the foreign table in catalog. 
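 * @_in param srvType: The server type; for HDFS one sub split per layer is probed, while for
 * OBS the number of '/' separators in each object path is checked directly.
 */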
/*
 * Check whether the number of partition columns defined in MPPDB is larger than the
 * number of partition directory levels in HDFS. It can be smaller, but never larger.
 *
 * e.g.
 * If we create a foreign table like "create foreign table hdfs_tab ~ partitioned by
 * (c1, c2)", we check that all file paths include both c1 and c2, because the tree
 * formed by the paths must be balanced, which means all paths have the same depth.
 * The last partition directory may contain no files, so that case is handled specially.
 *
 * @_in param conn: The handler of the HDFS connection.
 * @_in param partList: The list of partition columns.
 * @_in param fileList: The file list defined when we create the foreign table.
 * @_in param foreignTableId: The oid of the foreign table in the catalog.
 */
static void CheckPartitionColNumber(
    dfs::DFSConnector* conn, List* partList, List* fileList, Oid foreignTableId, ServerTypeOption srvType)
{
    int length = list_length(partList);
    if (0 == length) {
        return;
    }

    if (T_HDFS_SERVER == srvType) {
        SplitInfo* split = (SplitInfo*)linitial(fileList);
        SplitInfo* curSplit = (SplitInfo*)copyObject(split);
        for (int i = 0; i < length; i++) {
            curSplit = CheckOneSubSplit(conn, curSplit, (i == length - 1), foreignTableId);
        }
        DestroySplit(curSplit);
    } else {
        ListCell* cell = NULL;
        foreach (cell, fileList) {
            SplitInfo* curSplit = (SplitInfo*)lfirst(cell);
            if (find_Nth(curSplit->filePath, length + curSplit->prefixSlashNum, "/") == -1) {
                QUERY_NOT_SUPPORT(foreignTableId,
                    "The number of partition columns "
                    "defined for foreign table %s is larger than it should be.");
            }
        }
    }
}

/*
 * We only check one split per layer here when verifying that the number of partition
 * columns is not larger than what is defined in HDFS.
 * @_in param conn: The handler of the HDFS connection.
 * @_in param split: The split to be checked.
 * @_in param isLastPartition: Indicates whether the split belongs to the last partition layer.
 * @_in param foreignTableId: The oid of the foreign table in the catalog.
 * @return Return the first split under the current folder.
 */
static SplitInfo* CheckOneSubSplit(dfs::DFSConnector* conn, SplitInfo* split, bool isLastPartition, Oid foreignTableId)
{
    char* folderName = split->filePath;
    SplitInfo* newsplit = NULL;
    List* entryList = NIL;

    if (conn->isDfsFile(folderName)) {
        delete (conn);
        conn = NULL;
        QUERY_NOT_SUPPORT(foreignTableId,
            "The number of partition columns "
            "defined for foreign table %s is larger than it should be.");
    }

    entryList = conn->listDirectory(folderName);

    /*
     * If entryList is empty we may need to report an error: when N partition columns are
     * defined, there must be N levels of folders.
     */
    if (0 == list_length(entryList)) {
        if (!isLastPartition) {
            delete (conn);
            conn = NULL;
            ereport(ERROR,
                (errcode(ERRCODE_FDW_ERROR),
                    errmodule(MOD_HDFS),
                    errmsg("Error occurred when opening partition folder %s: "
                           "it is empty.",
                        folderName)));
        }
    } else {
        newsplit = InitFolderSplit((char*)list_nth(entryList, 0), NIL, 0);
    }

    /* Clean up the former split. */
    DestroySplit(split);
    list_free(entryList);
    entryList = NIL;

    return newsplit;
}
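
/*
 * Illustrative sketch (hypothetical helper): the OBS-side check in CheckPartitionColNumber
 * above boils down to "does the path contain at least as many '/' separators as the
 * declared partition columns plus the table prefix?". The helper below shows the same
 * idea with plain C; the real code uses find_Nth() and prefixSlashNum.
 */
#if 0 /* example only; never compiled */
static bool examplePathHasDepth(const char* filePath, int requiredSlashes)
{
    int seen = 0;
    for (const char* p = filePath; (p = strchr(p, '/')) != NULL; p++) {
        seen++;
        if (seen >= requiredSlashes) {
            return true; /* deep enough: every declared partition column has a directory */
        }
    }
    return false; /* fewer levels than declared partition columns */
}

/* examplePathHasDepth("/bucket/tab/c1=1/c2=2/f.orc", 5) -> true
 * examplePathHasDepth("/bucket/tab/c1=1/f.orc", 5)      -> false  (the c2 level is missing) */
#endif
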
/*
 * Check whether we can skip the current split according to the scanClauses.
 * @_in param split: The split to check against the scanClauses.
 * @_in param scanClauses: The clauses generated by the optimizer.
 * @_in param value: The var of the partition column.
 * @_in param equalExpr: The expression of the restriction to be built.
 * @return Return true if we can skip the current split; false if we need to keep it.
 */
static bool PartitionFilterClause(SplitInfo* split, List* scanClauses, Var* value, Expr* equalExpr)
{
    Node* baseRestriction = NULL;
    char* partValue = NULL;
    List* partRestriction = NIL;
    bool partSkipped = false;
    char* fileName = split->fileName;
    Assert(fileName != NULL);
    Datum datumValue;

    /* Fetch the value of the current partition column. */
    partValue = strchr(fileName, '=');
    if (NULL == partValue) {
        ereport(ERROR,
            (errcode(ERRCODE_FDW_INVALID_OPTION_DATA),
                errmodule(MOD_HDFS),
                errmsg("Something is wrong with the partition directory name of file %s.", split->filePath)));
    }

    if (0 == strncmp(partValue + 1, DEFAULT_HIVE_NULL, 26)) {
        /* __HIVE_DEFAULT_PARTITION__ means the current value is NULL. */
        baseRestriction = BuildNullTestConstraint(value, IS_NULL);
    } else {
        /* Convert the string value to a datum value. */
        datumValue = GetDatumFromString(value->vartype, value->vartypmod, UriDecode(partValue + 1));
        BuildConstraintConst(equalExpr, datumValue, false);
        baseRestriction = (Node*)equalExpr;
    }

    partRestriction = lappend(partRestriction, baseRestriction);

    /* Compare strings using the "C" collation in this coarse filter. */
    List* tempScanClauses = (List*)copyObject(scanClauses);
    List* opExprList = pull_opExpr((Node*)tempScanClauses);
    ListCell* lc = NULL;
    foreach (lc, opExprList) {
        OpExpr* opExpr = (OpExpr*)lfirst(lc);
        opExpr->inputcollid = C_COLLATION_OID;
    }

    /*
     * Call the function provided by PG to try to refute the predicate. If partSkipped is
     * true we can skip the current split (file); otherwise we need it.
     */
    partSkipped = predicate_refuted_by(partRestriction, tempScanClauses, true);

    list_free_ext(opExprList);
    list_free_ext(partRestriction);
    list_free_deep(tempScanClauses);
    tempScanClauses = NIL;

    return partSkipped;
}

/*
 * Collect the partition pruning result, which consists of the total number of files and
 * the number of files left for each layer.
 *
 * @_out param prunningResult: The result list of partition pruning.
 * @_in param sum: The total number of files before pruning.
 * @_in param notprunning: The number of files left after pruning.
 * @_in param colno: The attribute number of the partition column.
 * @_in param relOid: The oid of the partitioned foreign table.
 */
static void CollectPartPruneInfo(List*& prunningResult, int sum, int notprunning, int colno, Oid relOid)
{
    Relation rel = heap_open(relOid, AccessShareLock);
    char* attName = NameStr(rel->rd_att->attrs[colno - 1]->attname);

    /*
     * Add 16 here to leave room for the description words, the separator characters
     * (';', '(', ')') and the terminating '\0'.
     */
    int length = strlen(attName) + GetDigitOfInt(sum) + GetDigitOfInt(notprunning) + 16;
    char* tmp = (char*)palloc0(sizeof(char) * length);
    int ret = 0;
    ret = snprintf_s(
        tmp, sizeof(char) * length, sizeof(char) * length - 1, "%s(total %d; left %d)", attName, sum, notprunning);
    securec_check_ss(ret, "\0", "\0");
    heap_close(rel, AccessShareLock);
    Value* v = makeString(tmp);
    prunningResult = lappend(prunningResult, v);
}
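
/*
 * Illustrative sketch (hypothetical values): the string that CollectPartPruneInfo above
 * builds for one partition layer. A layer that started with 128 directories and kept 3
 * after pruning is reported as "c1(total 128; left 3)". Plain snprintf() stands in for
 * the secure snprintf_s() used in the real code.
 */
#if 0 /* example only; never compiled */
static void examplePruneInfoString(void)
{
    const char* attName = "c1";
    int sum = 128;       /* directories before pruning at this layer */
    int notprunning = 3; /* directories left after pruning */
    char buf[64];
    (void)snprintf(buf, sizeof(buf), "%s(total %d; left %d)", attName, sum, notprunning);
    /* buf now holds: c1(total 128; left 3) */
}
#endif
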
/*
 * Drill down until the file list contains only files (no directories).
 *
 * @_in param conn: The handler of the HDFS connection.
 * @_in param fileList: The list of files before the drill.
 * @return Return the list of files after drilling.
 */
static List* DrillDown(dfs::DFSConnector* conn, List* fileList)
{
    ListCell* fileCell = NULL;
    SplitInfo* split = NULL;
    List* nextFileList = NIL;
    List* fileListList = NIL;

    while (0 != list_length(fileList)) {
        bool isAllFile = true;

        /* Dig into each file in the file list. */
        foreach (fileCell, fileList) {
            CHECK_FOR_INTERRUPTS();
            split = (SplitInfo*)lfirst(fileCell);
            if (split->ObjectSize > 0) {
                fileListList = lappend(fileListList, split);
                continue;
            } else {
                isAllFile = false;
                nextFileList = list_concat(nextFileList, DigFiles(conn, split));
            }
        }

        /*
         * If every split is a file, we have dug down to the bottom and it is time to
         * break the loop and return.
         */
        if (isAllFile) {
            break;
        } else {
            list_free(fileList);
            fileList = nextFileList;
            nextFileList = NIL;
        }
    }

    return fileListList;
}

#ifdef ENABLE_MULTIPLE_NODES
/*
 * @Description: filter *.carbondata files from fileList
 * @IN conn: dfs connection
 * @IN fileList: the complete file list to be filtered
 * @IN allColumnList: column list for the query
 * @IN restrictColumnList: restrict column list
 * @IN scanClauses: scan clauses
 * @IN attrNum: attribute number
 * @Return: *.carbondata file list for the query
 */
List* CarbonDataFile(dfs::DFSConnector* conn, List* fileList, List* allColumnList, List* restrictColumnList,
    List* scanClauses, int16 attrNum)
{
    Assert(NIL != fileList);
    if (0 == list_length(fileList)) {
        return NIL;
    }

    /*
     * If there is no WHERE condition, or enable_indexscan is off, do not read the
     * *.carbonindex files and just return the *.carbondata files.
     */
    if ((0 == list_length(restrictColumnList)) || (u_sess->attr.attr_sql.enable_indexscan == false)) {
        List* dataFileList = dfs::CarbonFileFilter(fileList, CARBONDATA_DATA);
        list_free(fileList);
        ereport(DEBUG1, (errmodule(MOD_CARBONDATA), errmsg("Ignore *.carbonindex file.")));
        return dataFileList;
    }

    /* Get the .carbonindex files from the OBS file list. */
    List* indexFileList = dfs::CarbonFileFilter(fileList, CARBONDATA_INDEX);

    /* Initialize readerState for the input stream. */
    dfs::reader::ReaderState* readerState = (dfs::reader::ReaderState*)palloc0(sizeof(dfs::reader::ReaderState));
    readerState->persistCtx = AllocSetContextCreate(CurrentMemoryContext,
        "carbon index reader context",
        ALLOCSET_DEFAULT_MINSIZE,
        ALLOCSET_DEFAULT_INITSIZE,
        ALLOCSET_DEFAULT_MAXSIZE);

    /* Switch the MemoryContext. */
    MemoryContext oldcontext = MemoryContextSwitchTo(readerState->persistCtx);

    /* Initialize restrictRequired and readRequired. */
    bool* restrictRequired = (bool*)palloc0(sizeof(bool) * attrNum);
    bool* readRequired = (bool*)palloc0(sizeof(bool) * attrNum);
    ListCell* lc = NULL;
    Var* variable = NULL;
    foreach (lc, restrictColumnList) {
        variable = (Var*)lfirst(lc);
        Assert(variable->varattno <= attrNum);
        restrictRequired[variable->varattno - 1] = true;
    }
    foreach (lc, allColumnList) {
        variable = (Var*)lfirst(lc);
        Assert(variable->varattno <= attrNum);
        readRequired[variable->varattno - 1] = true;
    }

    /* Initialize queryRestrictionList. */
    List* queryRestrictionList = ExtractNonParamRestriction((List*)copyObject(scanClauses));

    /* Get the .carbondata files referenced by the .carbonindex files. */
    List* dataFileList = NIL;
    ListCell* cell = NULL;
    for (cell = list_head(indexFileList); cell != NULL; cell = lnext(cell)) {
        void* data = lfirst(cell);
        if (IsA(data, SplitInfo)) {
            SplitInfo* splitinfo = (SplitInfo*)data;
            char* filePath = splitinfo->filePath;
            readerState->currentSplit = splitinfo;
            readerState->currentFileSize = splitinfo->ObjectSize;
            readerState->currentFileID = -1;

            std::unique_ptr<dfs::GSInputStream> gsInputStream =
                dfs::InputStreamFactory(conn, filePath, readerState, false);

            dfs::reader::CarbondataIndexReader indexFileReader(
                allColumnList, queryRestrictionList, restrictRequired, readRequired, attrNum);
            indexFileReader.init(std::move(gsInputStream));
            indexFileReader.readIndex();

            /* Deduplicate the data files listed in the index file. */
            dataFileList = list_concat(dataFileList, indexFileReader.getDataFileDeduplication());
        }
    }

    fileList = dfs::CarbonDataFileMatch(fileList, dataFileList);

    /* Release memory. */
    (void)MemoryContextSwitchTo(oldcontext);
    if (NULL != readerState->persistCtx && readerState->persistCtx != CurrentMemoryContext) {
        MemoryContextDelete(readerState->persistCtx);
        pfree(readerState);
        readerState = NULL;
    }

    return fileList;
}
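
/*
 * Illustrative sketch (hypothetical helper): the kind of suffix test behind splitting a
 * CarbonData file list into *.carbondata and *.carbonindex entries. This is not the real
 * dfs::CarbonFileFilter(); it only shows the check involved.
 */
#if 0 /* example only; never compiled */
static bool exampleHasSuffix(const char* path, const char* suffix)
{
    size_t pathLen = strlen(path);
    size_t suffixLen = strlen(suffix);
    if (pathLen < suffixLen) {
        return false;
    }
    return strcmp(path + pathLen - suffixLen, suffix) == 0;
}

/* exampleHasSuffix("/tab/part-0.carbondata", ".carbondata")  -> true
 * exampleHasSuffix("/tab/part-0.carbonindex", ".carbondata") -> false */
#endif
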
/*
 * @Description: extract non-param restrictions
 * @IN opExpressionList: operator expression list
 * @Return: non-param restriction list
 */
static List* ExtractNonParamRestriction(List* opExpressionList)
{
    ListCell* lc = NULL;
    Expr* expr = NULL;
    List* retRestriction = NIL;

    foreach (lc, opExpressionList) {
        expr = (Expr*)lfirst(lc);
        if (IsA(expr, OpExpr)) {
            Node* leftop = get_leftop(expr);
            Node* rightop = get_rightop(expr);
            Assert(leftop != NULL);
            Assert(rightop != NULL);
            /* Skip "Var op Param" clauses; their values are unknown until execution time. */
            if ((IsVarNode(leftop) && IsParamConst(rightop)) || (IsVarNode(rightop) && IsParamConst(leftop))) {
                continue;
            }
        }
        retRestriction = lappend(retRestriction, expr);
    }

    List* opExprList = pull_opExpr((Node*)retRestriction);
    foreach (lc, opExprList) {
        OpExpr* opExpr = (OpExpr*)lfirst(lc);
        opExpr->inputcollid = C_COLLATION_OID;
    }
    list_free_ext(opExprList);

    return retRestriction;
}
#endif
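
/*
 * Illustrative sketch (hypothetical, simplified): why ExtractNonParamRestriction above
 * drops "Var op Param" clauses. A Param has no value until the query is executed, so it
 * cannot be evaluated against the CarbonData index while files are scheduled on the CN.
 * The test below is a bare-bones stand-in for the IsVarNode()/IsParamConst() checks.
 */
#if 0 /* example only; never compiled */
static bool exampleIsParamRestriction(Node* leftop, Node* rightop)
{
    /* True when one side is a Var and the other a Param: such a clause is skipped. */
    return (IsA(leftop, Var) && IsA(rightop, Param)) || (IsA(rightop, Var) && IsA(leftop, Param));
}
#endif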