diff --git a/be/src/http/http_channel.cpp b/be/src/http/http_channel.cpp index 82d7f6c7d8..7b68370272 100644 --- a/be/src/http/http_channel.cpp +++ b/be/src/http/http_channel.cpp @@ -17,17 +17,20 @@ #include "http/http_channel.h" +#include #include #include #include #include +#include "gutil/strings/split.h" #include "http/http_request.h" #include "http/http_response.h" #include "http/http_headers.h" #include "http/http_status.h" #include "common/logging.h" +#include "util/zlib.h" namespace doris { @@ -52,7 +55,13 @@ void HttpChannel::send_reply(HttpRequest* request, HttpStatus status) { void HttpChannel::send_reply( HttpRequest* request, HttpStatus status, const std::string& content) { auto evb = evbuffer_new(); - evbuffer_add(evb, content.c_str(), content.size()); + std::string compressed_content; + if (compress_content(request->header(HttpHeaders::ACCEPT_ENCODING), content, &compressed_content)) { + request->add_output_header(HttpHeaders::CONTENT_ENCODING, "gzip"); + evbuffer_add(evb, compressed_content.c_str(), compressed_content.size()); + } else { + evbuffer_add(evb, content.c_str(), content.size()); + } evhttp_send_reply(request->get_evhttp_request(), status, defalut_reason(status).c_str(), evb); evbuffer_free(evb); } @@ -66,4 +75,31 @@ void HttpChannel::send_file(HttpRequest* request, int fd, size_t off, size_t siz evbuffer_free(evb); } +bool HttpChannel::compress_content(const std::string& accept_encoding, const std::string& input, std::string* output) { + // Don't bother compressing empty content. + if (input.empty()) { + return false; + } + + // Check if gzip compression is accepted by the caller. If so, compress the + // content and replace the prerendered output. + bool is_compressed = false; + std::vector encodings = strings::Split(accept_encoding, ","); + for (string& encoding : encodings) { + StripWhiteSpace(&encoding); + if (encoding == "gzip") { + std::ostringstream oss; + Status s = zlib::CompressLevel(Slice(input), 1, &oss); + if (s.ok()) { + *output = oss.str(); + is_compressed = true; + } else { + LOG(WARNING) << "Could not compress output: " << s.to_string(); + } + break; + } + } + return is_compressed; +} + } diff --git a/be/src/http/http_channel.h b/be/src/http/http_channel.h index 307dd8124d..9c7546b94e 100644 --- a/be/src/http/http_channel.h +++ b/be/src/http/http_channel.h @@ -46,6 +46,8 @@ public: static void send_reply(HttpRequest* request, HttpStatus status, const std::string& content); static void send_file(HttpRequest* request, int fd, size_t off, size_t size); + + static bool compress_content(const std::string& accept_encoding, const std::string& input, std::string* output); }; } diff --git a/be/src/http/http_request.cpp b/be/src/http/http_request.cpp index cdf1bb492d..b6205a67db 100644 --- a/be/src/http/http_request.cpp +++ b/be/src/http/http_request.cpp @@ -122,9 +122,7 @@ const std::string& HttpRequest::param(const std::string& key) const { } void HttpRequest::add_output_header(const char* key, const char* value) { -// #ifndef BE_TEST evhttp_add_header(evhttp_request_get_output_headers(_ev_req), key, value); -// #endif } std::string HttpRequest::get_request_body() { diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index d54a4d210c..4c44ddbc7b 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -100,6 +100,7 @@ set(UTIL_FILES easy_json.cc mustache/mustache.cc brpc_stub_cache.cpp + zlib.cpp ) if (WITH_MYSQL) diff --git a/be/src/util/zlib.cpp b/be/src/util/zlib.cpp new file mode 100644 index 0000000000..fe0c325b4e --- /dev/null +++ b/be/src/util/zlib.cpp @@ -0,0 +1,128 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/zlib.h" + +#include +#include +#include +#include + +#include +#include + +#include "gutil/macros.h" +#include "gutil/strings/substitute.h" + +using std::ostream; +using std::string; +using std::unique_ptr; + +#define ZRETURN_NOT_OK(call) \ + RETURN_IF_ERROR(ZlibResultToStatus(call)) + +namespace doris { +namespace zlib { + +namespace { +Status ZlibResultToStatus(int rc) { + switch (rc) { + case Z_OK: + return Status::OK(); + case Z_STREAM_END: + return Status::EndOfFile("zlib EOF"); + case Z_NEED_DICT: + return Status::Corruption("zlib error: NEED_DICT"); + case Z_ERRNO: + return Status::IOError("zlib error: Z_ERRNO"); + case Z_STREAM_ERROR: + return Status::Corruption("zlib error: STREAM_ERROR"); + case Z_DATA_ERROR: + return Status::Corruption("zlib error: DATA_ERROR"); + case Z_MEM_ERROR: + return Status::RuntimeError("zlib error: MEM_ERROR"); + case Z_BUF_ERROR: + return Status::RuntimeError("zlib error: BUF_ERROR"); + case Z_VERSION_ERROR: + return Status::RuntimeError("zlib error: VERSION_ERROR"); + default: + return Status::RuntimeError( + strings::Substitute("zlib error: unknown error $0", rc)); + } +} +} // anonymous namespace + +Status Compress(Slice input, ostream* out) { + return CompressLevel(input, Z_DEFAULT_COMPRESSION, out); +} + +Status CompressLevel(Slice input, int level, ostream* out) { + z_stream zs; + memset(&zs, 0, sizeof(zs)); + ZRETURN_NOT_OK(deflateInit2(&zs, level, Z_DEFLATED, + 15 + 16 /* 15 window bits, enable gzip */, + 8 /* memory level, max is 9 */, + Z_DEFAULT_STRATEGY)); + + zs.avail_in = input.get_size(); + zs.next_in = (unsigned char*)(input.mutable_data()); + const int kChunkSize = 256 * 1024; + unique_ptr chunk(new unsigned char[kChunkSize]); + int flush; + do { + zs.avail_out = kChunkSize; + zs.next_out = chunk.get(); + flush = (zs.avail_in == 0) ? Z_FINISH : Z_NO_FLUSH; + Status s = ZlibResultToStatus(deflate(&zs, flush)); + if (!s.ok() && !s.is_end_of_file()) { + return s; + } + int out_size = zs.next_out - chunk.get(); + if (out_size > 0) { + out->write(reinterpret_cast(chunk.get()), out_size); + } + } while (flush != Z_FINISH); + ZRETURN_NOT_OK(deflateEnd(&zs)); + return Status::OK(); +} + +Status Uncompress(Slice compressed, std::ostream* out) { + z_stream zs; + memset(&zs, 0, sizeof(zs)); + zs.next_in = (unsigned char*)(compressed.mutable_data()); + zs.avail_in = compressed.get_size(); + ZRETURN_NOT_OK(inflateInit2(&zs, 15 + 16 /* 15 window bits, enable zlib */)); + int flush; + Status s; + do { + unsigned char buf[4096]; + zs.next_out = buf; + zs.avail_out = arraysize(buf); + flush = zs.avail_in > 0 ? Z_NO_FLUSH : Z_FINISH; + s = ZlibResultToStatus(inflate(&zs, flush)); + if (!s.ok() && !s.is_end_of_file()) { + return s; + } + out->write(reinterpret_cast(buf), zs.next_out - buf); + } while (flush == Z_NO_FLUSH); + ZRETURN_NOT_OK(inflateEnd(&zs)); + + return Status::OK(); +} + +} // namespace zlib +} // namespace doris diff --git a/be/src/util/zlib.h b/be/src/util/zlib.h new file mode 100644 index 0000000000..b062c4050b --- /dev/null +++ b/be/src/util/zlib.h @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include + +#include "util/slice.h" +#include "common/status.h" + +namespace doris { +namespace zlib { + +// Zlib-compress the data in 'input', appending the result to 'out'. +// +// In case of an error, some data may still be appended to 'out'. +Status Compress(Slice input, std::ostream* out); + +// The same as the above, but with a custom level (1-9, where 1 is fastest +// and 9 is best compression). +Status CompressLevel(Slice input, int level, std::ostream* out); + +// Uncompress the zlib-compressed data in 'compressed', appending the result +// to 'out'. +// +// In case of an error, some data may still be appended to 'out'. +Status Uncompress(Slice compressed, std::ostream* out); + +} // namespace zlib +} // namespace doris diff --git a/be/test/util/CMakeLists.txt b/be/test/util/CMakeLists.txt index 9d419bbb75..df3c9f87be 100644 --- a/be/test/util/CMakeLists.txt +++ b/be/test/util/CMakeLists.txt @@ -65,3 +65,4 @@ ADD_BE_TEST(thread_test) ADD_BE_TEST(threadpool_test) ADD_BE_TEST(trace_test) ADD_BE_TEST(easy_json-test) +ADD_BE_TEST(http_channel_test) diff --git a/be/test/util/http_channel_test.cpp b/be/test/util/http_channel_test.cpp new file mode 100644 index 0000000000..c7b04211ab --- /dev/null +++ b/be/test/util/http_channel_test.cpp @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "http/http_channel.h" + +#include + +#include "util/zlib.h" +#include "util/logging.h" + +namespace doris { + +class HttpChannelTest : public testing::Test { +public: + void check_data_eq(const std::string& output, const std::string& expected) { + std::ostringstream oss; + ASSERT_TRUE(zlib::Uncompress(Slice(output), &oss).ok()); + ASSERT_EQ(expected, oss.str()); + } +}; + +TEST_F(HttpChannelTest, CompressContent) { + ASSERT_FALSE(HttpChannel::compress_content("gzip", "", nullptr)); + ASSERT_FALSE(HttpChannel::compress_content("", "test", nullptr)); + ASSERT_FALSE(HttpChannel::compress_content("Gzip", "", nullptr)); + + const std::string& intput("test_data_0123456789abcdefg"); + std::string output; + + ASSERT_TRUE(HttpChannel::compress_content("gzip", intput, &output)); + ASSERT_NO_FATAL_FAILURE(check_data_eq(output, intput)); + + ASSERT_TRUE(HttpChannel::compress_content("123,gzip,321", intput, &output)); + ASSERT_NO_FATAL_FAILURE(check_data_eq(output, intput)); +} + +} // namespace doris + +int main(int argc, char** argv) { + doris::init_glog("be-test"); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} +