зеркало из
1
0
Форкнуть 0

*Add support for proxies *Filter 0 byte files (#42)

* Add support for proxies 
* Filter empty files from getting ingested
* Added README.MD
* Add nil check for no proxy

Co-authored-by: AsafMah <asafmahlev@microsoft.com>
This commit is contained in:
ag-ramachandran 2022-08-04 10:43:46 +05:30 коммит произвёл GitHub
Родитель 360830b8b5
Коммит 1cb8c9bbf6
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
9 изменённых файлов: 91 добавлений и 27 удалений

3
.github/workflows/build.yml поставляемый
Просмотреть файл

@ -2,9 +2,6 @@ name: build
on:
push:
# Sequence of patterns matched against refs/tags
branches:
- '*'
jobs:
build:

9
.gitignore поставляемый
Просмотреть файл

@ -31,6 +31,15 @@ Gemfile.lock
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
.vscode-server/*
.ssh/*
#Devcontainer
*.devcontainer/*
Jars.lock
.mvn/*
#logstash conf
*.conf
#IDEA
.idea

Просмотреть файл

@ -28,4 +28,10 @@
# 1.0.0
- Use stable (2.1.2) version of the java sdk, and retrieve it from maven with bundler.
- Renamed `mapping` to `json_mapping` in order to clarify usage. `mapping` still remains as a deprecated parameter.
- Renamed `mapping` to `json_mapping` in order to clarify usage. `mapping` still remains as a deprecated parameter.
# 1.0.5
- Use (3.1.3) version of the java sdk, and retrieve it from maven with bundler.
- Added support for `proxy_host` `proxy_port` `proxy_protocol` to support proxying ingestion to Kusto

Просмотреть файл

@ -39,6 +39,9 @@ output {
database => "<database name>"
table => "<target table>"
json_mapping => "<mapping name>"
proxy_host => "<proxy host>"
proxy_port => <proxy port>
proxy_protocol => <"http"|"https">
}
}
```
@ -57,6 +60,16 @@ More information about configuring Logstash can be found in the [logstash config
| **recovery** | If set to true (default), plugin will attempt to resend pre-existing temp files found in the path upon startup | |
| **delete_temp_files** | Determines if temp files will be deleted after a successful upload (true is default; set false for debug purposes only)| |
| **flush_interval** | The time (in seconds) for flushing writes to temporary files. Default is 2 seconds, 0 will flush on every event. Increase this value to reduce IO calls but keep in mind that events in the buffer will be lost in case of abrupt failure.| |
| **proxy_host** | The proxy hostname for redirecting traffic to Kusto.| |
| **proxy_port** | The proxy port for the proxy. Defaults to 80.| |
| **proxy_protocol** | The proxy server protocol , is one of http or https.| |
> Note : LS_JAVA_OPTS can be used to set proxy parameters as well (using export or SET options)
```bash
export LS_JAVA_OPTS="-Dhttp.proxyHost=1.2.34 -Dhttp.proxyPort=8989 -Dhttps.proxyHost=1.2.3.4 -Dhttps.proxyPort=8989"
```
## Development Requirements

Просмотреть файл

@ -88,7 +88,7 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
# Note that this must be in JSON format, as this is the interface between Logstash and Kusto
config :json_mapping, validate: :string, required: true
# Mappung name - deprecated, use json_mapping
# Mapping name - deprecated, use json_mapping
config :mapping, validate: :string, deprecated: true
@ -106,6 +106,15 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
# starts processing them in the main thread (not healthy)
config :upload_queue_size, validate: :number, default: 30
# Host of the proxy , is an optional field. Can connect directly
config :proxy_host, validate: :string, required: false
# Port where the proxy runs , defaults to 80. Usually a value like 3128
config :proxy_port, validate: :number, required: false , default: 80
# Check Proxy URL can be over http or https. Dowe need it this way or ignore this & remove this
config :proxy_protocol, validate: :string, required: false , default: 'http'
default :codec, 'json_lines'
def register
@ -141,7 +150,7 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
max_queue: upload_queue_size,
fallback_policy: :caller_runs)
@ingestor = Ingestor.new(ingest_url, app_id, app_key, app_tenant, database, table, final_mapping, delete_temp_files, @logger, executor)
@ingestor = Ingestor.new(ingest_url, app_id, app_key, app_tenant, database, table, final_mapping, delete_temp_files, proxy_host, proxy_port,proxy_protocol, @logger, executor)
# send existing files
recover_past_files if recovery

Просмотреть файл

@ -20,23 +20,30 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
LOW_QUEUE_LENGTH = 3
FIELD_REF = /%\{[^}]+\}/
def initialize(ingest_url, app_id, app_key, app_tenant, database, table, json_mapping, delete_local, logger, threadpool = DEFAULT_THREADPOOL)
def initialize(ingest_url, app_id, app_key, app_tenant, database, table, json_mapping, delete_local, proxy_host , proxy_port , proxy_protocol,logger, threadpool = DEFAULT_THREADPOOL)
@workers_pool = threadpool
@logger = logger
validate_config(database, table, json_mapping)
@logger.debug('Preparing Kusto resources.')
validate_config(database, table, json_mapping,proxy_protocol)
@logger.info('Preparing Kusto resources.')
kusto_java = Java::com.microsoft.azure.kusto
apache_http = Java::org.apache.http
kusto_connection_string = kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant)
#
@logger.debug(Gem.loaded_specs.to_s)
# Unfortunately there's no way to avoid using the gem/plugin name directly...
name_for_tracing = "logstash-output-kusto:#{Gem.loaded_specs['logstash-output-kusto']&.version || "unknown"}"
@logger.debug("Client name for tracing: #{name_for_tracing}")
kusto_connection_string.setClientVersionForTracing(name_for_tracing)
@kusto_client = kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string)
@kusto_client = begin
if proxy_host.nil? || proxy_host.empty?
kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string)
else
kusto_http_client_properties = kusto_java.data.HttpClientProperties.builder().proxy(apache_http.HttpHost.new(proxy_host,proxy_port,proxy_protocol)).build()
kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string, kusto_http_client_properties)
end
end
@ingestion_properties = kusto_java.ingest.IngestionProperties.new(database, table)
@ingestion_properties.setIngestionMapping(json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON)
@ -46,7 +53,7 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
@logger.debug('Kusto resources are ready.')
end
def validate_config(database, table, json_mapping)
def validate_config(database, table, json_mapping,proxy_protocol)
if database =~ FIELD_REF
@logger.error('database config value should not be dynamic.', database)
raise LogStash::ConfigurationError.new('database config value should not be dynamic.')
@ -61,6 +68,12 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
@logger.error('json_mapping config value should not be dynamic.', json_mapping)
raise LogStash::ConfigurationError.new('json_mapping config value should not be dynamic.')
end
if not(["https", "http"].include? proxy_protocol)
@logger.error('proxy_protocol has to be http or https.', proxy_protocol)
raise LogStash::ConfigurationError.new('proxy_protocol has to be http or https.')
end
end
def upload_async(path, delete_on_success)
@ -95,11 +108,13 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
# local_ingestion_properties.addJsonMappingName(json_mapping)
# end
file_source_info = Java::com.microsoft.azure.kusto.ingest.source.FileSourceInfo.new(path, 0); # 0 - let the sdk figure out the size of the file
@kusto_client.ingestFromFile(file_source_info, @ingestion_properties)
if file_size > 0
file_source_info = Java::com.microsoft.azure.kusto.ingest.source.FileSourceInfo.new(path, 0); # 0 - let the sdk figure out the size of the file
@kusto_client.ingestFromFile(file_source_info, @ingestion_properties)
else
@logger.warn("File #{path} is an empty file and is not ingested.")
end
File.delete(path) if delete_on_success
@logger.debug("File #{path} sent to kusto.")
rescue Errno::ENOENT => e
@logger.error("File doesn't exist! Unrecoverable error.", exception: e.class, message: e.message, path: path, backtrace: e.backtrace)

Просмотреть файл

@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-kusto' #WATCH OUT: we hardcoded usage of this name in one of the classes.
s.version = '1.0.4'
s.version = '1.0.5'
s.licenses = ['Apache-2.0']
s.summary = 'Writes events to Azure Data Explorer (Kusto)'
s.description = 'This is a logstash output plugin used to write events to an Azure Data Explorer (a.k.a Kusto)'
@ -31,6 +31,6 @@ Gem::Specification.new do |s|
s.add_development_dependency 'rspec_junit_formatter'
# Jar dependencies
s.requirements << "jar 'com.microsoft.azure.kusto, kusto-ingest, 3.1.1'"
s.requirements << "jar 'com.microsoft.azure.kusto, kusto-ingest, 3.1.3'"
s.add_runtime_dependency 'jar-dependencies'
end

Просмотреть файл

@ -11,6 +11,9 @@ describe LogStash::Outputs::Kusto::Ingestor do
let(:app_tenant) { "mytenant" }
let(:database) { "mydatabase" }
let(:table) { "mytable" }
let(:proxy_host) { "localhost" }
let(:proxy_port) { 80 }
let(:proxy_protocol) { "http" }
let(:json_mapping) { "mymapping" }
let(:delete_local) { false }
let(:logger) { spy('logger') }
@ -21,7 +24,7 @@ describe LogStash::Outputs::Kusto::Ingestor do
# note that this will cause an internal error since connection is being tried.
# however we still want to test that all the java stuff is working as expected
expect {
ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, database, table, json_mapping, delete_local, logger)
ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, database, table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol, logger)
ingestor.stop
}.not_to raise_error
end
@ -32,7 +35,7 @@ describe LogStash::Outputs::Kusto::Ingestor do
dynamic_name_array.each do |test_database|
it "with database: #{test_database}" do
expect {
ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, test_database, table, json_mapping, delete_local, logger)
ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, test_database, table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol,logger)
ingestor.stop
}.to raise_error(LogStash::ConfigurationError)
end
@ -43,7 +46,7 @@ describe LogStash::Outputs::Kusto::Ingestor do
dynamic_name_array.each do |test_table|
it "with database: #{test_table}" do
expect {
ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, database, test_table, json_mapping, delete_local, logger)
ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, database, test_table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol,logger)
ingestor.stop
}.to raise_error(LogStash::ConfigurationError)
end
@ -51,16 +54,25 @@ describe LogStash::Outputs::Kusto::Ingestor do
end
context 'doesnt allow mapping to have some dynamic part' do
dynamic_name_array.each do |test_json_mapping|
it "with database: #{test_json_mapping}" do
dynamic_name_array.each do |json_mapping|
it "with database: #{json_mapping}" do
expect {
ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, database, table, test_json_mapping, delete_local, logger)
ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, database, table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol,logger)
ingestor.stop
}.to raise_error(LogStash::ConfigurationError)
end
end
end
context 'proxy protocol has to be http or https' do
it "with proxy protocol: socks" do
expect {
ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, database, table, json_mapping, delete_local, proxy_host, proxy_port,'socks',logger)
ingestor.stop
}.to raise_error(LogStash::ConfigurationError)
end
end
end
# describe 'receiving events' do

Просмотреть файл

@ -12,7 +12,10 @@ describe LogStash::Outputs::Kusto do
"app_tenant" => "mytenant",
"database" => "mydatabase",
"table" => "mytable",
"json_mapping" => "mymapping"
"json_mapping" => "mymapping",
"proxy_host" => "localhost",
"proxy_port" => 3128,
"proxy_protocol" => "https"
} }
describe '#register' do