зеркало из
1
0
Форкнуть 0
* Upgrade libs
* Fix tests
This commit is contained in:
Ramachandran A G 2024-06-26 20:41:52 +05:30 коммит произвёл GitHub
Родитель 44928725be
Коммит c116778273
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
9 изменённых файлов: 123 добавлений и 87 удалений

28
.github/workflows/build.yml поставляемый
Просмотреть файл

@ -7,13 +7,16 @@ jobs:
build:
name: Build gem
runs-on: ubuntu-latest
environment: build
permissions:
checks: write
pull-requests: write
pull-requests: write
id-token: write
contents: read
strategy:
matrix:
logstash: [
{ version: '8.5.0', url: "https://artifacts.elastic.co/downloads/logstash/logstash-8.5.0-linux-x86_64.tar.gz" , main: 'true' }
{ version: '8.7.0', url: "https://artifacts.elastic.co/downloads/logstash/logstash-8.7.0-linux-x86_64.tar.gz" , main: 'true' }
]
env:
LOGSTASH_SOURCE: 1
@ -21,6 +24,12 @@ jobs:
JRUBY_HOME: /home/runner/logstash/vendor/jruby
JAVA_HOME: /home/runner/logstash/jdk
steps:
- name: Azure login
uses: azure/login@v2
with:
client-id: ${{ secrets.APP_ID }}
tenant-id: ${{ secrets.AUTH_ID }}
subscription-id: ${{ secrets.SUBSCRIPTION_ID }}
- name: Build logstash
run: |
echo "Getting logstash version ${{matrix.logstash.version}}"
@ -68,8 +77,20 @@ jobs:
e2e:
name: End-To-End Testing
runs-on: ubuntu-latest
environment: build
permissions:
checks: write
pull-requests: write
id-token: write
contents: read
needs: build
steps:
- name: Azure login
uses: azure/login@v2
with:
client-id: ${{ secrets.APP_ID }}
tenant-id: ${{ secrets.AUTH_ID }}
subscription-id: ${{ secrets.SUBSCRIPTION_ID }}
- uses: ruby/setup-ruby@v1
with:
ruby-version: jruby
@ -98,7 +119,4 @@ jobs:
env:
ENGINE_URL: ${{ secrets.ENGINE_URL }}
INGEST_URL: ${{ secrets.INGEST_URL }}
APP_ID: ${{ secrets.APP_ID }}
APP_KEY: ${{ secrets.APP_KEY }}
TENANT_ID: ${{ secrets.TENANT_ID }}
TEST_DATABASE: ${{ secrets.TEST_DATABASE }}

5
.gitignore поставляемый
Просмотреть файл

@ -60,3 +60,8 @@ gradle/wrapper/gradle-wrapper.properties
rspec.xml
e2e/output_file.txt
logs.txt
docker-e2e/.env
local-run.sh
logs2.txt
**/.vscode/*.*
**/settings.json

Просмотреть файл

