From d20b5f90d86ce0ea36826d60579c3d9d1d823993 Mon Sep 17 00:00:00 2001 From: Mryange <59914473+Mryange@users.noreply.github.com> Date: Wed, 10 May 2023 17:07:41 +0800 Subject: [PATCH] [feature](executor) Automatically set the instance_num using the info from be. (#19345) 1. fixed some error regressions (results error with big nstance_num due to incorrect order by). 2. if set parallel_fragment_exec_instance_num to 0, the concurrency in the Pipeline execution engine will automatically be set to half of the number of CPU cores. 3. add limit to parallel_fragment_exec_instance_num that it cannot be set to more than fe.conf::max_instance_num(Default: 128) ``` mysql [(none)]>set parallel_fragment_exec_instance_num = 514; ERROR 1231 (42000): errCode = 2, detailMessage = Variable 'parallel_fragment_exec_instance_num' can't be set to the value of '514(Should not be set to more than 128)' ``` --- be/src/agent/task_worker_pool.cpp | 4 +- docs/en/docs/admin-manual/config/fe-config.md | 8 + .../pipeline-execution-engine.md | 5 + .../docs/admin-manual/config/fe-config.md | 8 + .../pipeline-execution-engine.md | 2 + .../java/org/apache/doris/common/Config.java | 9 + .../org/apache/doris/analysis/SetVar.java | 10 +- .../apache/doris/master/ReportHandler.java | 5 + .../org/apache/doris/qe/SessionVariable.java | 10 +- .../org/apache/doris/qe/StmtExecutor.java | 3 +- .../java/org/apache/doris/system/Backend.java | 62 +++++- .../doris/system/SystemInfoService.java | 7 + gensrc/thrift/MasterService.thrift | 1 + .../data/dynamic_table_p0/sql/q01.out | 200 +++++++++--------- .../suites/dynamic_table_p0/sql/q01.sql | 4 +- .../suites/dynamic_table_p0/sql/q04.sql | 2 +- 16 files changed, 230 insertions(+), 110 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index f85a5218b3..0e4fadd24c 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -1311,7 +1311,9 @@ void TaskWorkerPool::_report_disk_state_worker_thread_callback() { disk.__set_used(root_path_info.is_used); request.disks[root_path_info.path] = disk; } - + int num_cores = config::pipeline_executor_size > 0 ? config::pipeline_executor_size + : CpuInfo::num_cores(); + request.__set_num_cores(num_cores); _handle_report(request, ReportType::DISK); } StorageEngine::instance()->deregister_report_listener(this); diff --git a/docs/en/docs/admin-manual/config/fe-config.md b/docs/en/docs/admin-manual/config/fe-config.md index bc3acae538..6bb154f572 100644 --- a/docs/en/docs/admin-manual/config/fe-config.md +++ b/docs/en/docs/admin-manual/config/fe-config.md @@ -2701,3 +2701,11 @@ If false, when select from tables in information_schema database, the result will not contain the information of the table in external catalog. This is to avoid query time when external catalog is not reachable. +#### `max_instance_num` + + + +Default: 128 + +This is used to limit the setting of "parallel_fragment_exec_instance_num". +"parallel_fragment_exec_instance_num" cannot be set higher than "max_instance_num". \ No newline at end of file diff --git a/docs/en/docs/query-acceleration/pipeline-execution-engine.md b/docs/en/docs/query-acceleration/pipeline-execution-engine.md index ecebea78ee..994bdac6d9 100644 --- a/docs/en/docs/query-acceleration/pipeline-execution-engine.md +++ b/docs/en/docs/query-acceleration/pipeline-execution-engine.md @@ -76,3 +76,8 @@ The default configuration of `parallel_fragment_exec_instance_num` represents th ``` set parallel_fragment_exec_instance_num = 16; ``` + +Specifically, if set to 0, the concurrency in the Pipeline execution engine will automatically be set to half of the number of CPU cores. +"parallel_fragment_exec_instance_num" cannot be set higher than the "max_instance_num" in "fe.conf" (default is 128). + + diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md b/docs/zh-CN/docs/admin-manual/config/fe-config.md index 99b97544bd..57612c7d26 100644 --- a/docs/zh-CN/docs/admin-manual/config/fe-config.md +++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md @@ -2703,3 +2703,11 @@ show data (其他用法:HELP SHOW DATA) 当设置为 false 时,查询 `information_schema` 中的表时,将不再返回 external catalog 中的表的信息。 这个参数主要用于避免因 external catalog 无法访问、信息过多等原因导致的查询 `information_schema` 超时的问题。 + +#### `max_instance_num` + + + +默认值:128 + +用于限制parallel_fragment_exec_instance_num的设置,set parallel_fragment_exec_instance_num不能超过max_instance_num \ No newline at end of file diff --git a/docs/zh-CN/docs/query-acceleration/pipeline-execution-engine.md b/docs/zh-CN/docs/query-acceleration/pipeline-execution-engine.md index f3bcd2b7e7..a23336a63b 100644 --- a/docs/zh-CN/docs/query-acceleration/pipeline-execution-engine.md +++ b/docs/zh-CN/docs/query-acceleration/pipeline-execution-engine.md @@ -76,3 +76,5 @@ set enable_pipeline_engine = true; ``` set parallel_fragment_exec_instance_num = 16; ``` + +特别的,如果设置为0, 则在Pipeline 执行引擎中的并发数会自动的设置为cpu核心数目的一半。并且parallel_fragment_exec_instance_num不能设置超过fe.conf中的max_instance_num(默认128) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 38df46f922..33141597ec 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1932,6 +1932,15 @@ public class Config extends ConfigBase { @ConfField(mutable = true) public static boolean disable_datev1 = true; + /* + * "max_instance_num" is used to set the maximum concurrency. When the value set + * by "parallel_fragment_exec_instance_num" is greater than "max_instance_num", + * an error will be reported. + */ + @ConfField(mutable = true) + public static int max_instance_num = 128; + + /** * This config used for export/outfile. * Whether delete all files in the directory specified by export/outfile. diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java index 37eac5ac30..ccb505bbe4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java @@ -19,6 +19,7 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.Env; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; @@ -155,7 +156,14 @@ public class SetVar { this.value = new StringLiteral(TimeUtils.checkTimeZoneValidAndStandardize(getValue().getStringValue())); this.result = (LiteralExpr) this.value; } - + if (getVariable().equalsIgnoreCase(SessionVariable.PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM)) { + int instanceNum = Integer.parseInt(getValue().getStringValue()); + if (instanceNum > Config.max_instance_num) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_VALUE_FOR_VAR, + SessionVariable.PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM, + instanceNum + "(Should not be set to more than " + Config.max_instance_num + ")"); + } + } if (getVariable().equalsIgnoreCase(SessionVariable.EXEC_MEM_LIMIT)) { this.value = new StringLiteral(Long.toString(ParseUtil.analyzeDataVolumn(getValue().getStringValue()))); this.result = (LiteralExpr) this.value; diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 646bed22a3..718e78ca18 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -194,6 +194,11 @@ public class ReportHandler extends Daemon { LOG.info("receive report from be {}. type: {}, current queue size: {}", backend.getId(), reportType, reportQueue.size()); + if (reportType == ReportType.DISK) { + Backend.BeInfoCollector beinfoCollector = Backend.getBeInfoCollector(); + int numCores = request.getNumCores(); + beinfoCollector.addBeInfo(beId, numCores); + } return result; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index ad2143b3da..fcde985f42 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -28,6 +28,7 @@ import org.apache.doris.common.util.TimeUtils; import org.apache.doris.nereids.metrics.Event; import org.apache.doris.nereids.metrics.EventSwitchParser; import org.apache.doris.qe.VariableMgr.VarAttr; +import org.apache.doris.system.Backend; import org.apache.doris.thrift.TQueryOptions; import org.apache.doris.thrift.TResourceLimit; import org.apache.doris.thrift.TRuntimeFilterType; @@ -1224,7 +1225,12 @@ public class SessionVariable implements Serializable, Writable { } public int getParallelExecInstanceNum() { - return parallelExecInstanceNum; + if (enablePipelineEngine && parallelExecInstanceNum == 0) { + Backend.BeInfoCollector beinfoCollector = Backend.getBeInfoCollector(); + return beinfoCollector.getParallelExecInstanceNum(); + } else { + return parallelExecInstanceNum; + } } public int getExchangeInstanceParallel() { @@ -1768,7 +1774,7 @@ public class SessionVariable implements Serializable, Writable { tResult.setCodegenLevel(codegenLevel); tResult.setBeExecVersion(Config.be_exec_version); tResult.setEnablePipelineEngine(enablePipelineEngine); - tResult.setParallelInstance(parallelExecInstanceNum); + tResult.setParallelInstance(getParallelExecInstanceNum()); tResult.setReturnObjectDataAsBinary(returnObjectDataAsBinary); tResult.setTrimTailingSpacesForExternalTableQuery(trimTailingSpacesForExternalTableQuery); tResult.setEnableShareHashTableForBroadcastJoin(enableShareHashTableForBroadcastJoin); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 6f7b9326ed..fd7c8fc003 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -289,7 +289,7 @@ public class StmtExecutor { builder.instancesNumPerBe( beToInstancesNum.entrySet().stream().map(entry -> entry.getKey() + ":" + entry.getValue()) .collect(Collectors.joining(","))); - builder.parallelFragmentExecInstance(String.valueOf(context.sessionVariable.parallelExecInstanceNum)); + builder.parallelFragmentExecInstance(String.valueOf(context.sessionVariable.getParallelExecInstanceNum())); builder.traceId(context.getSessionVariable().getTraceId()); return builder.build(); } @@ -2206,4 +2206,3 @@ public class StmtExecutor { } } - diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index dc661ef757..f1faaa20a6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -49,6 +49,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -513,7 +514,6 @@ public class Backend implements Writable { long dataUsedCapacityB = tDisk.getDataUsedCapacity(); long diskAvailableCapacityB = tDisk.getDiskAvailableCapacity(); boolean isUsed = tDisk.isUsed(); - DiskInfo diskInfo = disks.get(rootPath); if (diskInfo == null) { diskInfo = new DiskInfo(rootPath); @@ -799,4 +799,64 @@ public class Backend implements Writable { public String getTagMapString() { return "{" + new PrintableMap<>(tagMap, ":", true, false).toString() + "}"; } + + public static BeInfoCollector getBeInfoCollector() { + return BeInfoCollector.get(); + } + + public static class BeInfoCollector { + private int numCores = 1; + private static volatile BeInfoCollector instance = null; + private static final Map Info = new ConcurrentHashMap<>(); + + private BeInfoCollector(int numCores) { + this.numCores = numCores; + } + + public static BeInfoCollector get() { + if (instance == null) { + synchronized (BeInfoCollector.class) { + if (instance == null) { + instance = new BeInfoCollector(Integer.MAX_VALUE); + } + } + } + return instance; + } + + public int getNumCores() { + return numCores; + } + + public void clear() { + Info.clear(); + } + + public void addBeInfo(long beId, int numCores) { + Info.put(beId, new BeInfoCollector(numCores)); + } + + public void dropBeInfo(long beId) { + Info.remove(beId); + } + + public int getMinNumCores() { + int minNumCores = Integer.MAX_VALUE; + for (BeInfoCollector beinfo : Info.values()) { + minNumCores = Math.min(minNumCores, beinfo.getNumCores()); + } + return Math.max(1, minNumCores); + } + + public int getParallelExecInstanceNum() { + if (getMinNumCores() == Integer.MAX_VALUE) { + return 1; + } + return (getMinNumCores() + 1) / 2; + } + + public BeInfoCollector getBeInfoCollectorById(long beId) { + return Info.get(beId); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java index 76a664bb28..16883fe745 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -270,6 +270,9 @@ public class SystemInfoService { } dropBackend(backend.getIp(), backend.getHostName(), backend.getHeartbeatPort()); + // update BeInfoCollector + Backend.BeInfoCollector beinfoCollector = Backend.getBeInfoCollector(); + beinfoCollector.dropBeInfo(backendId); } // final entry of dropping backend @@ -1231,6 +1234,10 @@ public class SystemInfoService { copiedReportVersions.remove(backend.getId()); ImmutableMap newIdToReportVersion = ImmutableMap.copyOf(copiedReportVersions); idToReportVersionRef = newIdToReportVersion; + + // update BeInfoCollector + Backend.BeInfoCollector beinfoCollector = Backend.getBeInfoCollector(); + beinfoCollector.dropBeInfo(backend.getId()); } public void updateBackendState(Backend be) { diff --git a/gensrc/thrift/MasterService.thrift b/gensrc/thrift/MasterService.thrift index 893bb0171a..18d95b3854 100644 --- a/gensrc/thrift/MasterService.thrift +++ b/gensrc/thrift/MasterService.thrift @@ -101,6 +101,7 @@ struct TReportRequest { 8: optional i64 tablet_max_compaction_score 9: optional list storage_policy // only id and version 10: optional list resource // only id and version + 11: i32 num_cores } struct TMasterResult { diff --git a/regression-test/data/dynamic_table_p0/sql/q01.out b/regression-test/data/dynamic_table_p0/sql/q01.out index 9719048c79..c2daca09b1 100644 --- a/regression-test/data/dynamic_table_p0/sql/q01.out +++ b/regression-test/data/dynamic_table_p0/sql/q01.out @@ -15,12 +15,12 @@ [[8188473317676292]] 113 [[1246015209249282]] 137 [[2507054251761891]] 705 -[[3773869957247344]] 762 -[[8270995445718570]] 762 [[5110638976348890]] 762 -[[1010130250138698]] 769 +[[8270995445718570]] 762 +[[3773869957247344]] 762 [[7412557565511915]] 769 [[8987047093946431]] 769 +[[1010130250138698]] 769 [[4455327319328767]] 783 [[8828150595154990]] 885 [[7159775357516943]] 1104 @@ -60,14 +60,14 @@ [[4747614140915628]] 2535 [[8597511952125331]] 2535 [[5730174107868168], [5730174107868168]] 2596 -[[4354883209311598]] 2660 [[8119601212676566]] 2660 +[[4354883209311598]] 2660 [[2043666895665144]] 2700 [[3521040327201062]] 2767 [[5776942886505092]] 2787 [[359661801933760]] 2841 -[[153644039088669]] 2842 [[7004296435935771]] 2842 +[[153644039088669]] 2842 [[1627354218830689]] 2842 [[5859840954071140]] 2918 [[4386667172417724]] 2944 @@ -83,9 +83,9 @@ [[3724245364169931]] 4416 [[7916138178043215]] 4420 [[7763869997273429]] 4452 -[[2782955878363126]] 4966 -[[2354480809921026]] 4966 [[670650193123303]] 4966 +[[2354480809921026]] 4966 +[[2782955878363126]] 4966 [[5165127671898678]] 5000 [[6389401809481499]] 5355 [[7615002208576309]] 6460 @@ -97,11 +97,11 @@ [[8088693211060628]] 9441 [[1693725099154825]] 9441 [[1809442347930970]] 9550 +[[1946642491818096]] 9550 [[3825997181791941]] 9550 [[8070919951629642]] 9550 -[[1946642491818096]] 9550 -[[1421733087335887]] 9550 [[2525418264579859]] 9550 +[[1421733087335887]] 9550 [[8344178849407849], [8344178849407849], [8344178849407849], [8344178849407849], [8344178849407849], [8344178849407849], [8344178849407849], [8344178849407849]] 9597 [[8092402219407069]] 15236 [[1472749710201141]] 29743 @@ -114,106 +114,106 @@ [[8485415091903596], [8485415091903596], [8485415091903596], [8485415091903596], [8485415091903596], [8485415091903596], [8485415091903596]] 140000 -- !q01_6 -- -153644039088669 -359661801933760 -433649683957555 -461763776919610 -670650193123303 -793107092391462 -909085960590569 -1010130250138698 1062399915275687 -1246015209249282 -1421733087335887 -1472749710201141 -1594793682472250 -1627354218830689 -1685792381137272 -1693725099154825 -1809442347930970 -1946642491818096 -2042138032090263 -2043666895665144 -2264322793739384 -2321583468933540 -2354480809921026 -2371994091383347 -2377102276803763 -2401724312046667 -2507054251761891 -2525418264579859 -2663410947740189 -2782955878363126 -2957271339174925 -3246031659541791 -3467530916498381 -3521040327201062 -3724245364169931 -3773869957247344 -3825997181791941 -4095256255478091 -4245294408202541 -4354883209311598 -4386667172417724 -4455327319328767 -4590062018629189 -4747614140915628 -4809930628427145 -4975090293678843 5110638976348890 -5131906219034408 -5165127671898678 -5220311469220731 -5343737085255499 -5588351545916968 -5679970266278027 -5730174107868168 -5776942886505092 -5790470732472115 -5814976036215560 -5827569795054362 -5857463922541047 -5859840954071140 -6313101949227572 -6337013526045930 -6389401809481499 -6403234720218018 6437188822160102 -6492680839765176 -7004296435935771 +670650193123303 +8119601212676566 +359661801933760 7010643802717876 -7119085210022543 -7159775357516943 +5730174107868168 +5679970266278027 +2663410947740189 +8344178849407849 +8088693211060628 +8440511462993326 +1809442347930970 +793107092391462 +433649683957555 +3467530916498381 +2264322793739384 +4455327319328767 +2377102276803763 +5165127671898678 +8322339034003297 +2371994091383347 +3246031659541791 +8270995445718570 +2043666895665144 +7929380011639087 +1246015209249282 +4747614140915628 +1594793682472250 +5588351545916968 +8168903104703233 +6313101949227572 +909085960590569 +2042138032090263 +4095256255478091 +8536831366526581 7170312234638999 7300691594757017 -7367822710336978 -7388338586294277 -7412557565511915 -7615002208576309 -7763869997273429 -7916138178043215 -7929380011639087 -8049598119637189 -8070919951629642 -8088693211060628 +5776942886505092 8092402219407069 -8119601212676566 -8135216530942053 -8168903104703233 -8188473317676292 -8270995445718570 -8305732448960719 -8322339034003297 -8344178849407849 -8440511462993326 -8485415091903596 -8536831366526581 -8597511952125331 -8608916026515418 +7367822710336978 +5343737085255499 +6389401809481499 +6492680839765176 +6337013526045930 +1946642491818096 +1685792381137272 +2354480809921026 +5814976036215560 +1693725099154825 +2401724312046667 +7004296435935771 +4354883209311598 8708146704871155 -8812535475818332 -8828150595154990 +7119085210022543 +8597511952125331 +7412557565511915 +4386667172417724 +2321583468933540 +8305732448960719 +3773869957247344 +4245294408202541 +1472749710201141 +2782955878363126 +461763776919610 +2957271339174925 +8188473317676292 +5220311469220731 8987047093946431 +5131906219034408 +153644039088669 +8049598119637189 +4809930628427145 +1010130250138698 +6403234720218018 +7615002208576309 +2507054251761891 +7916138178043215 +3825997181791941 +8070919951629642 +7763869997273429 +5790470732472115 +7388338586294277 +5827569795054362 +2525418264579859 +3724245364169931 +8828150595154990 +7159775357516943 +5859840954071140 +4590062018629189 +8812535475818332 +3521040327201062 +8485415091903596 +8135216530942053 +5857463922541047 +1627354218830689 +1421733087335887 +8608916026515418 +4975090293678843 -- !q01_7 -- [3521040327201062, 3521040327201062] 2767 834842 diff --git a/regression-test/suites/dynamic_table_p0/sql/q01.sql b/regression-test/suites/dynamic_table_p0/sql/q01.sql index e9139983d3..14f730426e 100644 --- a/regression-test/suites/dynamic_table_p0/sql/q01.sql +++ b/regression-test/suites/dynamic_table_p0/sql/q01.sql @@ -2,6 +2,6 @@ SELECT count() FROM test_btc_json; SELECT avg(fee) FROM test_btc_json; SELECT avg(size(`inputs.prev_out.spending_outpoints.n`)) FROM test_btc_json; SELECT avg(size(`inputs.prev_out.spending_outpoints.tx_index`)) FROM test_btc_json; -select `inputs.prev_out.spending_outpoints.tx_index`, fee from test_btc_json order by fee; -select `out.tx_index`[-1] from test_btc_json order by `out.tx_index`[-1]; +select `inputs.prev_out.spending_outpoints.tx_index`, fee from test_btc_json order by fee,hash; +select `out.tx_index`[-1] from test_btc_json order by hash,`out.tx_index`[-1]; select `out.tx_index`, fee, `out.value`[1] from test_btc_json where array_contains(`out.value`, 2450939412); \ No newline at end of file diff --git a/regression-test/suites/dynamic_table_p0/sql/q04.sql b/regression-test/suites/dynamic_table_p0/sql/q04.sql index a1c6873fd7..d42e7328f9 100644 --- a/regression-test/suites/dynamic_table_p0/sql/q04.sql +++ b/regression-test/suites/dynamic_table_p0/sql/q04.sql @@ -1,4 +1,4 @@ -select `answers.date`, `answers.user` from test_es_nested_json where size(`answers.date`) > 3 limit 10; +select `answers.date`, `answers.user` from test_es_nested_json where size(`answers.date`) > 3 order by qid limit 10 ; select `answers.date`[1], qid, title from test_es_nested_json where array_contains(`answers.user`, 'Michael Ecklund (804104)') order by qid limit 10; select qid, title from test_es_nested_json where array_contains(`answers.date`, '2012-04-03T19:35:38.007'); select `answers.date` from test_es_nested_json where size(`answers.date`) > 0 order by `answers.date`[1] limit 10;