[Bug] Filter out unavaliable backends when getting tablet location (#6204)
* [Bug] Filter out unavaiable backends when getting scan range location In the previous implementation, we will eliminate non-surviving BEs in the Coordinator phase. But for Spark or Flink Connector, there is no such logic, so when a BE node is down, it will cause the problem of querying errors through the Connector. * fix ut * fix compiule
This commit is contained in:
@ -268,15 +268,21 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, StreamLoadContext* ct
|
||||
ctx->body_bytes = 0;
|
||||
size_t csv_max_body_bytes = config::streaming_load_max_mb * 1024 * 1024;
|
||||
size_t json_max_body_bytes = config::streaming_load_json_max_mb * 1024 * 1024;
|
||||
bool read_json_by_line = false;
|
||||
if (!http_req->header(HTTP_READ_JSON_BY_LINE).empty()) {
|
||||
if (boost::iequals(http_req->header(HTTP_READ_JSON_BY_LINE), "true")) {
|
||||
read_json_by_line = true;
|
||||
}
|
||||
}
|
||||
if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
|
||||
ctx->body_bytes = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
|
||||
// json max body size
|
||||
if ((ctx->format == TFileFormatType::FORMAT_JSON) &&
|
||||
(ctx->body_bytes > json_max_body_bytes)) {
|
||||
(ctx->body_bytes > json_max_body_bytes) && !read_json_by_line) {
|
||||
std::stringstream ss;
|
||||
ss << "The size of this batch exceed the max size [" << json_max_body_bytes
|
||||
<< "] of json type data "
|
||||
<< " data [ " << ctx->body_bytes << " ]";
|
||||
<< " data [ " << ctx->body_bytes << " ]. Split the file, or use 'read_json_by_line'";
|
||||
return Status::InternalError(ss.str());
|
||||
}
|
||||
// csv max body size
|
||||
|
||||
@ -430,8 +430,9 @@ public class OlapScanNode extends ScanNode {
|
||||
boolean collectedStat = false;
|
||||
for (Replica replica : replicas) {
|
||||
Backend backend = Catalog.getCurrentSystemInfo().getBackend(replica.getBackendId());
|
||||
if (backend == null) {
|
||||
LOG.debug("replica {} not exists", replica.getBackendId());
|
||||
if (backend == null || !backend.isAlive()) {
|
||||
LOG.debug("backend {} not exists or is not alive for replica {}",
|
||||
replica.getBackendId(), replica.getId());
|
||||
continue;
|
||||
}
|
||||
String ip = backend.getHost();
|
||||
|
||||
@ -263,10 +263,13 @@ abstract public class DorisHttpTestCase {
|
||||
private static void assignBackends() {
|
||||
Backend backend1 = new Backend(testBackendId1, "node-1", 9308);
|
||||
backend1.setBePort(9300);
|
||||
backend1.setAlive(true);
|
||||
Backend backend2 = new Backend(testBackendId2, "node-2", 9308);
|
||||
backend2.setBePort(9300);
|
||||
backend2.setAlive(true);
|
||||
Backend backend3 = new Backend(testBackendId3, "node-3", 9308);
|
||||
backend3.setBePort(9300);
|
||||
backend3.setAlive(true);
|
||||
Catalog.getCurrentSystemInfo().addBackend(backend1);
|
||||
Catalog.getCurrentSystemInfo().addBackend(backend2);
|
||||
Catalog.getCurrentSystemInfo().addBackend(backend3);
|
||||
|
||||
Reference in New Issue
Block a user