@ -19,71 +19,74 @@ repositories {
// These dependencies are required by the gemspec to build the gem. The easiest to arrive at this list is to look at the effective pom of kusto-ingest and arrive at this list
// even if we use the ruby-maven gem to package the gem, install and lock_jars will create the logstash_output_kusto_jars.rb file with the same list of dependencies.
// In the gradle way, running ./gradlew vendor creates the jar file list and adds them to vendor/jar-dependencies folder from where it is referenced in the gemspec (require_paths and files)
// update dependencies to bom azure-sdk-bom/1.2.24
dependencies {
implementation 'com.azure:azure-core-http-netty:1.13.9'
implementation 'com.azure:azure-core:1.44.1'
implementation 'com.azure:azure-data-tables:12.3.16'
implementation 'com.azure:azure-identity:1.10.4'
implementation 'com.microsoft.azure.kusto:kusto-data:5.1.0'
implementation 'com.microsoft.azure.kusto:kusto-ingest:5.1.0'
implementation 'com.azure:azure-core-http-netty:1.15.0'
implementation 'com.azure:azure-core:1.49.0'
implementation 'com.azure:azure-data-tables:12.4.1'
implementation 'com.azure:azure-identity:1.12.1'
implementation 'com.azure:azure-json:1.1.0'
implementation 'com.azure:azure-storage-blob:12.24.1'
implementation 'com.azure:azure-storage-common:12.23.1'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.15.2'
implementation 'com.azure:azure-storage-queue:12.19.1'
implementation 'com.fasterxml.jackson.core:jackson-annotations:2.15.2'
implementation 'com.fasterxml.jackson.core:jackson-core:2.15.2'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.15.2'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.15.2'
implementation 'com.github.stephenc.jcip:jcip-annotations:1.0-1'
implementation 'com.microsoft.azure.kusto:kusto-data:5.0.5'
implementation 'com.microsoft.azure.kusto:kusto-ingest:5.0.5'
implementation 'com.microsoft.azure:msal4j-persistence-extension:1.2.0'
implementation 'com.microsoft.azure:msal4j:1.13.10'
implementation 'com.nimbusds:content-type:2.2'
implementation 'com.azure:azure-storage-blob:12.26.0'
implementation 'com.azure:azure-storage-common:12.25.0'
implementation 'com.azure:azure-storage-queue:12.21.0'
implementation 'com.azure:azure-xml:1.0.0'
implementation 'com.fasterxml.jackson.core:jackson-annotations:2.16.0'
implementation 'com.fasterxml.jackson.core:jackson-core:2.16.0'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.16.0'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.16.0'
implementation 'com.fasterxml.woodstox:woodstox-core:6.7.0'
implementation 'com.github.stephenc.jcip:jcip-annotations:1.0-1'
implementation 'com.microsoft.azure:msal4j:1.15.1'
implementation 'com.nimbusds:content-type:2.3'
implementation 'com.nimbusds:lang-tag:1.7'
implementation 'com.nimbusds:nimbus-jose-jwt:9.30.2'
implementation 'com.nimbusds:oauth2-oidc-sdk:10.7.1'
implementation 'com.nimbusds:nimbus-jose-jwt:9.40'
implementation 'com.nimbusds:oauth2-oidc-sdk:11.13'
implementation 'com.univocity:univocity-parsers:2.9.1'
implementation 'commons-codec:commons-codec:1.16.0'
implementation 'commons-logging:commons-logging:1.2'
implementation 'commons-codec:commons-codec:1.16.1'
implementation 'commons-logging:commons-logging:1.3.1'
implementation 'io.github.resilience4j:resilience4j-core:1.7.1'
implementation 'io.github.resilience4j:resilience4j-retry:1.7.1'
implementation 'io.netty:netty-buffer:4.1.104.Final'
implementation 'io.netty:netty-codec-dns:4.1.104.Final'
implementation 'io.netty:netty-codec-http2:4.1.104.Final'
implementation 'io.netty:netty-codec-http:4.1.104.Final'
implementation 'io.netty:netty-codec-socks:4.1.104.Final'
implementation 'io.netty:netty-codec:4.1.104.Final'
implementation 'io.netty:netty-common:4.1.104.Final'
implementation 'io.netty:netty-handler-proxy:4.1.104.Final'
implementation 'io.netty:netty-handler:4.1.104.Final'
implementation 'io.netty:netty-resolver-dns-classes-macos:4.1.104.Final'
implementation 'io.netty:netty-resolver-dns-native-macos:4.1.104.Final:osx-x86_64'
implementation 'io.netty:netty-resolver-dns:4.1.104.Final'
implementation 'io.netty:netty-resolver:4.1.104.Final'
implementation 'io.netty:netty-tcnative-boringssl-static:2.0.62.Final'
implementation 'io.netty:netty-tcnative-classes:2.0.62.Final'
implementation 'io.netty:netty-transport-classes-epoll:4.1.104.Final'
implementation 'io.netty:netty-transport-classes-kqueue:4.1.104.Final'
implementation 'io.netty:netty-transport-native-epoll:4.1.104.Final:linux-x86_64'
implementation 'io.netty:netty-transport-native-kqueue:4.1.104.Final:osx-x86_64'
implementation 'io.netty:netty-transport-native-unix-common:4.1.104.Final'
implementation 'io.netty:netty-transport:4.1.104.Final'
implementation 'io.projectreactor.netty:reactor-netty-core:1.0.39'
implementation 'io.projectreactor.netty:reactor-netty-http:1.0.39'
implementation 'io.projectreactor:reactor-core:3.4.34'
implementation 'io.vavr:vavr-match:0.10.2'
implementation 'io.vavr:vavr:0.10.2'
implementation 'io.netty:netty-buffer:4.1.108.Final'
implementation 'io.netty:netty-codec-dns:4.1.108.Final'
implementation 'io.netty:netty-codec-http2:4.1.108.Final'
implementation 'io.netty:netty-codec-http:4.1.108.Final'
implementation 'io.netty:netty-codec-socks:4.1.108.Final'
implementation 'io.netty:netty-codec:4.1.108.Final'
implementation 'io.netty:netty-common:4.1.108.Final'
implementation 'io.netty:netty-handler-proxy:4.1.108.Final'
implementation 'io.netty:netty-handler:4.1.108.Final'
implementation 'io.netty:netty-resolver-dns-classes-macos:4.1.108.Final'
implementation 'io.netty:netty-resolver-dns-native-macos:4.1.108.Final:osx-x86_64'
implementation 'io.netty:netty-resolver-dns:4.1.108.Final'
implementation 'io.netty:netty-resolver:4.1.108.Final'
implementation 'io.netty:netty-tcnative-boringssl-static:2.0.65.Final'
implementation 'io.netty:netty-tcnative-classes:2.0.65.Final'
implementation 'io.netty:netty-transport-classes-epoll:4.1.108.Final'
implementation 'io.netty:netty-transport-classes-kqueue:4.1.108.Final'
implementation 'io.netty:netty-transport-native-epoll:4.1.108.Final:linux-x86_64'
implementation 'io.netty:netty-transport-native-kqueue:4.1.108.Final:osx-x86_64'
implementation 'io.netty:netty-transport-native-unix-common:4.1.108.Final'
implementation 'io.netty:netty-transport:4.1.108.Final'
implementation 'io.projectreactor.netty:reactor-netty-core:1.0.43'
implementation 'io.projectreactor.netty:reactor-netty-http:1.0.43'
implementation 'io.projectreactor:reactor-core:3.4.36'
implementation 'io.vavr:vavr:0.10.4'
implementation 'io.vavr:vavr-match:0.10.4'
implementation 'net.java.dev.jna:jna-platform:5.13.0'
implementation 'net.java.dev.jna:jna:5.13.0'
implementation 'net.minidev:accessors-smart:2.4.9'
implementation 'net.minidev:json-smart:2.4.10'
implementation 'net.minidev:accessors-smart:2.5.1'
implementation 'net.minidev:json-smart:2.5.1'
implementation 'org.apache.commons:commons-lang3:3.14.0'
implementation 'org.apache.commons:commons-text:1.11.0'
implementation 'org.apache.httpcomponents:httpclient:4.5.14'
implementation 'org.apache.httpcomponents:httpcore:4.4.16'
implementation 'org.codehaus.woodstox:stax2-api:4.2.1'
implementation 'org.jetbrains:annotations:22.0.0'
implementation 'org.ow2.asm:asm:9.3'
implementation 'org.codehaus.woodstox:stax2-api:4.2.2'
implementation 'org.jetbrains:annotations:24.1.0'
implementation 'org.ow2.asm:asm:9.7'
implementation 'org.reactivestreams:reactive-streams:1.0.4'
implementation 'org.slf4j:slf4j-api:1.8.0-beta4'
implementation 'org.slf4j:slf4j-simple:1.8.0-beta4'

Просмотреть файл

@ -14,10 +14,11 @@ class E2E
@column_count = 19
@engine_url = ENV["ENGINE_URL"]
@ingest_url = ENV["INGEST_URL"]
@app_id = ENV["APP_ID"]
@app_key = ENV['APP_KEY']
@tenant_id = ENV['TENANT_ID']
@database = ENV['TEST_DATABASE']
@lslocalpath = ENV['LS_LOCAL_PATH']
if @lslocalpath.nil?
@lslocalpath = "/usr/share/logstash/bin/logstash"
end
@table_with_mapping = "RubyE2E#{Time.now.getutc.to_i}"
@table_without_mapping = "RubyE2ENoMapping#{Time.now.getutc.to_i}"
@mapping_name = "test_mapping"
@ -36,19 +37,15 @@ class E2E
kusto {
path => "tmp%{+YYYY-MM-dd-HH-mm}.txt"
ingest_url => "#{@ingest_url}"
app_id => "#{@app_id}"
app_key => "#{@app_key}"
app_tenant => "#{@tenant_id}"
cli_auth => true
database => "#{@database}"
table => "#{@table_with_mapping}"
json_mapping => "#{@mapping_name}"
}
kusto {
path => "nomaptmp%{+YYYY-MM-dd-HH-mm}.txt"
cli_auth => true
ingest_url => "#{@ingest_url}"
app_id => "#{@app_id}"
app_key => "#{@app_key}"
app_tenant => "#{@tenant_id}"
database => "#{@database}"
table => "#{@table_without_mapping}"
}
@ -82,7 +79,7 @@ class E2E
logstashpath = File.absolute_path("logstash.conf")
File.write(@output_file, "")
File.write(@input_file, "")
lscommand = "/usr/share/logstash/bin/logstash -f #{logstashpath}"
lscommand = "#{@lslocalpath} -f #{logstashpath}"
puts "Running logstash from config path #{logstashpath} and final command #{lscommand}"
spawn(lscommand)
sleep(60)
@ -137,8 +134,7 @@ class E2E
end
def start
@query_client = $kusto_java.data.ClientFactory.createClient($kusto_java.data.auth.ConnectionStringBuilder::createWithAadApplicationCredentials(@engine_url, @app_id,
@app_key, @tenant_id))
@query_client = $kusto_java.data.ClientFactory.createClient($kusto_java.data.auth.ConnectionStringBuilder::createWithAzureCli(@engine_url))
create_table_and_mapping
run_logstash
assert_data

Просмотреть файл

@ -80,7 +80,8 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
config :app_tenant, validate: :string, default: nil
# managed identity id
config :managed_identity, validate: :string, default: nil
# CLI credentials for dev-test
config :cli_auth, validate: :boolean, default: false
# The following are the data settings that impact where events are written to
# Database name
config :database, validate: :string, required: true
@ -154,7 +155,7 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
max_queue: upload_queue_size,
fallback_policy: :caller_runs)
@ingestor = Ingestor.new(ingest_url, app_id, app_key, app_tenant, managed_identity, database, table, final_mapping, delete_temp_files, proxy_host, proxy_port,proxy_protocol, @logger, executor)
@ingestor = Ingestor.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cli_auth, database, table, final_mapping, delete_temp_files, proxy_host, proxy_port,proxy_protocol, @logger, executor)
# send existing files
recover_past_files if recovery

