Files
doris/be/test/exec/es_scan_reader_test.cpp
令狐少侠 5a57ecca15 [Doris On ES] Fix query failure in doc_value_mode when fields have no value (#3513)
#3479 

Here I try to explain the cause of the problem and how to fix it.

**The Cause of The problem**
Take the case in issue(#3479 ) as an example:
The general results are as follows:
```
GET table/_doc/_search
{"query":{"match_all":{}},"stored_fields":"_none_","docvalue_fields":["k1"],"sort":["_doc"],"size":100}

{
  "took": 6,
  "timed_out": false,
  "_shards": {
    ……
  },
  "hits": {
    "total": 3,
    "max_score": null,
    "hits": [
      {
        "_index": "table",
        "_score": null,
        "sort": [
          0
        ]
      },
      {
        "_index": "table",
        "_score": null,
        "fields": {
          "k1": [
            "kkk1"
          ]
        },
        "sort": [
          0
        ]
      },
      {
        "_index": "table",
        "_score": null,
        "sort": [
          0
        ]
      }
    ]
  }
}
```

But in Doris on ES, the BE fetches data from all shards in parallel and uses `filter_path` to reduce network cost. The process is as follows:
```
GET table/_doc/_search?preference=_shards:1&filter_path=_scroll_id,hits.hits._source,hits.total,_id,hits.hits._source.fields,hits.hits.fields
{"query":{"match_all":{}},"stored_fields":"_none_","docvalue_fields":["k1"],"sort":["_doc"],"size":100}

{
  "hits": {
    "total": 0
  }
}

GET table/_doc/_search?preference=_shards:2&filter_path=_scroll_id,hits.hits._source,hits.total,_id,hits.hits._source.fields,hits.hits.fields
{"query":{"match_all":{}},"stored_fields":"_none_","docvalue_fields":["k1"],"sort":["_doc"],"size":100}
{
  "hits": {
    "total": 1
  }
}

GET table/_doc/_search?preference=_shards:3&filter_path=_scroll_id,hits.hits._source,hits.total,_id,hits.hits._source.fields,hits.hits.fields
{"query":{"match_all":{}},"stored_fields":"_none_","docvalue_fields":["k1"],"sort":["_doc"],"size":100}
{
  "hits": {
    "total": 1,
    "hits": [
      {
        "fields": {
          "k1": [
            "kkk1"
          ]
        }
      }
    ]
  }
}
```
*The scan worker on the BE that processes the result of shard 2 will fail.*

**The reasons are as follows:**
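1. "filter_path" causes the hits.hits object to be missing when none of the returned hits contains any of the requested fields (shard 2 above reports total = 1 but has no hits.hits).
2. In the current implementation, if there are data rows (total > 0), the hits.hits object must exist and be an array.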

**How To Fix it**

Two methods:
1. Modify "filter_path" so that it always includes the hits.
Pros: the fix is very simple.
Cons: higher network cost.
2. Handle the case where fields are missing in a batch.
Pros: no loss of performance.
Cons: the code is more complex.

Since performance comes first, I use Method 2.

**Design**
1. Add a member variable "_doc_value_mode" to class "EsScrollParser" to indicate whether the data processed by this parser is in doc_value_mode.
2. "_doc_value_mode" is passed along ESScanReader <- ESScanner <- ESScrollQueryBuilder::build(), which determines whether the DSL enables doc_value_mode.
3. When hits.hits is missing from the ES response but total > 0, we know there are data rows whose requested fields do not exist. EsScrollParser then uses "_doc_value_mode" and _total to construct _total rows whose fields are all set to NULL.
2020-05-11 15:34:12 +08:00

252 lines
11 KiB
C++

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <vector>
#include <map>
#include <gtest/gtest.h>
#include <string>
#include "common/logging.h"
#include "exec/es/es_scan_reader.h"
#include "exec/es/es_scroll_query.h"
#include "http/ev_http_server.h"
#include "http/http_channel.h"
#include "http/http_handler.h"
#include "http/http_request.h"
#include "rapidjson/document.h"
#include "rapidjson/writer.h"
#include "rapidjson/stringbuffer.h"
namespace doris {
class RestSearchAction : public HttpHandler {
public:
void handle(HttpRequest* req) override {
std::string user;
std::string passwd;
if (!parse_basic_auth(*req, &user, &passwd) || user != "root") {
HttpChannel::send_basic_challenge(req, "abc");
return;
}
req->add_output_header(HttpHeaders::CONTENT_TYPE, "application/json");
if (req->method() == HttpMethod::POST) {
std::string post_body = req->get_request_body();
rapidjson::Document post_doc;
post_doc.Parse<0>(post_body.c_str());
int size = 1;
if (post_doc.HasMember("size")) {
rapidjson::Value& size_value = post_doc["size"];
size = size_value.GetInt();
}
std::string _scroll_id(std::to_string(size));
rapidjson::Document search_result;
rapidjson::Document::AllocatorType &allocator = search_result.GetAllocator();
search_result.SetObject();
rapidjson::Value scroll_id_value(_scroll_id.c_str(), allocator);
search_result.AddMember("_scroll_id", scroll_id_value, allocator);
rapidjson::Value outer_hits(rapidjson::kObjectType);
outer_hits.AddMember("total", 10, allocator);
rapidjson::Value inner_hits(rapidjson::kArrayType);
rapidjson::Value source_docuement(rapidjson::kObjectType);
source_docuement.AddMember("id", 1, allocator);
rapidjson::Value value_node("1", allocator);
source_docuement.AddMember("value", value_node, allocator);
inner_hits.PushBack(source_docuement, allocator);
outer_hits.AddMember("hits", inner_hits, allocator);
search_result.AddMember("hits", outer_hits, allocator);
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
search_result.Accept(writer);
//send DELETE scorll post request
std::string search_result_json = buffer.GetString();
HttpChannel::send_reply(req, search_result_json);
} else {
std::string response = "test1";
HttpChannel::send_reply(req, response);
}
}
};
class RestSearchScrollAction : public HttpHandler {
public:
void handle(HttpRequest* req) override {
std::string user;
std::string passwd;
if (!parse_basic_auth(*req, &user, &passwd) || user != "root") {
HttpChannel::send_basic_challenge(req, "abc");
return;
}
if (req->method() == HttpMethod::POST) {
std::string post_body = req->get_request_body();
rapidjson::Document post_doc;
post_doc.Parse<0>(post_body.c_str());
std::string scroll_id;
if (!post_doc.HasMember("scroll_id")) {
HttpChannel::send_reply(req,HttpStatus::NOT_FOUND, "invalid scroll request");
return;
} else {
rapidjson::Value& scroll_id_value = post_doc["scroll_id"];
scroll_id = scroll_id_value.GetString();
int offset = atoi(scroll_id.c_str());
if (offset > 10) {
rapidjson::Document end_search_result;
rapidjson::Document::AllocatorType &allocator = end_search_result.GetAllocator();
end_search_result.SetObject();
rapidjson::Value scroll_id_value("11", allocator);
end_search_result.AddMember("_scroll_id", scroll_id_value, allocator);
rapidjson::Value outer_hits(rapidjson::kObjectType);
outer_hits.AddMember("total", 0, allocator);
end_search_result.AddMember("hits", outer_hits, allocator);
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
end_search_result.Accept(writer);
//send DELETE scorll post request
std::string end_search_result_json = buffer.GetString();
HttpChannel::send_reply(req, end_search_result_json);
return;
} else {
int start = offset + 1;
rapidjson::Document search_result;
rapidjson::Document::AllocatorType &allocator = search_result.GetAllocator();
search_result.SetObject();
rapidjson::Value scroll_id_value(std::to_string(start).c_str(), allocator);
search_result.AddMember("_scroll_id", scroll_id_value, allocator);
rapidjson::Value outer_hits(rapidjson::kObjectType);
outer_hits.AddMember("total", 1, allocator);
rapidjson::Value inner_hits(rapidjson::kArrayType);
rapidjson::Value source_docuement(rapidjson::kObjectType);
source_docuement.AddMember("id", start, allocator);
rapidjson::Value value_node(std::to_string(start).c_str(), allocator);
source_docuement.AddMember("value", value_node, allocator);
inner_hits.PushBack(source_docuement, allocator);
outer_hits.AddMember("hits", inner_hits, allocator);
search_result.AddMember("hits", outer_hits, allocator);
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
search_result.Accept(writer);
//send DELETE scorll post request
std::string search_result_json = buffer.GetString();
HttpChannel::send_reply(req, search_result_json);
return;
}
}
}
}
};
class RestClearScrollAction : public HttpHandler {
public:
void handle(HttpRequest* req) override {
std::string user;
std::string passwd;
if (!parse_basic_auth(*req, &user, &passwd) || user != "root") {
HttpChannel::send_basic_challenge(req, "abc");
return;
}
if (req->method() == HttpMethod::DELETE) {
std::string post_body = req->get_request_body();
rapidjson::Document post_doc;
post_doc.Parse<0>(post_body.c_str());
std::string scroll_id;
if (!post_doc.HasMember("scroll_id")) {
HttpChannel::send_reply(req,HttpStatus::NOT_FOUND, "invalid scroll request");
return;
} else {
rapidjson::Document clear_scroll_result;
rapidjson::Document::AllocatorType &allocator = clear_scroll_result.GetAllocator();
clear_scroll_result.SetObject();
clear_scroll_result.AddMember("succeeded", true, allocator);
clear_scroll_result.AddMember("num_freed", 1, allocator);
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
clear_scroll_result.Accept(writer);
std::string clear_scroll_result_json = buffer.GetString();
HttpChannel::send_reply(req, clear_scroll_result_json);
return;
}
}
}
};
// Handler instances shared by every test in this file. The mock server is
// handed raw pointers to them, so they are given static storage duration.
static RestSearchAction rest_search_action;
static RestSearchScrollAction rest_search_scroll_action;
static RestClearScrollAction rest_clear_scroll_action;

// Mock ES server created in MockESServerTest::SetUpTestCase() and deleted in
// TearDownTestCase(); real_port holds the port reported by get_real_port().
static EvHttpServer* mock_es_server = nullptr;
static int real_port = 0;
// Fixture that runs one mock ES server for all tests in this test case.
// The server is constructed with port 0, and the port it actually bound is
// read back through get_real_port() into real_port.
class MockESServerTest : public testing::Test {
public:
    static void SetUpTestCase() {
        mock_es_server = new EvHttpServer(0);
        mock_es_server->register_handler(POST, "/{index}/{type}/_search", &rest_search_action);
        mock_es_server->register_handler(POST, "/_search/scroll", &rest_search_scroll_action);
        mock_es_server->register_handler(DELETE, "/_search/scroll", &rest_clear_scroll_action);
        // Fail here rather than later with an obscure connection error in the
        // test body.
        ASSERT_TRUE(mock_es_server->start().ok());
        real_port = mock_es_server->get_real_port();
        ASSERT_NE(0, real_port);
    }

    static void TearDownTestCase() {
        delete mock_es_server;
        // Do not leave a dangling global behind.
        mock_es_server = nullptr;
    }
};
// End-to-end scan against the mock server: build the scroll query, open the
// reader, pull batches until it reports end-of-stream, then close it.
TEST_F(MockESServerTest, workflow) {
    std::string target = "http://127.0.0.1:" + std::to_string(real_port);
    std::vector<std::string> fields = {"id", "value"};
    std::map<std::string, std::string> props;
    props[ESScanReader::KEY_INDEX] = "tindex";
    props[ESScanReader::KEY_TYPE] = "doc";
    props[ESScanReader::KEY_USER_NAME] = "root";
    props[ESScanReader::KEY_PASS_WORD] = "root";
    props[ESScanReader::KEY_SHARD] = "0";
    props[ESScanReader::KEY_BATCH_SIZE] = "1";
    std::vector<EsPredicate*> predicates;
    std::map<std::string, std::string> docvalue_context;
    bool doc_value_mode = false;
    props[ESScanReader::KEY_QUERY] = ESScrollQueryBuilder::build(props, fields, predicates, docvalue_context, &doc_value_mode);
    ESScanReader reader(target, props, doc_value_mode);
    auto st = reader.open();
    ASSERT_TRUE(st.ok());

    // The mock serves a small, finite number of pages. The cap turns a reader
    // that never reports eos into a test failure instead of a hang.
    const int max_batches = 100;
    int batches = 0;
    bool eos = false;
    std::unique_ptr<ScrollParser> parser = nullptr;
    while (true) {
        st = reader.get_next(&eos, parser);
        ASSERT_TRUE(st.ok());
        if (eos) {
            break;
        }
        ASSERT_LT(++batches, max_batches);
    }
    // The initial search response carries a document, so at least one batch
    // must have been delivered before eos.
    ASSERT_GT(batches, 0);

    auto cst = reader.close();
    ASSERT_TRUE(cst.ok());
}
}
// Test entry point: let GoogleTest consume its command-line flags, then run
// every registered test and return its aggregate result code.
int main(int argc, char* argv[]) {
    ::testing::InitGoogleTest(&argc, argv);
    const int rc = RUN_ALL_TESTS();
    return rc;
}