[feature](extension) support beats output to doris (#18448)

This commit is contained in:
hailin0
2023-04-16 18:17:48 +08:00
committed by GitHub
parent 0f00ad4d2a
commit 24dd3f19cd
27 changed files with 3226 additions and 0 deletions

View File

@ -81,5 +81,7 @@ header:
- "docker/thirdparties/docker-compose/hive/scripts/create_tpch1_parquet.hql"
- "docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/"
- "docker/thirdparties/docker-compose/iceberg/spark-defaults.conf.tpl"
- "extension/beats/go.mod"
- "extension/beats/go.sum"
comment: on-failure

View File

@ -0,0 +1,280 @@
---
{
"title": "Beats Doris Output Plugin",
"language": "en"
}
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Beats output plugin
This is an output implementation of [elastic beats](https://github.com/elastic/beats) for support [Filebeat](https://github.com/elastic/beats/tree/master/filebeat), [Metricbeat](https://github.com/elastic/beats/tree/master/metricbeat), [Packetbeat](https://github.com/elastic/beats/tree/master/packetbeat), [Winlogbeat](https://github.com/elastic/beats/tree/master/winlogbeat), [Auditbeat](https://github.com/elastic/beats/tree/master/auditbeat), [Heartbeat](https://github.com/elastic/beats/tree/master/heartbeat) to [Apache Doris](https://github.com/apache/doris).
This module is used to output data to Doris for elastic beats, use the HTTP protocol to interact with the Doris FE Http interface, and import data through Doris's stream load.
[Learn more about Doris Stream Load ](../data-operate/import/import-way/stream-load-manual.md)
[Learn more about Doris](/)
## Compatibility
This output is developed and tested using Beats 7.3.1
## Install
### Download source code
```
mkdir -p $GOPATH/src/github.com/apache/
cd $GOPATH/src/github.com/apache/
git clone https://github.com/apache/doris
cd doris/extension/beats
```
### Compile
```
go build -o filebeat filebeat/filebeat.go
go build -o metricbeat metricbeat/metricbeat.go
go build -o winlogbeat winlogbeat/winlogbeat.go
go build -o packetbeat packetbeat/packetbeat.go
go build -o auditbeat auditbeat/auditbeat.go
go build -o heartbeat heartbeat/heartbeat.go
```
You will get executables in various subdirectories
## Usage
In this section, you can use the sample config file in the directory [./example/], or you can create it as follow steps.
### Configure Beat
Add following configuration to `*beat.yml`
```yml
output.doris:
fenodes: ["http://localhost:8030"] # your doris fe address
user: root # your doris user
password: root # your doris password
database: example_db # your doris database
table: example_table # your doris table
codec_format_string: "%{[message]}" # beat-event format expression to row data
headers:
column_separator: ","
```
### Start Beat
Using filebeat as an example
```
./filebeat/filebeat -c filebeat.yml -e
```
## Configurations
Connection doris configuration:
| Name | Description | Default |
|----------------|---------------------------------------------------------------------------------------------------------------------------------------------|-------------|
| fenodes | FE's HTTP interactive address eg : ["http://fe1:8030", "http://fe2:8030"] | |
| user | User name, the user needs to have import permission for the doris table | |
| password | Password | |
| database | Database name | |
| table | Table name | |
| label_prefix | Import the identification prefix, the final generated ID is *{label\_prefix}\_{db}\_{table}\_{time_stamp}* | doris_beats |
| line_delimiter | Used to specify the newline character in the imported data, the default is \n. Combinations of multiple characters can be used as newlines. | \n |
| headers | Users can pass in [stream-load import parameters](../data-operate/import/import-way/stream-load-manual.md) through the headers. | |
Beats configuration:
| Name | Description | Default |
|---------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
| codec_format_string | Set the [expression](https://www.elastic.co/guide/en/beats/filebeat/7.3/configuration-output-codec.html) of format `beat event`, and the format result will be added into http body as a row data | |
| codec | Beats [output codec](https://www.elastic.co/guide/en/beats/filebeat/7.3/configuration-output-codec.html) and the format result will be added to http body as a row, Priority to use `codec_format_string` | |
| timeout | Set the http client connection timeout | |
| bulk_max_size | The maximum number of events processed per batch | 100000 |
| max_retries | Filebeat ignores the max_retries setting and retries indefinitely. | 3 |
| backoff.init | The number of seconds to wait before trying to reconnect after a network error. | 1 |
| backoff.max | The maximum number of seconds to wait before attempting to connect after a network error. | 60 |
## Complete usage example of filebeat
### Init Doris
```sql
CREATE DATABASE example_db;
CREATE TABLE example_db.example_table (
id BIGINT,
name VARCHAR(100)
)
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_num"="1"
);
```
### Configure Filebeat
Create `/tmp/beats/filebeat.yml` file and add following configuration:
```yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /tmp/beats/example.log
output.doris:
fenodes: ["http://localhost:8030"] # your doris fe address
user: root # your doris user
password: root # your doris password
database: example_db # your doris database
table: example_table # your doris table
codec_format_string: "%{[message]}"
headers:
column_separator: ","
```
### Start Filebeat
```
./filebeat/filebeat -c /tmp/beats/filebeat.yml -e
```
### Validate Load Data
Add write data to `/tmp/beats/example.log`
```shell
echo -e "1,A\n2,B\n3,C\n4,D" >> /tmp/beats/example.log
```
Observe the filebeat log. If the error log is not printed, the import was successful. At this time, you can view the imported data in the `example_db.example_table` table
## More configure examples
### Specify columns
Make `/tmp/beats/example.log` and add following content:
```csv
1,A
2,B
```
Configure `columns`
```yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /tmp/beats/example.log
output.doris:
...
codec_format_string: "%{[message]}"
headers:
columns: "id,name"
```
### Collect json file
Make `/tmp/beats/example.json` and add following content:
```json
{"id": 1, "name": "A"}
{"id": 2, "name": "B"}
```
Configure `headers`
```yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /tmp/beats/example.json
output.doris:
...
codec_format_string: "%{[message]}"
headers:
format: json
read_json_by_line: true
```
### Codec output fields
Make `/tmp/beats/example.log` and add following content:
```csv
1,A
2,B
```
Configure `codec_format_string`
```yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /tmp/beats/example.log
output.doris:
...
codec_format_string: "%{[message]},%{[@timestamp]},%{[@metadata.type]}"
headers:
columns: "id,name,beat_timestamp,beat_metadata_type"
```
## FAQ
### How to config batch commit size
Add following configuration to your `beat.yml`
This sample configuration forwards events to the doris if 10000 events are available or the oldest available event has been waiting for 5s in the [mem queue](https://www.elastic.co/guide/en/beats/filebeat/7.3/configuring-internal-queue.html#configuration-internal-queue-memory):
```yml
queue.mem:
events: 10000
flush.min_events: 10000
flush.timeout: 5s
```
### How to use other beats(e.g metricbeat)
Doris beats support all beats modules, see the [Install](#Install) and [Usage](#Usage)
### How to build docker image
You can package a docker image with an executable file of [Install](#Install) outputs

View File

@ -232,6 +232,7 @@
"ecosystem/kyuubi",
"ecosystem/mysql-to-doris",
"ecosystem/logstash",
"ecosystem/beats",
"ecosystem/plugin-development-manual",
"ecosystem/audit-plugin",
"ecosystem/cloudcanal",

View File

@ -0,0 +1,282 @@
---
{
"title": "Beats Doris Output Plugin",
"language": "zh-CN"
}
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Beats output plugin
这是 [elastic beats](https://github.com/elastic/beats) 的输出实现,支持 [Filebeat](https://github.com/elastic/beats/tree/master/filebeat), [Metricbeat](https://github.com/elastic/beats/tree/master/metricbeat), [Packetbeat](https://github.com/elastic/beats/tree/master/packetbeat), [Winlogbeat](https://github.com/elastic/beats/tree/master/winlogbeat), [Auditbeat](https://github.com/elastic/beats/tree/master/auditbeat), [Heartbeat](https://github.com/elastic/beats/tree/master/heartbeat) 到 Apache Doris。
该插件用于 beats 输出数据到 Doris,使用 HTTP 协议与 Doris FE Http 接口交互,并通过 Doris 的 stream load 的方式进行数据导入.
[了解Doris Stream Load](../data-operate/import/import-way/stream-load-manual.md)
[了解更多关于Doris](/zh-CN)
## 兼容性
此插件是使用 Beats 7.3.1 开发和测试的
## 安装
### 下载源码
```
mkdir -p $GOPATH/src/github.com/apache/
cd $GOPATH/src/github.com/apache/
git clone https://github.com/apache/doris
cd doris/extension/beats
```
### 编译
在 extension/beats/ 目录下执行
```
go build -o filebeat filebeat/filebeat.go
go build -o metricbeat metricbeat/metricbeat.go
go build -o winlogbeat winlogbeat/winlogbeat.go
go build -o packetbeat packetbeat/packetbeat.go
go build -o auditbeat auditbeat/auditbeat.go
go build -o heartbeat heartbeat/heartbeat.go
```
您将在各个子目录目录下得到可执行文件
## 使用
您可以使用目录 [./example/] 中的示例配置文件,也可以按照以下步骤创建它。
### 配置 Beat
添加以下配置到 `*beat.yml`
```yml
output.doris:
fenodes: ["http://localhost:8030"] # your doris fe address
user: root # your doris user
password: root # your doris password
database: example_db # your doris database
table: example_table # your doris table
codec_format_string: "%{[message]}" # beat-event format expression to row data
headers:
column_separator: ","
```
### 启动 Beat
使用 filebeat 作为示例
```
./filebeat/filebeat -c filebeat.yml -e
```
## 配置说明
连接 doris 配置:
| Name | Description | Default |
|----------------|-----------------------------------------------------------------------------------------------|-------------|
| fenodes | FE 的 HTTP交互地址。 例如: ["http://fe1:8030", "http://fe2:8030"] | |
| user | 用户名,该用户需要有 Doris 对应库表的导入权限 | |
| password | 密码 | |
| database | 数据库名 | |
| table | 表名 | |
| label_prefix | 导入标识前缀,最终生成的标识为 *{label\_prefix}\_{db}\_{table}\_{time_stamp}* | doris_beats |
| line_delimiter | 用于指定导入数据中的换行符,可以使用做多个字符的组合作为换行符。 | \n |
| headers | 用户可以通过 headers 传入 [stream-load 导入参数](../data-operate/import/import-way/stream-load-manual.md) | |
Beats 配置:
| Name | Description | Default |
|---------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
| codec_format_string | 设置格式化 beats 事件的[表达式](https://www.elastic.co/guide/en/beats/filebeat/7.3/configuration-output-codec.html),格式化结果会作为行数据添加到 http body 中 | |
| codec | beats [输出编解码器](https://www.elastic.co/guide/en/beats/filebeat/7.3/configuration-output-codec.html),格式结果将作为一行添加到 http body 中,优先使用 `codec_format_string` | |
| timeout | 设置 http 客户端超时时间 | |
| bulk_max_size | 批处理的最大事件数 | 100000 |
| max_retries | 发送失败时的最大重试次数,Filebeat 忽略 max_retries 设置并无限期重试。 | 3 |
| backoff.init | 网络错误后尝试重新连接之前等待的秒数 | 1 |
| backoff.max | 网络错误后尝试连接之前等待的最大秒数 | 60 |
## 完整使用示例(Filebeat)
### 初始化 Doris
```sql
CREATE DATABASE example_db;
CREATE TABLE example_db.example_table (
id BIGINT,
name VARCHAR(100)
)
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_num"="1"
);
```
### 配置 Filebeat
创建 `/tmp/beats/filebeat.yml` 文件并添加以下配置:
```yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /tmp/beats/example.log
output.doris:
fenodes: ["http://localhost:8030"] # your doris fe address
user: root # your doris user
password: root # your doris password
database: example_db # your doris database
table: example_table # your doris table
codec_format_string: "%{[message]}"
headers:
column_separator: ","
```
### 启动 Filebeat
```
./filebeat/filebeat -c /tmp/beats/filebeat.yml -e
```
### 验证数据导入
添加数据到文件 `/tmp/beats/example.log`
```shell
echo -e "1,A\n2,B\n3,C\n4,D" >> /tmp/beats/example.log
```
观察 filebeat 日志,如果没有打印错误日志,则导入成功。 这时可以在 example_db.example_table 表中查看导入的数据
## 更多配置示例
### 指定导入的 columns
创建 `/tmp/beats/example.log` 文件并添加以下内容:
```csv
1,A
2,B
```
配置 `columns`
```yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /tmp/beats/example.log
output.doris:
...
codec_format_string: "%{[message]}"
headers:
columns: "id,name"
```
### 采集 json 文件
创建 `/tmp/beats/example.json` 文件并添加以下内容:
```json
{"id": 1, "name": "A"}
{"id": 2, "name": "B"}
```
配置 `headers`
```yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /tmp/beats/example.json
output.doris:
...
codec_format_string: "%{[message]}"
headers:
format: json
read_json_by_line: true
```
### 编码输出字段
创建 `/tmp/beats/example.log` 文件并添加以下内容:
```csv
1,A
2,B
```
配置 `codec_format_string`
```yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /tmp/beats/example.log
output.doris:
...
codec_format_string: "%{[message]},%{[@timestamp]},%{[@metadata.type]}"
headers:
columns: "id,name,beat_timestamp,beat_metadata_type"
```
## 常见问题
### 如何配置批处理提交大小
添加以下内容到您的 `*beat.yml` 文件中
它表示,如果有 10000 个事件可用或最旧的可用事件已在[内存队列](https://www.elastic.co/guide/en/beats/filebeat/7.3/configuring-internal-queue.html#configuration-internal-queue-memory)中等待 5 秒,此示例配置会将事件批量转发给 doris:
```yml
queue.mem:
events: 10000
flush.min_events: 10000
flush.timeout: 5s
```
### 如何使用其他的 beats(例如 metricbeat)
Doris beats 支持所有的 beats 模块,使用方式参见 [安装](#安装) 与 [使用](#使用)
### 如何构建 docker 镜像
可以使用 [安装](#安装) 输出的可执行文件打包 docker 镜像

39
extension/beats/.gitignore vendored Normal file
View File

@ -0,0 +1,39 @@
# Directories
/.vagrant
/.idea
/.vscode
/build
/*/data
/*/logs
/*/fields.yml
/*/*.template*.json
**/html_docs
/*/_meta/kibana.generated
# Files
.DS_Store
/beats.iml
*.dev.yml
*.generated.yml
coverage.out
.python-version
beat.db
*.keystore
# Editor swap files
*.swp
*.swo
*.swn
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
*.exe
*.test
*.prof
*.pyc
data/*
logs/*
dist

16
extension/beats/LICENSE Normal file
View File

@ -0,0 +1,16 @@
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

42
extension/beats/README.md Normal file
View File

@ -0,0 +1,42 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Beats output plugin
## Compatibility
This output is developed and tested using Beats 7.3.1
## How to build
```
go build -o filebeat filebeat/filebeat.go
go build -o metricbeat metricbeat/metricbeat.go
go build -o winlogbeat winlogbeat/winlogbeat.go
go build -o packetbeat packetbeat/packetbeat.go
go build -o auditbeat auditbeat/auditbeat.go
go build -o heartbeat heartbeat/heartbeat.go
```
## How to use
See:
- https://doris.apache.org/zh-CN/docs/dev/ecosystem/beats
- https://doris.apache.org/docs/dev/ecosystem/beats

3
extension/beats/auditbeat/.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
# beat binary
auditbeat
auditbeat.exe

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package main
import (
"os"
_ "github.com/apache/doris/extension/beats/doris"
"github.com/elastic/beats/v7/auditbeat/cmd"
// Register modules.
_ "github.com/elastic/beats/v7/auditbeat/module/auditd"
_ "github.com/elastic/beats/v7/auditbeat/module/file_integrity"
// Register includes.
_ "github.com/elastic/beats/v7/auditbeat/include"
)
func main() {
if err := cmd.RootCmd.Execute(); err != nil {
os.Exit(1)
}
}

View File

@ -0,0 +1,200 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package doris
import (
"context"
"fmt"
"net/http"
"net/http/httputil"
"strings"
"time"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/outputs/codec"
"github.com/elastic/beats/v7/libbeat/publisher"
)
type client struct {
url string
httpClient *http.Client
headers map[string]string
beat beat.Info
codec codec.Codec
database string
table string
labelPrefix string
lineDelimiter string
observer outputs.Observer
logger *logp.Logger
}
type clientSettings struct {
URL string
Timeout time.Duration
Headers map[string]string
Database string
Table string
LabelPrefix string
LineDelimiter string
Beat beat.Info
Codec codec.Codec
Observer outputs.Observer
Logger *logp.Logger
}
func (s clientSettings) String() string {
return fmt.Sprintf("clientSettings{%s, %s, %s, %s}", s.URL, s.Timeout, s.LabelPrefix, s.Headers)
}
func NewDorisClient(s clientSettings) (*client, error) {
s.Logger.Infof("Received settings: %s", s)
client := &client{
url: s.URL,
httpClient: &http.Client{
Timeout: s.Timeout,
},
headers: s.Headers,
database: s.Database,
table: s.Table,
labelPrefix: s.LabelPrefix,
lineDelimiter: s.LineDelimiter,
codec: s.Codec,
observer: s.Observer,
logger: s.Logger,
}
return client, nil
}
func (client *client) Connect() error {
return nil
}
func (client *client) Close() error {
return nil
}
func (client *client) String() string {
return fmt.Sprintf("doris{%s, %s, %s}", client.url, client.labelPrefix, client.headers)
}
func (client *client) Publish(_ context.Context, batch publisher.Batch) error {
events := batch.Events()
length := len(events)
client.logger.Debugf("Received events: %d", length)
label := fmt.Sprintf("%s_%s_%s_%d", client.labelPrefix, client.database, client.table, time.Now().UnixMilli())
rest, err := client.publishEvents(label, events)
if len(rest) == 0 {
batch.ACK()
client.logger.Debugf("Success send events: %d", length)
} else {
client.observer.Failed(length)
batch.RetryEvents(rest)
client.logger.Warnf("Retry send events: %d", length)
}
return err
}
func (client *client) publishEvents(lable string, events []publisher.Event) ([]publisher.Event, error) {
begin := time.Now()
var logFirstEvent []byte
var stringBuilder strings.Builder
dropped := 0
for i := range events {
event := &events[i]
serializedEvent, err := client.codec.Encode(client.beat.Beat, &event.Content)
if err != nil {
if event.Guaranteed() {
client.logger.Errorf("Failed to serialize the event: %+v", err)
} else {
client.logger.Warnf("Failed to serialize the event: %+v", err)
}
client.logger.Debugf("Failed event: %v", event)
dropped++
continue
}
if logFirstEvent == nil {
logFirstEvent = serializedEvent
}
stringBuilder.Write(serializedEvent)
stringBuilder.WriteString(client.lineDelimiter)
}
request, requestErr := http.NewRequest(http.MethodPut, client.url, strings.NewReader(stringBuilder.String()))
if requestErr != nil {
client.logger.Errorf("Failed to create request: %s", requestErr)
return events, requestErr
}
request.Header.Set("label", lable)
for k, v := range client.headers {
request.Header.Set(k, v)
}
response, responseErr := client.httpClient.Do(request)
if responseErr != nil {
client.logger.Errorf("Failed to stream-load request: %v", responseErr)
return events, responseErr
}
defer response.Body.Close()
if response.StatusCode < 200 || response.StatusCode >= 300 {
client.logger.Errorf("Failed to stream-load request with status code %d",
response.StatusCode)
reqBytes, reqErr := httputil.DumpRequestOut(request, false)
if reqErr != nil {
client.logger.Errorf("Failed to dump stream-load request: %v", reqErr)
} else {
client.logger.Errorf("Stream-Load request dump:\n%s\n first event: %s",
string(reqBytes), string(logFirstEvent))
}
respBytes, respErr := httputil.DumpResponse(response, true)
if respErr != nil {
client.logger.Errorf("Failed to dump stream-load response: %v", respErr)
} else {
client.logger.Errorf("Stream-Load response dump:\n%s", string(respBytes))
}
return events, nil
}
client.logger.Debugf("Stream-Load publish events: %d events have been published to doris in %v.",
len(events)-dropped,
time.Now().Sub(begin))
client.observer.Dropped(dropped)
client.observer.Acked(len(events) - dropped)
return nil, nil
}

View File

@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package doris
import (
"encoding/base64"
"errors"
"strings"
"time"
"github.com/elastic/beats/v7/libbeat/outputs/codec"
)
type config struct {
Hosts []string `config:"fenodes" validate:"required"`
User string `config:"user"`
Password string `config:"password"`
Database string `config:"database" validate:"required"`
Table string `config:"table" validate:"required"`
LabelPrefix string `config:"label_prefix"`
LineDelimiter string `config:"line_delimiter"`
Headers map[string]string `config:"headers"`
CodecFormatString string `config:"codec_format_string"`
Codec codec.Config `config:"codec"`
Timeout time.Duration `config:"timeout"`
BulkMaxSize int `config:"bulk_max_size" validate:"min=1,nonzero"`
MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"`
Backoff backoff `config:"backoff"`
}
type backoff struct {
Init time.Duration `config:"init"`
Max time.Duration `config:"max"`
}
func defaultConfig() config {
return config{
Password: "",
LabelPrefix: "doris_beats",
LineDelimiter: "\n",
BulkMaxSize: 100000,
MaxRetries: 3,
Backoff: backoff{
Init: 1 * time.Second,
Max: 60 * time.Second,
},
}
}
func (c *config) Validate() error {
if len(c.Hosts) == 0 {
return errors.New("no http_hosts configured")
}
if len(c.Database) == 0 {
return errors.New("no database configured")
}
if len(c.Table) == 0 {
return errors.New("no table configured")
}
if len(c.CodecFormatString) == 0 && &c.Codec == nil {
return errors.New("no codec_format_expression|codec configured")
}
return nil
}
func (c *config) createHeaders() map[string]string {
headers := make(map[string]string)
headers["Expect"] = "100-continue"
headers["Content-Type"] = "text/plain;charset=utf-8"
if len(c.User) != 0 {
headers["Authorization"] = "Basic " + base64.StdEncoding.EncodeToString([]byte(c.User+":"+c.Password))
}
if len(c.LineDelimiter) != 0 && !strings.EqualFold(c.LineDelimiter, "\n") {
headers["line_delimiter"] = c.LineDelimiter
}
if c.Headers != nil && len(c.Headers) != 0 {
for k, v := range c.Headers {
if strings.EqualFold("line_delimiter", k) {
c.LineDelimiter = v
}
headers[k] = v
}
}
return headers
}

View File

@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package doris
import (
"fmt"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/outputs/codec"
)
const logSelector = "doris"
func init() {
outputs.RegisterType("doris", makeDoris)
}
func makeDoris(
_ outputs.IndexManager,
beat beat.Info,
observer outputs.Observer,
cfg *common.Config,
) (outputs.Group, error) {
logger := logp.NewLogger(logSelector)
config := defaultConfig()
if err := cfg.Unpack(&config); err != nil {
return outputs.Fail(err)
}
if err := config.Validate(); err != nil {
return outputs.Fail(err)
}
codecConfig := loadCodecConfig(config)
codec, err := codec.CreateEncoder(beat, codecConfig)
if err != nil {
return outputs.Fail(err)
}
clients := make([]outputs.NetworkClient, len(config.Hosts))
for i, host := range config.Hosts {
logger.Info("Making client for host: " + host)
url, err := parseURL(host)
if err != nil {
return outputs.Fail(err)
}
streamLoadPath := fmt.Sprintf("/api/%s/%s/_stream_load", config.Database, config.Table)
hostURL, err := common.MakeURL(url.Scheme, streamLoadPath, host, 80)
if err != nil {
logger.Errorf("Invalid host param set: %s, Error: %+v", host, err)
return outputs.Fail(err)
}
logger.Infof("Final http connection endpoint: %s", hostURL)
var client outputs.NetworkClient
client, err = NewDorisClient(clientSettings{
URL: hostURL,
Timeout: config.Timeout,
Observer: observer,
Headers: config.createHeaders(),
Codec: codec,
LineDelimiter: config.LineDelimiter,
LabelPrefix: config.LabelPrefix,
Database: config.Database,
Table: config.Table,
Logger: logger,
})
if err != nil {
return outputs.Fail(err)
}
client = outputs.WithBackoff(client, config.Backoff.Init, config.Backoff.Max)
clients[i] = client
}
retry := 0
if config.MaxRetries < 0 {
retry = -1
} else {
retry = config.MaxRetries
}
return outputs.SuccessNet(true, config.BulkMaxSize, retry, clients)
}
func loadCodecConfig(config config) codec.Config {
if len(config.CodecFormatString) == 0 {
return config.Codec
}
beatConfig := common.MustNewConfigFrom(map[string]interface{}{
"format.string": config.CodecFormatString,
})
codecConfig := codec.Config{}
if err := beatConfig.Unpack(&codecConfig); err != nil {
panic(err)
}
return codecConfig
}

View File

@ -0,0 +1,288 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package doris
import (
"context"
"encoding/base64"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"net/http"
"strings"
"testing"
"time"
"gotest.tools/assert"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/outputs"
_ "github.com/elastic/beats/v7/libbeat/outputs/codec/format"
_ "github.com/elastic/beats/v7/libbeat/outputs/codec/json"
"github.com/elastic/beats/v7/libbeat/outputs/outest"
)
var fenodes = flag.String("fenodes", "http://localhost:8030", "fe node address")
var user = flag.String("user", "root", "doris user")
var password = flag.String("password", "", "doris password")
var database = flag.String("database", "test", "doris database")
func TestDorisPublish(t *testing.T) {
dorisConfig := map[string]interface{}{
"fenodes": strings.Split(*fenodes, ","),
"user": *user,
"password": *password,
"database": *database,
}
fmt.Println(dorisConfig)
testDorisPublishMessage(t, dorisConfig)
}
type eventInfo struct {
events []beat.Event
}
func makeConfig(t *testing.T, in map[string]interface{}) *common.Config {
cfg, err := common.NewConfigFrom(in)
if err != nil {
t.Fatal(err)
}
return cfg
}
func flatten(infos []eventInfo) []beat.Event {
var out []beat.Event
for _, info := range infos {
out = append(out, info.events...)
}
return out
}
func testDorisPublishMessage(t *testing.T, cfg map[string]interface{}) {
tests := []struct {
title string
config map[string]interface{}
events []eventInfo
}{
{
"test csv events",
map[string]interface{}{
"table": "test_beats_csv",
"codec_format_string": "%{[message]},%{[@timestamp]},%{[@metadata.type]}",
"headers": map[string]interface{}{
"column_separator": ",",
},
},
[]eventInfo{
{
events: []beat.Event{
{
Timestamp: time.Now(),
Meta: common.MapStr{
"beat": "filebeat",
"type": "_doc",
"version": "7.4.2",
},
Fields: common.MapStr{
"message": "1,A",
},
},
{
Timestamp: time.Now(),
Meta: common.MapStr{
"beat": "filebeat",
"type": "_doc",
"version": "7.4.2",
},
Fields: common.MapStr{
"message": "2,B",
},
},
},
},
},
},
{
"test json events",
map[string]interface{}{
"table": "test_beats_json",
"codec_format_string": "%{[message]}",
"headers": map[string]interface{}{
"format": "json",
"read_json_by_line": true,
},
},
[]eventInfo{
{
events: []beat.Event{
{
Timestamp: time.Now(),
Meta: common.MapStr{
"beat": "filebeat",
"type": "_doc",
"version": "7.4.2",
},
Fields: common.MapStr{
"message": "{\"id\": 1, \"name\": \"A\"}",
},
},
{
Timestamp: time.Now(),
Meta: common.MapStr{
"beat": "filebeat",
"type": "_doc",
"version": "7.4.2",
},
Fields: common.MapStr{
"message": "{\"id\": 2, \"name\": \"B\"}",
},
},
},
},
},
},
{
"test codec events",
map[string]interface{}{
"table": "test_beats_codec",
"codec_format_string": "{\"id\": %{[fields.id]}, \"name\": \"%{[fields.name]}\", \"timestamp\": \"%{[@timestamp]}\", \"type\": \"%{[@metadata.type]}\"}",
"headers": map[string]interface{}{
"format": "json",
"read_json_by_line": true,
},
},
[]eventInfo{
{
events: []beat.Event{
{
Timestamp: time.Now(),
Meta: common.MapStr{
"beat": "filebeat",
"type": "_doc",
"version": "7.4.2",
},
Fields: common.MapStr{
"fields": common.MapStr{
"id": 1,
"name": "A",
},
},
},
{
Timestamp: time.Now(),
Meta: common.MapStr{
"beat": "filebeat",
"type": "_doc",
"version": "7.4.2",
},
Fields: common.MapStr{
"fields": common.MapStr{
"id": 2,
"name": "B",
},
},
},
},
},
},
},
}
for i, test := range tests {
config := makeConfig(t, cfg)
if test.config != nil {
config.Merge(makeConfig(t, test.config))
}
name := fmt.Sprintf("run test(%v): %v", i, test.title)
t.Run(name, func(t *testing.T) {
grp, err := makeDoris(nil, beat.Info{Beat: "libbeat"}, outputs.NewNilObserver(), config)
if err != nil {
t.Fatal(err)
}
output := grp.Clients[0].(outputs.NetworkClient)
if err := output.Connect(); err != nil {
t.Fatal(err)
}
defer output.Close()
// publish test events
for i := range test.events {
batch := outest.NewBatch(test.events[i].events...)
output.Publish(context.Background(), batch)
}
expected := flatten(test.events)
stored := testReadFromDoris(t, config)
assert.Equal(t, len(expected), stored)
})
}
}
type httpResponse struct {
data map[string]int64 `json:"data"`
}
func testReadFromDoris(t *testing.T, config *common.Config) int64 {
fenode, feErr := config.String("fenodes", 0)
if feErr != nil {
t.Fatal(feErr)
}
user, userErr := config.String("user", -1)
if userErr != nil {
t.Fatal(userErr)
}
password, pwErr := config.String("password", -1)
if pwErr != nil {
t.Fatal(pwErr)
}
database, dbErr := config.String("database", -1)
if dbErr != nil {
t.Fatal(dbErr)
}
table, tableErr := config.String("table", -1)
if tableErr != nil {
t.Fatal(tableErr)
}
getTableCountURL := fmt.Sprintf("%s/api/rowcount?db=%s&table=%s", fenode, database, table)
request, requestErr := http.NewRequest(http.MethodGet, getTableCountURL, strings.NewReader(""))
request.Header.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(user+":"+password)))
if requestErr != nil {
t.Fatal(requestErr)
}
resp, responseErr := http.DefaultClient.Do(request)
if responseErr != nil || resp.StatusCode >= 300 || resp.StatusCode < 200 {
t.Fatal(responseErr)
}
defer resp.Body.Close()
body, readErr := ioutil.ReadAll(resp.Body)
if readErr != nil {
t.Fatal(readErr)
}
var response httpResponse
if jsonErr := json.Unmarshal(body, &response); jsonErr != nil {
t.Fatal(jsonErr)
}
return response.data[table]
}

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package doris
import (
"net/url"
"strings"
)
func parseURL(raw string) (*url.URL, error) {
if raw == "" {
return nil, nil
}
parsedUrl, err := url.Parse(raw)
if err == nil && strings.HasPrefix(parsedUrl.Scheme, "http") {
return parsedUrl, err
}
// Proxy was bogus. Try prepending "http://" to it and
// see if that parses correctly.
return url.Parse("http://" + raw)
}

View File

@ -0,0 +1,37 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
filebeat.inputs:
- type: log
enabled: true
paths:
- /tmp/beats/example.log
output.doris:
fenodes: [ "http://localhost:8030" ] # your doris fe address
user: root # your doris user
#password: root # your doris password
database: example_db # your doris database
table: example_table # your doris table
codec_format_string: "%{[message]}" # beat-event format expression to row data
headers:
column_separator: ","
logging.level: debug

3
extension/beats/filebeat/.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
# beat binary
filebeat
filebeat.exe

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package main
import (
"os"
_ "github.com/apache/doris/extension/beats/doris"
"github.com/elastic/beats/v7/filebeat/cmd"
inputs "github.com/elastic/beats/v7/filebeat/input/default-inputs"
)
func main() {
if err := cmd.Filebeat(inputs.Init, cmd.FilebeatSettings()).Execute(); err != nil {
os.Exit(1)
}
}

185
extension/beats/go.mod Normal file
View File

@ -0,0 +1,185 @@
module github.com/apache/doris/extension/beats
go 1.20
require (
github.com/elastic/beats/v7 v7.17.5
gotest.tools v2.2.0+incompatible
)
require (
github.com/BurntSushi/toml v0.3.1 // indirect
github.com/Microsoft/go-winio v0.5.1 // indirect
github.com/PaesslerAG/gval v1.0.0 // indirect
github.com/PaesslerAG/jsonpath v0.1.1 // indirect
github.com/Shopify/sarama v0.0.0-00010101000000-000000000000 // indirect
github.com/StackExchange/wmi v0.0.0-20170221213301-9f32b5905fd6 // indirect
github.com/aerospike/aerospike-client-go v1.27.1-0.20170612174108-0f3b54da6bdc // indirect
github.com/armon/go-radix v1.0.0 // indirect
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/containerd/containerd v1.5.7 // indirect
github.com/coreos/go-systemd/v22 v22.3.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/davecgh/go-xdr v0.0.0-20161123171359-e6a2ba005892 // indirect
github.com/digitalocean/go-libvirt v0.0.0-20180301200012-6075ea3c39a1 // indirect
github.com/dlclark/regexp2 v1.1.7-0.20171009020623-7632a260cbaf // indirect
github.com/docker/distribution v2.8.0+incompatible // indirect
github.com/docker/docker v1.4.2-0.20190924003213-a8608b5b67c7 // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/dop251/goja v0.0.0-20200831102558-9af81ddcf0e1 // indirect
github.com/dop251/goja_nodejs v0.0.0-20171011081505-adff31b136e6 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/eapache/go-resiliency v1.2.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/eclipse/paho.mqtt.golang v1.2.1-0.20200121105743-0d940dd29fd2 // indirect
github.com/elastic/ecs v1.12.0 // indirect
github.com/elastic/elastic-agent-client/v7 v7.0.0-20210727140539-f0905d9377f6 // indirect
github.com/elastic/go-concert v0.2.0 // indirect
github.com/elastic/go-libaudit/v2 v2.2.0 // indirect
github.com/elastic/go-lumber v0.1.0 // indirect
github.com/elastic/go-seccomp-bpf v1.2.0 // indirect
github.com/elastic/go-structform v0.0.9 // indirect
github.com/elastic/go-sysinfo v1.7.1 // indirect
github.com/elastic/go-txfile v0.0.7 // indirect
github.com/elastic/go-ucfg v0.8.3 // indirect
github.com/elastic/go-windows v1.0.1 // indirect
github.com/elastic/gosigar v0.14.2 // indirect
github.com/fatih/color v1.9.0 // indirect
github.com/fsnotify/fsevents v0.1.1 // indirect
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/go-logr/logr v0.4.0 // indirect
github.com/go-ole/go-ole v1.2.5-0.20190920104607-14974a1cf647 // indirect
github.com/go-sourcemap/sourcemap v2.1.2+incompatible // indirect
github.com/go-sql-driver/mysql v1.5.0 // indirect
github.com/gocarina/gocsv v0.0.0-20170324095351-ffef3ffc77be // indirect
github.com/godbus/dbus v0.0.0-20190422162347-ade71ed3457e // indirect
github.com/godbus/dbus/v5 v5.0.5 // indirect
github.com/gofrs/flock v0.7.2-0.20190320160742-5135e617513b // indirect
github.com/gofrs/uuid v3.3.0+incompatible // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/gomodule/redigo v1.8.3 // indirect
github.com/google/flatbuffers v1.12.1 // indirect
github.com/google/go-cmp v0.5.6 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/googleapis/gnostic v0.4.1 // indirect
github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/h2non/filetype v1.1.1 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.0 // indirect
github.com/hashicorp/go-uuid v1.0.2 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/imdario/mergo v0.3.12 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/insomniacslk/dhcp v0.0.0-20180716145214-633285ba52b2 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.0.0 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.2 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect
github.com/jonboulle/clockwork v0.2.2 // indirect
github.com/json-iterator/go v1.1.11 // indirect
github.com/klauspost/compress v1.13.6 // indirect
github.com/lib/pq v1.1.2-0.20190507191818-2ff3cb3adc01 // indirect
github.com/magefile/mage v1.11.0 // indirect
github.com/mattn/go-colorable v0.1.6 // indirect
github.com/mattn/go-isatty v0.0.12 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/miekg/dns v1.1.25 // indirect
github.com/mitchellh/hashstructure v0.0.0-20170116052023-ab25296c0f51 // indirect
github.com/mitchellh/mapstructure v1.3.3 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2-0.20190823105129-775207bd45b6 // indirect
github.com/pierrec/lz4 v2.6.0+incompatible // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.10.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
github.com/prometheus/prometheus v2.5.0+incompatible // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/samuel/go-parser v0.0.0-20130731160455-ca8abbf65d0e // indirect
github.com/samuel/go-thrift v0.0.0-20140522043831-2187045faa54 // indirect
github.com/santhosh-tekuri/jsonschema v1.2.4 // indirect
github.com/shirou/gopsutil v3.20.12+incompatible // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/spf13/cobra v1.0.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/testify v1.7.0 // indirect
github.com/tsg/gopacket v0.0.0-20200626092518-2ab8e397a786 // indirect
github.com/urso/diag v0.0.0-20200210123136-21b3cc8eb797 // indirect
github.com/urso/go-bin v0.0.0-20180220135811-781c575c9f0e // indirect
github.com/urso/magetools v0.0.0-20190919040553-290c89e0c230 // indirect
github.com/urso/sderr v0.0.0-20210525210834-52b04e8f5c71 // indirect
github.com/vmware/govmomi v0.0.0-20170802214208-2cad15190b41 // indirect
github.com/xdg/scram v1.0.3 // indirect
github.com/xdg/stringprep v1.0.3 // indirect
github.com/yuin/gopher-lua v0.0.0-20170403160031-b402f3114ec7 // indirect
go.elastic.co/apm v1.11.0 // indirect
go.elastic.co/apm/module/apmelasticsearch v1.7.2 // indirect
go.elastic.co/apm/module/apmhttp v1.7.2 // indirect
go.elastic.co/ecszap v0.3.0 // indirect
go.elastic.co/fastjson v1.1.0 // indirect
go.etcd.io/bbolt v1.3.6 // indirect
go.uber.org/atomic v1.5.0 // indirect
go.uber.org/multierr v1.3.0 // indirect
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee // indirect
go.uber.org/zap v1.14.0 // indirect
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e // indirect
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
golang.org/x/mod v0.4.2 // indirect
golang.org/x/net v0.0.0-20211020060615-d418f374d309 // indirect
golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20211102192858-4dd72447c267 // indirect
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
golang.org/x/tools v0.1.7 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20211021150943-2b146023228c // indirect
google.golang.org/grpc v1.41.0 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/jcmturner/aescts.v1 v1.0.1 // indirect
gopkg.in/jcmturner/dnsutils.v1 v1.0.1 // indirect
gopkg.in/jcmturner/goidentity.v3 v3.0.0 // indirect
gopkg.in/jcmturner/gokrb5.v7 v7.5.0 // indirect
gopkg.in/jcmturner/rpc.v1 v1.1.0 // indirect
gopkg.in/mgo.v2 v2.0.0-20160818020120-3f83fa500528 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
honnef.co/go/tools v0.0.1-2020.1.4 // indirect
howett.net/plist v0.0.0-20181124034731-591f970eefbb // indirect
k8s.io/api v0.21.1 // indirect
k8s.io/apimachinery v0.21.1 // indirect
k8s.io/client-go v0.21.1 // indirect
k8s.io/klog/v2 v2.8.0 // indirect
k8s.io/utils v0.0.0-20201110183641-67b214c5f920 // indirect
kernel.org/pub/linux/libs/security/libcap/cap v1.2.57 // indirect
kernel.org/pub/linux/libs/security/libcap/psx v1.2.57 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.1.0 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
)
replace (
github.com/Microsoft/go-winio => github.com/bi-zone/go-winio v0.4.15
github.com/Shopify/sarama => github.com/elastic/sarama v1.19.1-0.20210823122811-11c3ef800752
github.com/docker/docker => github.com/docker/engine v0.0.0-20191113042239-ea84732a7725
github.com/docker/go-plugins-helpers => github.com/elastic/go-plugins-helpers v0.0.0-20200207104224-bdf17607b79f
github.com/dop251/goja => github.com/andrewkroh/goja v0.0.0-20190128172624-dd2ac4456e20
github.com/dop251/goja_nodejs => github.com/dop251/goja_nodejs v0.0.0-20171011081505-adff31b136e6
github.com/fsnotify/fsevents => github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270
github.com/fsnotify/fsnotify => github.com/adriansr/fsnotify v1.4.8-0.20211018144411-a81f2b630e7c
github.com/google/gopacket => github.com/adriansr/gopacket v1.1.18-0.20200327165309-dd62abfa8a41
github.com/insomniacslk/dhcp => github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3 // indirect
)

1365
extension/beats/go.sum Normal file

File diff suppressed because it is too large Load Diff

3
extension/beats/heartbeat/.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
# beat binary
heartbeat
heartbeat.exe

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package main
import (
"os"
_ "github.com/apache/doris/extension/beats/doris"
"github.com/elastic/beats/v7/heartbeat/cmd"
_ "github.com/elastic/beats/v7/heartbeat/include"
)
func main() {
if err := cmd.RootCmd.Execute(); err != nil {
os.Exit(1)
}
}

3
extension/beats/metricbeat/.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
# beat binary
metricbeat
metricbeat.exe

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package main
import (
"os"
_ "github.com/apache/doris/extension/beats/doris"
"github.com/elastic/beats/v7/metricbeat/cmd"
)
func main() {
if err := cmd.RootCmd.Execute(); err != nil {
os.Exit(1)
}
}

3
extension/beats/packetbeat/.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
# beat binary
packetbeat
packetbeat.exe

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package main
import (
"os"
_ "github.com/apache/doris/extension/beats/doris"
"github.com/elastic/beats/v7/packetbeat/cmd"
)
func main() {
if err := cmd.RootCmd.Execute(); err != nil {
os.Exit(1)
}
}

3
extension/beats/winlogbeat/.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
# beat binary
winlogbeat
winlogbeat.exe

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package main
import (
"os"
_ "github.com/apache/doris/extension/beats/doris"
"github.com/elastic/beats/v7/winlogbeat/cmd"
)
func main() {
if err := cmd.RootCmd.Execute(); err != nil {
os.Exit(1)
}
}