Просмотреть файл

@ -20,17 +20,18 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
LOW_QUEUE_LENGTH = 3
FIELD_REF = /%\{[^}]+\}/
def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, database, table, json_mapping, delete_local, proxy_host , proxy_port , proxy_protocol,logger, threadpool = DEFAULT_THREADPOOL)
def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli_auth, database, table, json_mapping, delete_local, proxy_host , proxy_port , proxy_protocol,logger, threadpool = DEFAULT_THREADPOOL)
@workers_pool = threadpool
@logger = logger
validate_config(database, table, json_mapping,proxy_protocol,app_id, app_key, managed_identity_id)
validate_config(database, table, json_mapping,proxy_protocol,app_id, app_key, managed_identity_id,cli_auth)
@logger.info('Preparing Kusto resources.')
kusto_java = Java::com.microsoft.azure.kusto
apache_http = Java::org.apache.http
# kusto_connection_string = kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant)
# If there is managed identity, use it. This means the AppId and AppKey are empty/nil
is_managed_identity = (app_id.nil? && app_key.nil?)
# If there is CLI Auth, use that instead of managed identity
is_managed_identity = (app_id.nil? && app_key.nil? && !cli_auth)
# If it is system managed identity, propagate the system identity
is_system_assigned_managed_identity = is_managed_identity && 0 == "system".casecmp(managed_identity_id)
# Is it direct connection
@ -45,7 +46,13 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url, managed_identity_id)
end
else
kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant)
if cli_auth
@logger.warn('*Use of CLI Auth is only for dev-test scenarios. This is ***NOT RECOMMENDED*** for production*')
kusto_java.data.auth.ConnectionStringBuilder.createWithAzureCli(ingest_url)
else
@logger.info('Using app id and app key.')
kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant)
end
end
#
@logger.debug(Gem.loaded_specs.to_s)
@ -75,16 +82,21 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
@ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON)
else
@logger.debug('No mapping reference provided. Columns will be mapped by names in the logstash output')
@ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON)
end
@delete_local = delete_local
@logger.debug('Kusto resources are ready.')
end
def validate_config(database, table, json_mapping, proxy_protocol, app_id, app_key, managed_identity_id)
def validate_config(database, table, json_mapping, proxy_protocol, app_id, app_key, managed_identity_id,cli_auth)
# Add an additional validation and fail this upfront
if app_id.nil? && app_key.nil? && managed_identity_id.nil?
@logger.error('managed_identity_id is not provided and app_id/app_key is empty.')
raise LogStash::ConfigurationError.new('managed_identity_id is not provided and app_id/app_key is empty.')
if cli_auth
@logger.info('Using CLI Auth, this is only for dev-test scenarios. This is ***NOT RECOMMENDED*** for production')
else
@logger.error('managed_identity_id is not provided and app_id/app_key is empty.')
raise LogStash::ConfigurationError.new('managed_identity_id is not provided and app_id/app_key is empty.')
end
end
if database =~ FIELD_REF
@logger.error('database config value should not be dynamic.', database)

