From 279ea2f366ffa3cd430767ed2311a7233038f1e8 Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Mon, 18 Mar 2024 10:31:51 +0800 Subject: [PATCH] [feature](proxy-protocol) Support proxy protocol v1 (#32338) Enable proxy protocol to support IP transparency. See: `IP Transparency` in https://github.com/apache/doris/blob/f57387b502035fa138d946936e63d1c55e7d1ff4/docs/en/docs/admin-manual/cluster-management/load-balancing.md for details --- .../cluster-management/load-balancing.md | 133 +++++------ .../cluster-management/load-balancing.md | 129 +++++------ .../java/org/apache/doris/common/Config.java | 6 + .../apache/doris/mysql/AcceptListener.java | 14 ++ .../org/apache/doris/mysql/BytesChannel.java | 29 +++ .../org/apache/doris/mysql/MysqlChannel.java | 32 ++- .../org/apache/doris/mysql/MysqlProto.java | 1 - .../doris/mysql/ProxyProtocolHandler.java | 212 ++++++++++++++++++ .../doris/qe/ProxyProtocolHandlerTest.java | 135 +++++++++++ 9 files changed, 561 insertions(+), 130 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/mysql/BytesChannel.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/mysql/ProxyProtocolHandler.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/qe/ProxyProtocolHandlerTest.java diff --git a/docs/en/docs/admin-manual/cluster-management/load-balancing.md b/docs/en/docs/admin-manual/cluster-management/load-balancing.md index 1933fa4f1a..dbee4aec63 100644 --- a/docs/en/docs/admin-manual/cluster-management/load-balancing.md +++ b/docs/en/docs/admin-manual/cluster-management/load-balancing.md @@ -470,10 +470,6 @@ OK, that's the end, you can use Mysql client, JDBC, etc. to connect to ProxySQL ## Nginx TCP reverse proxy method -### Overview - -Nginx can implement load balancing of HTTP and HTTPS protocols, as well as load balancing of TCP protocol. So, the question is, can the load balancing of the Apache Doris database be achieved through Nginx? The answer is: yes. Next, let's discuss how to use Nginx to achieve load balancing of Apache Doris. - ### Environmental preparation **Note: Using Nginx to achieve load balancing of Apache Doris database, the premise is to build an Apache Doris environment. The IP and port of Apache Doris FE are as follows. Here I use one FE to demonstrate, multiple FEs only You need to add multiple FE IP addresses and ports in the configuration** @@ -522,9 +518,9 @@ stream { upstream mysqld { hash $remote_addr consistent; server 172.31.7.119:9030 weight=1 max_fails=2 fail_timeout=60s; - ##注意这里如果是多个FE,加载这里就行了 + ## Add other `server` entries if there are multi FE node } - ###这里是配置代理的端口,超时时间等 + ### Configure the proxy server { listen 6030; proxy_connect_timeout 300s; @@ -577,64 +573,71 @@ mysql> show databases; | test | +--------------------+ 2 rows in set (0.00 sec) - -mysql> use test; -Reading table information for completion of table and column names -You can turn off this feature to get a quicker startup with -A - -Database changed -mysql> show tables; -+------------------+ -| Tables_in_test | -+------------------+ -| dwd_product_live | -+------------------+ -1 row in set (0.00 sec) -mysql> desc dwd_product_live; -+-----------------+---------------+------+-------+---------+---------+ -| Field | Type | Null | Key | Default | Extra | -+-----------------+---------------+------+-------+---------+---------+ -| dt | DATE | Yes | true | NULL | | -| proId | BIGINT | Yes | true | NULL | | -| authorId | BIGINT | Yes | true | NULL | | -| roomId | BIGINT | Yes | true | NULL | | -| proTitle | VARCHAR(1024) | Yes | false | NULL | REPLACE | -| proLogo | VARCHAR(1024) | Yes | false | NULL | REPLACE | -| shopId | BIGINT | Yes | false | NULL | REPLACE | -| shopTitle | VARCHAR(1024) | Yes | false | NULL | REPLACE | -| profrom | INT | Yes | false | NULL | REPLACE | -| proCategory | BIGINT | Yes | false | NULL | REPLACE | -| proPrice | DECIMAL(18,2) | Yes | false | NULL | REPLACE | -| couponPrice | DECIMAL(18,2) | Yes | false | NULL | REPLACE | -| livePrice | DECIMAL(18,2) | Yes | false | NULL | REPLACE | -| volume | BIGINT | Yes | false | NULL | REPLACE | -| addedTime | BIGINT | Yes | false | NULL | REPLACE | -| offTimeUnix | BIGINT | Yes | false | NULL | REPLACE | -| offTime | BIGINT | Yes | false | NULL | REPLACE | -| createTime | BIGINT | Yes | false | NULL | REPLACE | -| createTimeUnix | BIGINT | Yes | false | NULL | REPLACE | -| amount | DECIMAL(18,2) | Yes | false | NULL | REPLACE | -| views | BIGINT | Yes | false | NULL | REPLACE | -| commissionPrice | DECIMAL(18,2) | Yes | false | NULL | REPLACE | -| proCostPrice | DECIMAL(18,2) | Yes | false | NULL | REPLACE | -| proCode | VARCHAR(1024) | Yes | false | NULL | REPLACE | -| proStatus | INT | Yes | false | NULL | REPLACE | -| status | INT | Yes | false | NULL | REPLACE | -| maxPrice | DECIMAL(18,2) | Yes | false | NULL | REPLACE | -| liveView | BIGINT | Yes | false | NULL | REPLACE | -| firstCategory | BIGINT | Yes | false | NULL | REPLACE | -| secondCategory | BIGINT | Yes | false | NULL | REPLACE | -| thirdCategory | BIGINT | Yes | false | NULL | REPLACE | -| fourCategory | BIGINT | Yes | false | NULL | REPLACE | -| minPrice | DECIMAL(18,2) | Yes | false | NULL | REPLACE | -| liveVolume | BIGINT | Yes | false | NULL | REPLACE | -| liveClick | BIGINT | Yes | false | NULL | REPLACE | -| extensionId | VARCHAR(128) | Yes | false | NULL | REPLACE | -| beginTime | BIGINT | Yes | false | NULL | REPLACE | -| roomTitle | TEXT | Yes | false | NULL | REPLACE | -| beginTimeUnix | BIGINT | Yes | false | NULL | REPLACE | -| nickname | TEXT | Yes | false | NULL | REPLACE | -+-----------------+---------------+------+-------+---------+---------+ -40 rows in set (0.06 sec) ``` +## IP Transparency + +Since 2.1.1, Doris supports [Proxy Protocol](https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt). Using this protocol, IP transparency for load balancing can be achieved, so that after load balancing, Doris can still obtain the client's real IP and implement permission control such as whitelisting. + +> Note: +> +> 1. Only support Proxy Protocol V1。 +> +> 2. Only supports and works on the MySQL protocol port, and does not support and affect other protocol ports such as HTTP and ADBC. +> +> 3. After enable, the Proxy Protocol must be used to connect, otherwise the connection will fail. + +The following uses Nginx as an example to introduce how to implement IP transparency. + +1. Doris enables Proxy Protocol + + Add config in FE's fe.conf: + + ``` + enable_proxy_protocol = true + ``` + +2. Nginx enables Proxy Protocol + + ``` + events { + worker_connections 1024; + } + stream { + upstream mysqld { + hash $remote_addr consistent; + server 172.31.7.119:9030 weight=1 max_fails=2 fail_timeout=60s; + } + server { + listen 6030; + proxy_connect_timeout 300s; + proxy_timeout 300s; + proxy_pass mysqld; + # Enable Proxy Protocol to the upstream server + proxy_protocol on; + } + } + ``` + +3. Connect Doris through Nginx + + ``` + mysql -uroot -P6030 -h172.31.7.119 + ``` + +4. Verify + + ``` + mysql> show processlist; + +------------------+------+------+-------------------+---------------------+----------+------+---------+------+-------+-----------------------------------+------------------+ + | CurrentConnected | Id | User | Host | LoginTime | Catalog | Db | Command | Time | State | QueryId | Info | + +------------------+------+------+-------------------+---------------------+----------+------+---------+------+-------+-----------------------------------+------------------+ + | Yes | 1 | root | 172.21.0.32:34390 | 2024-03-17 16:32:22 | internal | | Query | 0 | OK | 82edc460d93f4e28-8bbed058a068e259 | show processlist | + +------------------+------+------+-------------------+---------------------+----------+------+---------+------+-------+-----------------------------------+------------------+ + 1 row in set (0.00 sec) + ``` + + If you see the real client IP in the `Host` column, the verification is successful. Otherwise, only the IP address of the proxy service will be visible. + + At the same time, the real client IP will also be recorded in fe.audit.log. + diff --git a/docs/zh-CN/docs/admin-manual/cluster-management/load-balancing.md b/docs/zh-CN/docs/admin-manual/cluster-management/load-balancing.md index 2cdde158b5..2abbed802b 100644 --- a/docs/zh-CN/docs/admin-manual/cluster-management/load-balancing.md +++ b/docs/zh-CN/docs/admin-manual/cluster-management/load-balancing.md @@ -470,10 +470,6 @@ OK,到此就结束了,你就可以用 Mysql 客户端,JDBC 等任何连接 ## Nginx TCP反向代理方式 -### 概述 - -Nginx能够实现HTTP、HTTPS协议的负载均衡,也能够实现TCP协议的负载均衡。那么,问题来了,可不可以通过Nginx实现Apache Doris数据库的负载均衡呢?答案是:可以。接下来,就让我们一起探讨下如何使用Nginx实现Apache Doris的负载均衡。 - ### 环境准备 **注意:使用Nginx实现Apache Doris数据库的负载均衡,前提是要搭建Apache Doris的环境,Apache Doris FE的IP和端口分别如下所示, 这里我是用一个FE来做演示的,多个FE只需要在配置里添加多个FE的IP地址和端口即可** @@ -577,63 +573,70 @@ mysql> show databases; | test | +--------------------+ 2 rows in set (0.00 sec) - -mysql> use test; -Reading table information for completion of table and column names -You can turn off this feature to get a quicker startup with -A - -Database changed -mysql> show tables; -+------------------+ -| Tables_in_test | -+------------------+ -| dwd_product_live | -+------------------+ -1 row in set (0.00 sec) -mysql> desc dwd_product_live; -+-----------------+---------------+------+-------+---------+---------+ -| Field | Type | Null | Key | Default | Extra | -+-----------------+---------------+------+-------+---------+---------+ -| dt | DATE | Yes | true | NULL | | -| proId | BIGINT | Yes | true | NULL | | -| authorId | BIGINT | Yes | true | NULL | | -| roomId | BIGINT | Yes | true | NULL | | -| proTitle | VARCHAR(1024) | Yes | false | NULL | REPLACE | -| proLogo | VARCHAR(1024) | Yes | false | NULL | REPLACE | -| shopId | BIGINT | Yes | false | NULL | REPLACE | -| shopTitle | VARCHAR(1024) | Yes | false | NULL | REPLACE | -| profrom | INT | Yes | false | NULL | REPLACE | -| proCategory | BIGINT | Yes | false | NULL | REPLACE | -| proPrice | DECIMAL(18,2) | Yes | false | NULL | REPLACE | -| couponPrice | DECIMAL(18,2) | Yes | false | NULL | REPLACE | -| livePrice | DECIMAL(18,2) | Yes | false | NULL | REPLACE | -| volume | BIGINT | Yes | false | NULL | REPLACE | -| addedTime | BIGINT | Yes | false | NULL | REPLACE | -| offTimeUnix | BIGINT | Yes | false | NULL | REPLACE | -| offTime | BIGINT | Yes | false | NULL | REPLACE | -| createTime | BIGINT | Yes | false | NULL | REPLACE | -| createTimeUnix | BIGINT | Yes | false | NULL | REPLACE | -| amount | DECIMAL(18,2) | Yes | false | NULL | REPLACE | -| views | BIGINT | Yes | false | NULL | REPLACE | -| commissionPrice | DECIMAL(18,2) | Yes | false | NULL | REPLACE | -| proCostPrice | DECIMAL(18,2) | Yes | false | NULL | REPLACE | -| proCode | VARCHAR(1024) | Yes | false | NULL | REPLACE | -| proStatus | INT | Yes | false | NULL | REPLACE | -| status | INT | Yes | false | NULL | REPLACE | -| maxPrice | DECIMAL(18,2) | Yes | false | NULL | REPLACE | -| liveView | BIGINT | Yes | false | NULL | REPLACE | -| firstCategory | BIGINT | Yes | false | NULL | REPLACE | -| secondCategory | BIGINT | Yes | false | NULL | REPLACE | -| thirdCategory | BIGINT | Yes | false | NULL | REPLACE | -| fourCategory | BIGINT | Yes | false | NULL | REPLACE | -| minPrice | DECIMAL(18,2) | Yes | false | NULL | REPLACE | -| liveVolume | BIGINT | Yes | false | NULL | REPLACE | -| liveClick | BIGINT | Yes | false | NULL | REPLACE | -| extensionId | VARCHAR(128) | Yes | false | NULL | REPLACE | -| beginTime | BIGINT | Yes | false | NULL | REPLACE | -| roomTitle | TEXT | Yes | false | NULL | REPLACE | -| beginTimeUnix | BIGINT | Yes | false | NULL | REPLACE | -| nickname | TEXT | Yes | false | NULL | REPLACE | -+-----------------+---------------+------+-------+---------+---------+ -40 rows in set (0.06 sec) ``` + +## IP 透传 + +自 2.1.1 版本开始,Doris 支持 [Proxy Protocol](https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt) 协议。利用这个协议,可以是实现负载均衡的 IP 透传,从而在经过负载均衡后,Doris 依然可以获取客户端的真实 IP,实现白名单等权限控制。 + +> 注: +> +> 1. 仅支持 Proxy Protocol V1。 +> +> 2. 仅支持并做用于 MySQL 协议端口,不支持和影响 HTTP、ADBC 等其他协议端口。 +> +> 3. 开启后,必须使用 Proxy Protocol 协议进行连接,否则连接失败。 + +下面以 Nginx 为例,介绍如何实现 IP 透传。 + +1. Doris 开启 Proxy Protocol + + 在 FE 的 fe.conf 中添加: + + ``` + enable_proxy_protocol = true + ``` + +2. Nginx 开启 Proxy Protocol + + ``` + events { + worker_connections 1024; + } + stream { + upstream mysqld { + hash $remote_addr consistent; + server 172.31.7.119:9030 weight=1 max_fails=2 fail_timeout=60s; + } + server { + listen 6030; + proxy_connect_timeout 300s; + proxy_timeout 300s; + proxy_pass mysqld; + # Enable Proxy Protocol to the upstream server + proxy_protocol on; + } + } + ``` + +3. 通过代理连接Doris + + ``` + mysql -uroot -P6030 -h172.31.7.119 + ``` + +4. 验证 + + ``` + mysql> show processlist; + +------------------+------+------+-------------------+---------------------+----------+------+---------+------+-------+-----------------------------------+------------------+ + | CurrentConnected | Id | User | Host | LoginTime | Catalog | Db | Command | Time | State | QueryId | Info | + +------------------+------+------+-------------------+---------------------+----------+------+---------+------+-------+-----------------------------------+------------------+ + | Yes | 1 | root | 172.21.0.32:34390 | 2024-03-17 16:32:22 | internal | | Query | 0 | OK | 82edc460d93f4e28-8bbed058a068e259 | show processlist | + +------------------+------+------+-------------------+---------------------+----------+------+---------+------+-------+-----------------------------------+------------------+ + 1 row in set (0.00 sec) + ``` + + 如果在 `Host` 列看到的真实的客户端 IP,则说明验证成功。否则,只能看到代理服务的 IP 地址。 + + 同时,在 fe.audit.log 中也会记录真实的客户端 IP。 diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index b9ce7e5f98..7f4f579cdd 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2511,6 +2511,12 @@ public class Config extends ConfigBase { }) public static String inverted_index_storage_format = "V1"; + @ConfField(description = { + "是否开启 Proxy Protocol 支持", + "Whether to enable proxy protocol" + }) + public static boolean enable_proxy_protocol = false; + //========================================================================== // begin of cloud config //========================================================================== diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java index 4aa6cc85ab..c3cdf8a955 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java @@ -18,12 +18,15 @@ package org.apache.doris.mysql; import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; import org.apache.doris.common.ErrorCode; +import org.apache.doris.mysql.ProxyProtocolHandler.ProxyProtocolResult; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.ConnectProcessor; import org.apache.doris.qe.ConnectScheduler; import org.apache.doris.qe.MysqlConnectProcessor; +import com.google.common.base.Preconditions; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.xnio.ChannelListener; @@ -71,6 +74,17 @@ public class AcceptListener implements ChannelListener \r\n + * or + * PROXY UNKNOWN xxxx\r\n + */ +public class ProxyProtocolHandler { + private static final Logger LOG = LogManager.getLogger(ProxyProtocolHandler.class); + + private static final byte[] V1_HEADER = "PROXY ".getBytes(StandardCharsets.US_ASCII); + private static final byte[] V2_HEADER + = new byte[] {0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A}; + + private static final String UNKNOWN = "UNKNOWN"; + private static final String TCP4 = "TCP4"; + private static final String TCP6 = "TCP6"; + + public static class ProxyProtocolResult { + public String sourceIP = null; + public int sourcePort = -1; + public String destIp = null; + public int destPort = -1; + public boolean isUnknown = false; + + @Override + public String toString() { + return "ProxyProtocolResult{" + + "sourceIP='" + sourceIP + '\'' + + ", sourcePort=" + sourcePort + + ", destIp='" + destIp + '\'' + + ", destPort=" + destPort + + ", isUnknown=" + isUnknown + + '}'; + } + } + + public static ProxyProtocolResult handle(BytesChannel channel) throws IOException { + // First read 1 byte to see if it is V1 or V2 + ByteBuffer buffer = ByteBuffer.allocate(1); + int readLen = channel.read(buffer); + if (readLen != 1) { + throw new IOException("Invalid proxy protocol, expect incoming bytes first"); + } + buffer.flip(); + byte firstByte = buffer.get(); + if ((char) firstByte == V1_HEADER[0]) { + return handleV1(channel); + } else if (firstByte == V2_HEADER[0]) { + return handleV2(channel); + } else { + throw new IOException("Invalid proxy protocol header in first bytes: " + firstByte + "."); + } + } + + private static ProxyProtocolResult handleV1(BytesChannel channel) throws IOException { + ProxyProtocolResult result = new ProxyProtocolResult(); + + int byteCount = 1; // already read the first byte, so start with 1 + boolean parsingUnknown = false; // true if "UNKNOWN" is found + boolean carriageFound = false; // true if \r is found + String protocol = null; + StringBuilder stringBuilder = new StringBuilder(); + + // read last 5 bytes of "PROXY " + ByteBuffer buffer = ByteBuffer.allocate(5); + int readLen = channel.read(buffer); + if (readLen != 5) { + throw new IOException("Invalid proxy protocol v1, expected \"PROXY \""); + } + byteCount += readLen; + StringBuilder debugInfo = new StringBuilder("PROXY "); + // start reading + buffer = ByteBuffer.allocate(1); + channel.read(buffer); + buffer.flip(); + while (buffer.hasRemaining()) { + char c = (char) buffer.get(); + debugInfo.append(c); + if (parsingUnknown) { + // Found "PROXY UNKNOWN" + // ignore any other bytes until "\r\n" + if (c == '\r') { + carriageFound = true; + } else if (c == '\n') { + if (!carriageFound) { + throw new ProtocolException("Invalid proxy protocol v1. '\\r' is not found before '\\n'", + debugInfo.toString()); + } + result.isUnknown = true; + return result; + } else if (carriageFound) { + throw new ProtocolException("Invalid proxy protocol v1. " + + "'\\r' should follow with '\\n', but see: " + c + ".", debugInfo.toString()); + } + } else if (carriageFound) { + if (c == '\n') { + // eof, set remote ip + if (LOG.isDebugEnabled()) { + LOG.debug("Finish parsing proxy protocol v1. result: {}", result); + } + return result; + } else { + throw new ProtocolException("Invalid proxy protocol v1. " + + "'\\r' should follow with '\\n', but see: " + c + ".", debugInfo.toString()); + } + } else { + switch (c) { + case ' ': + if (result.sourcePort != -1 || stringBuilder.length() == 0) { + throw new ProtocolException("Invalid proxy protocol v1. expecting a '\\r' or a '\\n'", + debugInfo.toString()); + } else if (protocol == null) { + protocol = stringBuilder.toString(); + stringBuilder.setLength(0); + if (protocol.equals(UNKNOWN)) { + parsingUnknown = true; + } else if (!protocol.equals(TCP4) && !protocol.equals(TCP6)) { + throw new ProtocolException("Invalid proxy protocol v1. expecting TCP4/TCP6/UNKNOWN." + + " See: " + protocol + ".", debugInfo.toString()); + } + } else if (result.sourceIP == null) { + result.sourceIP = stringBuilder.toString(); + stringBuilder.setLength(0); + } else if (result.destIp == null) { + result.destIp = stringBuilder.toString(); + stringBuilder.setLength(0); + } else { + result.sourcePort = Integer.parseInt(stringBuilder.toString()); + stringBuilder.setLength(0); + } + break; + case '\r': + if (result.destPort == -1 && result.sourcePort != -1 + && !carriageFound && stringBuilder.length() > 0) { + result.destPort = Integer.parseInt(stringBuilder.toString()); + stringBuilder.setLength(0); + carriageFound = true; + } else if (protocol == null) { + if (UNKNOWN.equals(stringBuilder.toString())) { + parsingUnknown = true; + carriageFound = true; + } + } else { + throw new ProtocolException( + "Invalid proxy protocol v1. Already see '\\r' but no valid info", + debugInfo.toString()); + } + break; + case '\n': + throw new ProtocolException("Invalid proxy protocol v1. '\\r' is not found before '\\n'", + debugInfo.toString()); + default: + stringBuilder.append(c); + } + } + byteCount++; + if (byteCount == 107) { + throw new ProtocolException("Invalid proxy protocol v1, max length(107) exceeds", + debugInfo.toString()); + } else { + buffer.clear(); + channel.read(buffer); + buffer.flip(); + } + } + throw new ProtocolException("Invalid proxy protocol v1, unexpected end of stream", debugInfo.toString()); + } + + private static ProxyProtocolResult handleV2(BytesChannel channel) throws IOException { + throw new IOException("proxy protocol v2 is not supported yet"); + } + + public static class ProtocolException extends IOException { + public ProtocolException(String message, String protocolStr) { + super(message + ": " + protocolStr); + } + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/ProxyProtocolHandlerTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/ProxyProtocolHandlerTest.java new file mode 100644 index 0000000000..13ad0b67d9 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ProxyProtocolHandlerTest.java @@ -0,0 +1,135 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.qe; + +import org.apache.doris.mysql.BytesChannel; +import org.apache.doris.mysql.ProxyProtocolHandler; + +import org.junit.Test; +import org.junit.jupiter.api.Assertions; + +import java.io.IOException; + +public class ProxyProtocolHandlerTest { + + public static class TestChannel implements BytesChannel { + private byte[] data; + private int pos; + + public TestChannel(byte[] data) { + this.data = data; + this.pos = 0; + } + + @Override + public int read(java.nio.ByteBuffer buffer) { + int len = Math.min(buffer.remaining(), data.length - pos); + if (len > 0) { + buffer.put(data, pos, len); + pos += len; + } + return len; + } + } + + private TestChannel testChannel; + + @Test + public void handleV1ProtocolWithValidData() throws IOException { + byte[] data = "PROXY TCP4 192.168.0.1 192.168.0.2 12345 54321\r\n".getBytes(); + testChannel = new TestChannel(data); + ProxyProtocolHandler.ProxyProtocolResult result = ProxyProtocolHandler.handle(testChannel); + Assertions.assertNotNull(result); + Assertions.assertFalse(result.isUnknown); + Assertions.assertEquals("192.168.0.1", result.sourceIP); + Assertions.assertEquals(12345, result.sourcePort); + Assertions.assertEquals("192.168.0.2", result.destIp); + Assertions.assertEquals(54321, result.destPort); + } + + @Test + public void handleV1ProtocolWithUnknown() throws IOException { + byte[] data = "PROXY UNKNOWN xxxxxxxxxxxxxxxxxx\r\n".getBytes(); + testChannel = new TestChannel(data); + ProxyProtocolHandler.ProxyProtocolResult result = ProxyProtocolHandler.handle(testChannel); + Assertions.assertNotNull(result); + Assertions.assertTrue(result.isUnknown); + } + + @Test(expected = IOException.class) + public void handleV1ProtocolWithInvalidProtocol() throws IOException { + byte[] data = "PROXY TCP7 xxx\r\n".getBytes(); + testChannel = new TestChannel(data); + ProxyProtocolHandler.handle(testChannel); + } + + @Test(expected = IOException.class) + public void handleV1ProtocolWithInvalidData() throws IOException { + byte[] data = "INVALID DATA".getBytes(); + testChannel = new TestChannel(data); + ProxyProtocolHandler.handle(testChannel); + } + + @Test(expected = IOException.class) + public void handleV1ProtocolWithIncompleteData() throws IOException { + byte[] data = "PROXY TCP4 192.168.0.1 192.168.0.2 12345".getBytes(); + testChannel = new TestChannel(data); + ProxyProtocolHandler.handle(testChannel); + } + + @Test(expected = IOException.class) + public void handleV1ProtocolWithExtraData() throws IOException { + byte[] data = "PROXY TCP4 192.168.0.1 192.168.0.2 12345 54321 EXTRA DATA\r\n".getBytes(); + testChannel = new TestChannel(data); + ProxyProtocolHandler.handle(testChannel); + } + + @Test + public void handleV1ProtocolWithValidIPv6Data() throws IOException { + byte[] data = "PROXY TCP6 2001:db8:0:1:1:1:1:1 2001:db8:0:1:1:1:1:2 12345 54321\r\n".getBytes(); + testChannel = new TestChannel(data); + ProxyProtocolHandler.ProxyProtocolResult result = ProxyProtocolHandler.handle(testChannel); + Assertions.assertNotNull(result); + Assertions.assertFalse(result.isUnknown); + Assertions.assertEquals("2001:db8:0:1:1:1:1:1", result.sourceIP); + Assertions.assertEquals(12345, result.sourcePort); + Assertions.assertEquals("2001:db8:0:1:1:1:1:2", result.destIp); + Assertions.assertEquals(54321, result.destPort); + } + + @Test(expected = IOException.class) + public void handleV1ProtocolWithInvalidIPv6Data() throws IOException { + byte[] data = "PROXY TCP6 2001:db8:0:1:1:1:1:1 2001:db8:0:1:1:1:1:2 12345 EXTRA DATA\r\n".getBytes(); + testChannel = new TestChannel(data); + ProxyProtocolHandler.handle(testChannel); + } + + @Test(expected = IOException.class) + public void handleV1ProtocolWithIncompleteIPv6Data() throws IOException { + byte[] data = "PROXY TCP6 2001:db8:0:1:1:1:1:1 2001:db8:0:1:1:1:1:2 12345".getBytes(); + testChannel = new TestChannel(data); + ProxyProtocolHandler.handle(testChannel); + } + + @Test(expected = IOException.class) + public void handleV2Protocol() throws IOException { + byte[] data = new byte[] {0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A}; + testChannel = new TestChannel(data); + ProxyProtocolHandler.handle(testChannel); + } +}