[Extension] Logstash Doris output plugin (#3800)
This Logstash plugin outputs data to Doris. It uses the HTTP protocol to interact with the Doris FE HTTP interface and loads data through Doris's stream load.
@ -126,6 +126,7 @@ module.exports = [
|
||||
"plugin-development-manual",
|
||||
"user-defined-function",
|
||||
"spark-doris-connector",
|
||||
"logstash",
|
||||
],
|
||||
},
|
||||
{
|
||||
|
||||
@ -137,6 +137,7 @@ module.exports = [
|
||||
"plugin-development-manual",
|
||||
"user-defined-function",
|
||||
"spark-doris-connector",
|
||||
"logstash",
|
||||
],
|
||||
},
|
||||
{
|
||||
|
||||
198
docs/en/extending-doris/logstash.md
Normal file
@ -0,0 +1,198 @@
|
||||
---
|
||||
{
|
||||
"title": "Logstash Doris Output Plugin",
|
||||
"language": "en"
|
||||
}
|
||||
---
|
||||
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
# Doris output plugin
|
||||
|
||||
This Logstash plugin outputs data to Doris. It uses the HTTP protocol to interact with the Doris FE HTTP interface and imports data through Doris's stream load.
|
||||
|
||||
[Learn more about Doris Stream Load](http://doris.apache.org/master/zh-CN/administrator-guide/load-data/stream-load-manual.html)
|
||||
|
||||
[Learn more about Doris](http://doris.apache.org/master/zh-CN/)
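As a rough illustration of the HTTP interaction described above, the sketch below builds the stream load URL for one FE address. The path `/api/{db}/{table}/_stream_load` is the one used by the plugin source in this commit; the helper name `stream_load_url` and the sample host, database, and table are assumptions made for the example.

```ruby
require "uri"

# Hypothetical helper (not part of the plugin): join one FE address from
# `http_hosts` with the stream load path used by the plugin.
def stream_load_url(fe_host, db, table)
  URI.join(fe_host, "/api/#{db}/#{table}/_stream_load").to_s
end

puts stream_load_url("http://fehost:8030", "db_name", "table_name")
```

The plugin sends each batch of events to this URL with an HTTP `PUT` request.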
|
||||
|
||||
|
||||
## Install and compile

### 1. Download the source code

### 2. Compile

Run the following in the extension/logstash/ directory:

`gem build logstash-output-doris.gemspec`

This produces the file logstash-output-doris-{version}.gem in the same directory.

### 3. Install the plugin

Copy logstash-output-doris-{version}.gem to the Logstash installation directory, then run the following command to install the logstash-output-doris plugin:

`./bin/logstash-plugin install logstash-output-doris-{version}.gem`
|
||||
|
||||
## Configuration
|
||||
### Example:
|
||||
|
||||
Create a new configuration file in the config directory and name it logstash-doris.conf
|
||||
|
||||
The specific configuration is as follows:
|
||||
|
||||
    output {
        doris {
            http_hosts => [ "http://fehost:8030" ]
            user => user_name
            password => password
            db => "db_name"
            table => "table_name"
            label_prefix => "label_prefix"
            column_separator => ","
        }
    }
|
||||
|
||||
Configuration instructions:

Connection configuration:

Configuration | Explanation
--- | ---
`http_hosts` | HTTP addresses of the Doris FE nodes, e.g. `["http://fe1:8030", "http://fe2:8030"]`
`user` | User name; the user needs import permission on the target Doris table
`password` | Password
`db` | Database name
`table` | Table name
`label_prefix` | Prefix of the import label; the final label is *{label\_prefix}\_{db}\_{table}\_{time_stamp}*
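To make the label format concrete, here is a minimal sketch of how such a label can be assembled. The timestamp pattern `%Y%m%d%H%M%S_%L` is taken from the plugin source in this commit; the helper name `build_label` and the sample values are assumptions for illustration only.

```ruby
# Hypothetical helper mirroring the label layout described above:
# prefix, database, table, then a timestamp with millisecond precision.
def build_label(label_prefix, db, table, now = Time.now)
  [label_prefix, db, table, now.strftime("%Y%m%d%H%M%S_%L")].join("_")
end

# A fixed time is passed here so the result is reproducible.
puts build_label("doris", "logstash_output_test", "output", Time.utc(2020, 6, 1, 12, 30, 45))
```

Because the timestamp only has millisecond resolution, a distinct `label_prefix` per pipeline is a reasonable way to avoid label collisions between pipelines that write to the same table.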
|
||||
|
||||
|
||||
Load configuration: ([Reference documents](http://doris.apache.org/master/zh-CN/administrator-guide/load-data/stream-load-manual.html))

Configuration | Explanation
--- | ---
`column_separator` | Column separator; the default is \t
`columns` | Maps the columns in the imported data to the columns in the table
`where` | Filter condition applied by the import task
`max_filter_ratio` | Maximum ratio of rows the import task may filter out; the default is 0 (zero tolerance)
`partition` | Partitions of the table to import into
`timeout` | Timeout in seconds; the default is 600
`strict_mode` | Strict mode; the default is false
`timezone` | Time zone used for this import; the default is UTC+8
`exec_mem_limit` | Import memory limit in bytes; the default is 2GB
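The load options above travel as HTTP headers on the stream load request. The sketch below builds such a request without sending it. It uses Ruby's standard `net/http` rather than the `rest-client` gem the plugin depends on, and `basic_auth` in place of the plugin's explicit Base64 encoding; the helper name `build_stream_load_request` and all sample values are assumptions.

```ruby
require "net/http"
require "uri"

# Hypothetical helper: build (but do not send) a stream load PUT request.
def build_stream_load_request(url, user, password, options = {})
  request = Net::HTTP::Put.new(URI(url))
  request["Expect"] = "100-continue"
  request.basic_auth(user, password) # sets "Authorization: Basic ..."
  # Only options that were actually set become headers.
  options.each do |name, value|
    next if value.nil? || value.to_s.empty?
    request[name.to_s] = value.to_s
  end
  request
end

req = build_stream_load_request(
  "http://fehost:8030/api/db_name/table_name/_stream_load",
  "user_name", "password",
  column_separator: ",", columns: "a,b,c,d,e", max_filter_ratio: 0.1, where: nil
)
req.each_header { |name, value| puts "#{name}: #{value}" }
```

Options left unset are simply omitted from the request, so Doris applies its own defaults for them.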
|
||||
|
||||
Other configuration:

Configuration | Explanation
--- | ---
`save_on_failure` | Whether to save the data locally if the import fails; the default is true
`save_dir` | Local directory for saved data; the default is /tmp
`automatic_retries` | Maximum number of retries on failure; the default is 3
`batch_size` | Maximum number of events processed per batch; the default is 100000
`idle_flush_time` | Maximum interval between flushes, in seconds; the default is 20
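How `automatic_retries`, `save_on_failure`, and `save_dir` interact can be sketched as follows. This is a simplified illustration under stated assumptions, not the plugin's code: the helper `load_with_retries` is hypothetical, and the block stands in for the HTTP request, returning true on success.

```ruby
require "tmpdir"

# Hypothetical sketch: attempt a load up to `automatic_retries` times; if every
# attempt fails and `save_on_failure` is true, append the batch to a local file.
def load_with_retries(batch, automatic_retries:, save_on_failure:, save_path:)
  automatic_retries.times do |attempt|
    return :loaded if yield(batch, attempt + 1)
  end
  return :dropped unless save_on_failure
  File.open(save_path, "a") { |file| file.write(batch) }
  :saved
end

Dir.mktmpdir do |dir|
  path = File.join(dir, "db_name_table_name_failed.data")
  # The block always fails here, so the batch ends up on disk.
  result = load_with_retries("a,b,c,d,e\n", automatic_retries: 3,
                             save_on_failure: true, save_path: path) { false }
  puts result
  puts File.read(path)
end
```

The file name in the example follows the `{db}_{table}_failed.data` pattern that the plugin source uses under `save_dir`.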
|
||||
|
||||
|
||||
## Start Up
|
||||
Run the command to start the doris output plugin:
|
||||
|
||||
`{logstash-home}/bin/logstash -f {logstash-home}/config/logstash-doris.conf --config.reload.automatic`
|
||||
|
||||
|
||||
|
||||
|
||||
## Complete usage example

### 1. Compile doris-output-plugin

1> Download the Ruby archive from the [ruby official website](https://www.ruby-lang.org/en/downloads/). Version 2.7.1 is used here.

2> Compile and install Ruby, then configure the Ruby environment variables.

3> Go to the extension/logstash/ directory of the Doris source tree and run:

`gem build logstash-output-doris.gemspec`

This produces the file logstash-output-doris-0.1.0.gem, which completes the compilation.
|
||||
|
||||
### 2. Install and configure filebeat (filebeat is used as the input here)

1> Download the filebeat tar archive from the [es official website](https://www.elastic.co/) and extract it.

2> Enter the filebeat directory and modify the configuration file filebeat.yml as follows:

    filebeat.inputs:
    - type: log
      paths:
        - /tmp/doris.data
    output.logstash:
      hosts: ["localhost:5044"]

/tmp/doris.data is the path of the data file to be loaded into Doris.

3> Start filebeat:
|
||||
|
||||
`./filebeat -e -c filebeat.yml -d "publish"`
|
||||
|
||||
|
||||
### 3. Install logstash and doris-output-plugin

1> Download the logstash tar archive from the [es official website](https://www.elastic.co/) and extract it.

2> Copy the logstash-output-doris-0.1.0.gem obtained in step 1 to the logstash installation directory.

3> Run the following command to install the plugin:

`./bin/logstash-plugin install logstash-output-doris-0.1.0.gem`

4> Create a new configuration file logstash-doris.conf in the config directory with the following content:
|
||||
|
||||
    input {
        beats {
            port => "5044"
        }
    }

    output {
        doris {
            http_hosts => [ "http://127.0.0.1:8030" ]
            user => doris
            password => doris
            db => "logstash_output_test"
            table => "output"
            label_prefix => "doris"
            column_separator => ","
            columns => "a,b,c,d,e"
        }
    }
|
||||
|
||||
Adjust this configuration to your environment according to the configuration instructions.

5> Start logstash:

`./bin/logstash -f ./config/logstash-doris.conf --config.reload.automatic`
|
||||
|
||||
### 4. Test the load

Append data to /tmp/doris.data:

`echo a,b,c,d,e >> /tmp/doris.data`

Observe the logstash log. If the Status of the returned response is Success, the import succeeded, and the imported data can be viewed in the logstash_output_test.output table.
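The success check described above can be sketched in a few lines. The plugin source parses the response body as JSON and compares its `Status` field with `Success`; the helper name `load_succeeded?` is hypothetical, and the sample bodies are made up for illustration and contain only that one field.

```ruby
require "json"

# Hypothetical helper: decide whether a stream load response reports success.
def load_succeeded?(response_body)
  JSON.parse(response_body)["Status"] == "Success"
end

puts load_succeeded?('{"Status": "Success"}')
puts load_succeeded?('{"Status": "Fail"}')
```

Any other status is treated as a failure by the plugin, which then retries and, if configured, saves the batch to disk.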
|
||||
|
||||
198
docs/zh-CN/extending-doris/logstash.md
Normal file
@ -0,0 +1,198 @@
|
||||
---
|
||||
{
|
||||
"title": "Logstash Doris Output Plugin",
|
||||
"language": "zh-CN"
|
||||
}
|
||||
---
|
||||
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
# Doris output plugin
|
||||
|
||||
This Logstash plugin outputs data to Doris. It uses the HTTP protocol to interact with the Doris FE HTTP interface and imports data through Doris's stream load.

[Learn more about Doris Stream Load](http://doris.apache.org/master/zh-CN/administrator-guide/load-data/stream-load-manual.html)

[Learn more about Doris](http://doris.apache.org/master/zh-CN/)
|
||||
|
||||
|
||||
## Install and compile

### 1. Download the plugin source code

### 2. Compile

Run the following in the extension/logstash/ directory:

`gem build logstash-output-doris.gemspec`

This produces the file logstash-output-doris-{version}.gem in the same directory.

### 3. Install the plugin

Copy logstash-output-doris-{version}.gem to the logstash installation directory, then run the following command to install the logstash-output-doris plugin:

`./bin/logstash-plugin install logstash-output-doris-{version}.gem`
|
||||
|
||||
## Configuration

### Example:

Create a new configuration file named logstash-doris.conf in the config directory.

The specific configuration is as follows:
|
||||
|
||||
    output {
        doris {
            http_hosts => [ "http://fehost:8030" ]
            user => user_name
            password => password
            db => "db_name"
            table => "table_name"
            label_prefix => "label_prefix"
            column_separator => ","
        }
    }
|
||||
|
||||
Configuration instructions:

Connection configuration:

Configuration | Explanation
--- | ---
`http_hosts` | HTTP addresses of the Doris FE nodes, e.g. `["http://fe1:8030", "http://fe2:8030"]`
`user` | User name; the user needs import permission on the target Doris database and table
`password` | Password
`db` | Database name
`table` | Table name
`label_prefix` | Prefix of the import label; the final label is *{label\_prefix}\_{db}\_{table}\_{time_stamp}*
|
||||
|
||||
|
||||
Load configuration: ([Reference documents](http://doris.apache.org/master/zh-CN/administrator-guide/load-data/stream-load-manual.html))

Configuration | Explanation
--- | ---
`column_separator` | Column separator; the default is \t.
`columns` | Maps the columns in the imported data to the columns in the table.
`where` | Filter condition applied by the import task.
`max_filter_ratio` | Maximum ratio of rows the import task may filter out; the default is zero tolerance.
`partition` | Partition information of the table to import into.
`timeout` | Timeout; the default is 600s.
`strict_mode` | Strict mode; the default is false.
`timezone` | Time zone used for this import; the default is UTC+8.
`exec_mem_limit` | Import memory limit in bytes; the default is 2GB.
|
||||
|
||||
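Other configuration:

Configuration | Explanation
--- | ---
`save_on_failure` | Whether to save the data locally if the import fails; the default is true
`save_dir` | Local directory for saved data; the default is /tmp
`automatic_retries` | Maximum number of retries on failure; the default is 3
`batch_size` | Maximum number of events processed per batch; the default is 100000
`idle_flush_time` | Maximum interval between flushes, in seconds; the default is 20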
|
||||
|
||||
|
||||
## Start Up

Run the following command to start the doris output plugin:
|
||||
|
||||
`{logstash-home}/bin/logstash -f {logstash-home}/config/logstash-doris.conf --config.reload.automatic`
|
||||
|
||||
|
||||
|
||||
|
||||
## Complete usage example

### 1. Compile doris-output-plugin

1> Download the Ruby archive from the [ruby official website](https://www.ruby-lang.org/en/downloads/). Version 2.7.1 is used here.

2> Compile and install Ruby, then configure the Ruby environment variables.

3> Go to the extension/logstash/ directory of the Doris source tree and run:

`gem build logstash-output-doris.gemspec`

This produces the file logstash-output-doris-0.1.0.gem, which completes the compilation.
|
||||
|
||||
### 2. Install and configure filebeat (filebeat is used as the input here)

1> Download the filebeat tar archive from the [es official website](https://www.elastic.co/) and extract it.

2> Enter the filebeat directory and modify the configuration file filebeat.yml as follows:

    filebeat.inputs:
    - type: log
      paths:
        - /tmp/doris.data
    output.logstash:
      hosts: ["localhost:5044"]

/tmp/doris.data is the path of the data file to be loaded into Doris.

3> Start filebeat:
|
||||
|
||||
`./filebeat -e -c filebeat.yml -d "publish"`
|
||||
|
||||
|
||||
### 3. Install logstash and doris-output-plugin

1> Download the logstash tar archive from the [es official website](https://www.elastic.co/) and extract it.

2> Copy the logstash-output-doris-0.1.0.gem obtained in step 1 to the logstash installation directory.

3> Run the following command to install the plugin:

`./bin/logstash-plugin install logstash-output-doris-0.1.0.gem`

4> Create a new configuration file logstash-doris.conf in the config directory with the following content:
|
||||
|
||||
    input {
        beats {
            port => "5044"
        }
    }

    output {
        doris {
            http_hosts => [ "http://127.0.0.1:8030" ]
            user => doris
            password => doris
            db => "logstash_output_test"
            table => "output"
            label_prefix => "doris"
            column_separator => ","
            columns => "a,b,c,d,e"
        }
    }
|
||||
|
||||
Adjust this configuration to your environment according to the configuration instructions.

5> Start logstash:

`./bin/logstash -f ./config/logstash-doris.conf --config.reload.automatic`
|
||||
|
||||
### 4. Test the load

Append data to /tmp/doris.data:

`echo a,b,c,d,e >> /tmp/doris.data`

Observe the logstash log. If the Status of the returned response is Success, the import succeeded, and the imported data can be viewed in the logstash_output_test.output table.
|
||||
|
||||
20
extension/logstash/Gemfile
Normal file
@ -0,0 +1,20 @@
|
||||
=begin
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
=end
|
||||
source 'https://rubygems.org'
|
||||
gemspec
|
||||
16
extension/logstash/LICENSE
Normal file
@ -0,0 +1,16 @@
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
28
extension/logstash/README.md
Normal file
@ -0,0 +1,28 @@
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
1. How to build
|
||||
|
||||
`gem build logstash-output-doris.gemspec`
|
||||
|
||||
2. How to use
|
||||
|
||||
`http://doris.incubator.apache.org/master/en/extending-doris/logstash.html`
|
||||
`http://doris.incubator.apache.org/master/zh-CN/extending-doris/logstash.html`
|
||||
|
||||
25
extension/logstash/Rakefile
Normal file
@ -0,0 +1,25 @@
|
||||
=begin
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
=end
|
||||
@files=[]
|
||||
|
||||
task :default do
|
||||
system("rake -T")
|
||||
end
|
||||
|
||||
require "logstash/devutils/rake"
|
||||
294
extension/logstash/lib/logstash/outputs/doris.rb
Normal file
@ -0,0 +1,294 @@
|
||||
=begin
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
=end
|
||||
|
||||
# encoding: utf-8
|
||||
require "logstash/outputs/base"
|
||||
require "logstash/namespace"
|
||||
require "logstash/json"
|
||||
require "logstash/util/shortname_resolver"
|
||||
require "uri"
|
||||
require "stud/buffer"
|
||||
require "logstash/plugin_mixins/http_client"
|
||||
require "securerandom"
|
||||
require "json"
|
||||
require "base64"
|
||||
require "restclient"
|
||||
|
||||
|
||||
class LogStash::Outputs::Doris < LogStash::Outputs::Base
|
||||
include LogStash::PluginMixins::HttpClient
|
||||
include Stud::Buffer
|
||||
|
||||
concurrency :single
|
||||
|
||||
config_name "doris"
|
||||
# hosts array of Doris Frontends. eg ["http://fe1:8030", "http://fe2:8030"]
|
||||
config :http_hosts, :validate => :array, :required => true
|
||||
# the database which data is loaded to
|
||||
config :db, :validate => :string, :required => true
|
||||
# the table which data is loaded to
|
||||
config :table, :validate => :string, :required => true
|
||||
# label prefix of a stream load request.
|
||||
config :label_prefix, :validate => :string, :required => true
|
||||
# user
|
||||
config :user, :validate => :string, :required => true
|
||||
# password
|
||||
config :password, :validate => :password, :required => true
|
||||
# column separator
|
||||
config :column_separator, :validate => :string, :default => ""
|
||||
# column mappings. eg: "k1, k2, tmpk3, k3 = tmpk3 + 1"
|
||||
config :columns, :validate => :string, :default => ""
|
||||
# where predicate to filter data. eg: "k1 > 1 and k3 < 100"
|
||||
config :where, :validate => :string, :default => ""
|
||||
# max filter ratio
|
||||
config :max_filter_ratio, :validate => :number, :default => -1
|
||||
# partition which data is loaded to. eg: "p1, p2"
|
||||
config :partition, :validate => :array, :default => []
|
||||
# timeout of a stream load, in seconds
|
||||
config :timeout, :validate => :number, :default => -1
|
||||
# switch off or on of strict mode
|
||||
config :strict_mode, :validate => :string, :default => "false"
|
||||
# timezone
|
||||
config :timezone, :validate => :string, :default => ""
|
||||
# memory limit of a stream load
|
||||
config :exec_mem_limit, :validate => :number, :default => -1
|
||||
|
||||
# Custom headers to use
|
||||
# format is `headers => ["X-My-Header", "%{host}"]`
|
||||
config :headers, :validate => :hash
|
||||
|
||||
config :batch_size, :validate => :number, :default => 100000
|
||||
|
||||
config :idle_flush_time, :validate => :number, :default => 20
|
||||
|
||||
config :save_on_failure, :validate => :boolean, :default => true
|
||||
|
||||
config :save_dir, :validate => :string, :default => "/tmp"
|
||||
|
||||
config :save_file, :validate => :string, :default => "failed.data"
|
||||
|
||||
config :host_resolve_ttl_sec, :validate => :number, :default => 120
|
||||
|
||||
config :automatic_retries, :validate => :number, :default => 3
|
||||
|
||||
|
||||
def print_plugin_info()
|
||||
@@plugins = Gem::Specification.find_all{|spec| spec.name =~ /logstash-output-doris/ }
|
||||
@plugin_name = @@plugins[0].name
|
||||
@plugin_version = @@plugins[0].version
|
||||
@logger.debug("Running #{@plugin_name} version #{@plugin_version}")
|
||||
|
||||
@logger.info("Initialized doris output with settings",
|
||||
:db => @db,
|
||||
:table => @table,
|
||||
:label_prefix => @label_prefix,
|
||||
:batch_size => @batch_size,
|
||||
:idle_flush_time => @idle_flush_time,
|
||||
:http_hosts => @http_hosts)
|
||||
end
|
||||
|
||||
def register
|
||||
# Handle this deprecated option. TODO: remove the option
|
||||
#@ssl_certificate_validation = @verify_ssl if @verify_ssl
|
||||
|
||||
# We count outstanding requests with this queue
|
||||
# This queue tracks the requests to create backpressure
|
||||
# When this queue is empty no new requests may be sent,
|
||||
# tokens must be added back by the client on success
|
||||
#@request_tokens = SizedQueue.new(@pool_max)
|
||||
#@pool_max.times {|t| @request_tokens << true }
|
||||
#@requests = Array.new
|
||||
|
||||
@http_query = "/api/#{db}/#{table}/_stream_load"
|
||||
|
||||
@hostnames_pool =
|
||||
parse_http_hosts(http_hosts,
|
||||
ShortNameResolver.new(ttl: @host_resolve_ttl_sec, logger: @logger))
|
||||
|
||||
@request_headers = make_request_headers
|
||||
@logger.info("request headers: ", @request_headers)
|
||||
|
||||
buffer_initialize(
|
||||
:max_items => @batch_size,
|
||||
:max_interval => @idle_flush_time,
|
||||
:logger => @logger
|
||||
)
|
||||
|
||||
print_plugin_info()
|
||||
end # def register
|
||||
|
||||
private
|
||||
|
||||
def parse_http_hosts(hosts, resolver)
|
||||
ip_re = /^[\d]+\.[\d]+\.[\d]+\.[\d]+$/
|
||||
|
||||
lambda {
|
||||
hosts.flat_map { |h|
|
||||
scheme = URI(h).scheme
|
||||
host = URI(h).host
|
||||
port = URI(h).port
|
||||
path = URI(h).path
|
||||
|
||||
if ip_re !~ host
|
||||
resolver.get_addresses(host).map { |ip|
|
||||
"#{scheme}://#{ip}:#{port}#{path}"
|
||||
}
|
||||
else
|
||||
[h]
|
||||
end
|
||||
}
|
||||
}
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def get_host_addresses()
|
||||
begin
|
||||
@hostnames_pool.call
|
||||
rescue Exception => ex
|
||||
@logger.error('Error while resolving host', :error => ex.to_s)
|
||||
end
|
||||
end
|
||||
|
||||
# This module currently does not support parallel requests as that would circumvent the batching
|
||||
def receive(event)
|
||||
buffer_receive(event)
|
||||
end
|
||||
|
||||
public
|
||||
def flush(events, close=false)
|
||||
documents = ""
|
||||
event_num = 0
|
||||
events.each do |event|
|
||||
documents << event.get("[message]") << "\n"
|
||||
event_num += 1
|
||||
end
|
||||
|
||||
@logger.info("get event num: #{event_num}")
|
||||
@logger.debug("get documents: #{documents}")
|
||||
|
||||
hosts = get_host_addresses()
|
||||
|
||||
@request_headers["label"] = label_prefix + "_" + @db + "_" + @table + "_" + Time.now.strftime('%Y%m%d%H%M%S_%L')
|
||||
make_request(documents, hosts, @http_query, 1, hosts.sample)
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
  def save_to_disk(documents)
    # Build the path once so it can be reused in the failure log.
    file_name = "#{save_dir}/#{db}_#{table}_#{save_file}"
    begin
      file = File.open(file_name, "a")
      file.write(documents)
    rescue IOError, SystemCallError => e
      # File.open raises Errno::* (a SystemCallError) when the path is not writable.
      log_failure("An error occurred while saving file to disk: #{e}",
                  :file_name => file_name)
    ensure
      file.close unless file.nil?
    end
  end
|
||||
|
||||
|
||||
private
|
||||
|
||||
def make_request(documents, hosts, query, req_count = 1,host = "", uuid = SecureRandom.hex)
|
||||
|
||||
if host == ""
|
||||
host = hosts.pop
|
||||
end
|
||||
|
||||
url = host+query
|
||||
@logger.debug("req count: #{req_count}. get url: #{url}")
|
||||
@logger.debug("request headers: ", @request_headers)
|
||||
|
||||
|
||||
result = RestClient.put(url, documents,@request_headers) { |response, request, result|
|
||||
case response.code
|
||||
when 301, 302, 307
|
||||
@logger.debug("redirect to: #{response.headers[:location]}")
|
||||
response.follow_redirection
|
||||
else
|
||||
response.return!
|
||||
end
|
||||
}
|
||||
|
||||
@logger.info("response : \n #{result}" )
|
||||
result_body = JSON.parse(result.body)
|
||||
if result_body['Status'] != "Success"
|
||||
if req_count < @automatic_retries
|
||||
@logger.warn("Response Status : #{result_body['Status']} . Retrying...... #{req_count}")
|
||||
make_request(documents,hosts,query,req_count + 1,host,uuid)
|
||||
return
|
||||
end
|
||||
@logger.warn("Load failed ! Try #{req_count} times.")
|
||||
if @save_on_failure
|
||||
@logger.warn("Load failed after #{req_count} attempts. Saving the batch to disk. Disk file path: #{save_dir}/#{db}_#{table}_#{save_file}")
|
||||
save_to_disk(documents)
|
||||
end
|
||||
end
|
||||
|
||||
end # def make_request
|
||||
|
||||
# This is split into a separate method mostly to help testing
|
||||
def log_failure(message, opts)
|
||||
@logger.warn("[HTTP Output Failure] #{message}", opts)
|
||||
end
|
||||
|
||||
def make_request_headers()
|
||||
headers = @headers || {}
|
||||
headers["Expect"] ||= "100-continue"
|
||||
headers["Content-Type"] ||= "text/plain;charset=utf-8"
|
||||
headers["strict_mode"] ||= @strict_mode
|
||||
headers["Authorization"] = "Basic " + Base64.strict_encode64("#{user}:#{password.value}")
|
||||
# column_separator
|
||||
if @column_separator != ""
|
||||
headers["column_separator"] = @column_separator
|
||||
end
|
||||
# timezone
|
||||
if @timezone != ""
|
||||
headers["timezone"] = @timezone
|
||||
end
|
||||
# partition
|
||||
if @partition.size > 0
|
||||
headers["partition"] ||= @partition
|
||||
end
|
||||
# where
|
||||
if @where != ""
|
||||
headers["where"] ||= @where
|
||||
end
|
||||
# timeout
|
||||
if @timeout != -1
|
||||
headers["timeout"] ||= @timeout
|
||||
end
|
||||
# max_filter_ratio
|
||||
if @max_filter_ratio != -1
|
||||
headers["max_filter_ratio"] ||= @max_filter_ratio
|
||||
end
|
||||
# exec_mem_limit
|
||||
if @exec_mem_limit != -1
|
||||
headers["exec_mem_limit"] ||= @exec_mem_limit
|
||||
end
|
||||
# columns
|
||||
if @columns != ""
|
||||
headers["columns"] ||= @columns
|
||||
end
|
||||
headers
|
||||
end
|
||||
end # end of class LogStash::Outputs::Doris
|
||||
|
||||
|
||||
58
extension/logstash/lib/logstash/util/shortname_resolver.rb
Normal file
@ -0,0 +1,58 @@
|
||||
=begin
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
=end
|
||||
require 'resolv'
|
||||
require 'mini_cache'
|
||||
|
||||
class ShortNameResolver
|
||||
def initialize(ttl:, logger:)
|
||||
@ttl = ttl
|
||||
@store = MiniCache::Store.new
|
||||
@logger = logger
|
||||
end
|
||||
|
||||
private
|
||||
def resolve_cached(shortname)
|
||||
@store.get_or_set(shortname) do
|
||||
addresses = resolve(shortname)
|
||||
raise "Bad shortname '#{shortname}'" if addresses.empty?
|
||||
MiniCache::Data.new(addresses, expires_in: @ttl)
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
def resolve(shortname)
|
||||
addresses = Resolv::DNS.open do |dns|
|
||||
dns.getaddresses(shortname).map { |r| r.to_s }
|
||||
end
|
||||
|
||||
@logger.info("Resolved shortname '#{shortname}' to addresses #{addresses}")
|
||||
|
||||
return addresses
|
||||
end
|
||||
|
||||
public
|
||||
def get_address(shortname)
|
||||
return resolve_cached(shortname).sample
|
||||
end
|
||||
|
||||
public
|
||||
def get_addresses(shortname)
|
||||
return resolve_cached(shortname)
|
||||
end
|
||||
end
|
||||
47
extension/logstash/logstash-output-doris.gemspec
Normal file
@ -0,0 +1,47 @@
|
||||
=begin
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
=end
|
||||
Gem::Specification.new do |s|
|
||||
s.name = 'logstash-output-doris'
|
||||
s.version = '0.1.0'
|
||||
s.author = 'wfjcmcb'
|
||||
s.email = 'dev@doris.apache.org'
|
||||
s.homepage = 'http://doris.apache.org'
|
||||
s.licenses = ['Apache-2.0']
|
||||
s.summary = "This output lets you `PUT` messages in a batched fashion to Doris HTTP endpoint"
|
||||
s.description = "This gem is a Logstash plugin that must be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
|
||||
s.require_paths = ["lib"]
|
||||
|
||||
# Files
|
||||
s.files = Dir['lib/**/*','spec/**/*','*.gemspec','*.md','Gemfile','LICENSE' ]
|
||||
|
||||
# Tests
|
||||
s.test_files = s.files.grep(%r{^(test|spec|features)/})
|
||||
|
||||
# Special flag to let us know this is actually a logstash plugin
|
||||
s.metadata = { "logstash_plugin" => "true", "logstash_group" => "output" }
|
||||
|
||||
# Gem dependencies
|
||||
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
|
||||
s.add_runtime_dependency 'mini_cache', ">= 1.0.0", "< 2.0.0"
|
||||
s.add_runtime_dependency "rest-client", '~> 2.1'
|
||||
|
||||
s.add_development_dependency 'logstash-devutils', '~> 2.0', '>= 2.0.3'
|
||||
s.add_development_dependency 'sinatra', '~> 2.0', '>= 2.0.8.1'
|
||||
s.add_development_dependency 'webrick', '~> 1.6'
|
||||
end
|
||||