Просмотреть файл

@ -32,4 +32,4 @@ Gem::Specification.new do |s|
s.add_development_dependency 'rspec_junit_formatter'
end
end

Просмотреть файл

@ -11,6 +11,7 @@ describe LogStash::Outputs::Kusto::Ingestor do
let(:app_tenant) { "mytenant" }
let(:managed_identity) { "managed_identity" }
let(:database) { "mydatabase" }
let(:cliauth) { false }
let(:table) { "mytable" }
let(:proxy_host) { "localhost" }
let(:proxy_port) { 80 }
@ -25,7 +26,7 @@ describe LogStash::Outputs::Kusto::Ingestor do
# note that this will cause an internal error since connection is being tried.
# however we still want to test that all the java stuff is working as expected
expect {
ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, database, table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol, logger)
ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cliauth, database, table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol, logger)
ingestor.stop
}.not_to raise_error
end
@ -36,7 +37,7 @@ describe LogStash::Outputs::Kusto::Ingestor do
dynamic_name_array.each do |test_database|
it "with database: #{test_database}" do
expect {
ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, test_database, table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol,logger)
ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cliauth, test_database, table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol,logger)
ingestor.stop
}.to raise_error(LogStash::ConfigurationError)
end
@ -47,7 +48,7 @@ describe LogStash::Outputs::Kusto::Ingestor do
dynamic_name_array.each do |test_table|
it "with database: #{test_table}" do
expect {
ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity,database, test_table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol,logger)
ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cliauth, database, test_table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol,logger)
ingestor.stop
}.to raise_error(LogStash::ConfigurationError)
end
@ -58,7 +59,7 @@ describe LogStash::Outputs::Kusto::Ingestor do
dynamic_name_array.each do |json_mapping|
it "with database: #{json_mapping}" do
expect {
ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity,database, table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol,logger)
ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cliauth, database, table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol,logger)
ingestor.stop
}.to raise_error(LogStash::ConfigurationError)
end
@ -68,7 +69,7 @@ describe LogStash::Outputs::Kusto::Ingestor do
context 'proxy protocol has to be http or https' do
it "with proxy protocol: socks" do
expect {
ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity,database, table, json_mapping, delete_local, proxy_host, proxy_port,'socks',logger)
ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cliauth, database, table, json_mapping, delete_local, proxy_host, proxy_port,'socks',logger)
ingestor.stop
}.to raise_error(LogStash::ConfigurationError)
end
@ -77,7 +78,7 @@ describe LogStash::Outputs::Kusto::Ingestor do
context 'one of appid or managedid has to be provided' do
it "with empty managed identity and appid" do
expect {
ingestor = described_class.new(ingest_url, "", app_key, app_tenant, "",database, table, json_mapping, delete_local, proxy_host, proxy_port,'socks',logger)
ingestor = described_class.new(ingest_url, "", app_key, app_tenant, "", cliauth, database, table, json_mapping, delete_local, proxy_host, proxy_port,'socks',logger)
ingestor.stop
}.to raise_error(LogStash::ConfigurationError)
end

Просмотреть файл

@ -1 +1 @@
2.0.5
2.0.6