Add support for managed identities (#9)

This commit is contained in:
Jonas-Taha El Sesiy 2020-08-11 10:57:00 -07:00 коммит произвёл GitHub
Родитель 9c22e4171a
Коммит 6c76ea7f33
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
8 изменённых файлов: 220 добавлений и 158 удалений

2
.gitignore поставляемый
Просмотреть файл

@@ -42,7 +42,7 @@ build-iPhoneSimulator/
# for a library or gem, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# Gemfile.lock
Gemfile.lock
# .ruby-version
# .ruby-gemset

2
.rubocop.yml Normal file
Просмотреть файл

@@ -0,0 +1,2 @@
Style/FrozenStringLiteralComment:
Enabled: false

112
README.md
Просмотреть файл

@@ -8,58 +8,69 @@ Azure Storage Append Blob output plugin buffers logs in local file and uploads them to Azure Storage Append Blob periodically.
### RubyGems
```
$ gem install fluent-plugin-azure-storage-append-blob
```
gem install fluent-plugin-azure-storage-append-blob
### Bundler
Add following line to your Gemfile:
```ruby
gem "fluent-plugin-azure-storage-append-blob"
```
gem "fluent-plugin-azure-storage-append-blob"
And then execute:
```
$ bundle
```
bundle
## Configuration
```
<match pattern>
type azure-storage-append-blob
<match pattern>
type azure-storage-append-blob
azure_storage_account <your azure storage account>
azure_storage_access_key <your azure storage access key>
azure_container <your azure storage container>
auto_create_container true
path logs/
azure_object_key_format %{path}%{time_slice}_%{index}.log
time_slice_format %Y%m%d-%H
# if you want to use %{tag} or %Y/%m/%d/ like syntax in path / azure_blob_name_format,
# need to specify tag for %{tag} and time for %Y/%m/%d in <buffer> argument.
<buffer tag,time>
@type file
path /var/log/fluent/azurestorageappendblob
timekey 120 # 2 minutes
timekey_wait 60
timekey_use_utc true # use utc
</buffer>
</match>
```
azure_storage_account <your azure storage account>
azure_storage_access_key <your azure storage access key> # leave empty to use MSI
azure_storage_sas_token <your azure storage sas token> # leave empty to use MSI
azure_imds_api_version <Azure Instance Metadata Service API Version> # only used for MSI
azure_token_refresh_interval <refresh interval in min> # only used for MSI
azure_container <your azure storage container>
auto_create_container true
path logs/
azure_object_key_format %{path}%{time_slice}_%{index}.log
time_slice_format %Y%m%d-%H
# if you want to use %{tag} or %Y/%m/%d/ like syntax in path / azure_blob_name_format,
# need to specify tag for %{tag} and time for %Y/%m/%d in <buffer> argument.
<buffer tag,time>
@type file
path /var/log/fluent/azurestorageappendblob
timekey 120 # 2 minutes
timekey_wait 60
timekey_use_utc true # use utc
</buffer>
</match>
### `azure_storage_account` (Required)
Your Azure Storage Account Name. This can be retrieved from Azure Management portal.
### `azure_storage_access_key` or `azure_storage_sas_token` (Required)
### `azure_storage_access_key` or `azure_storage_sas_token` (Either required or both empty to use MSI)
Your Azure Storage Access Key (Primary or Secondary) or shared access signature (SAS) token.
This also can be retrieved from Azure Management portal.
If both are empty, the plugin will use the local Managed Identity endpoint to obtain a token for the target storage account.
### `azure_imds_api_version` (Optional, only for MSI)
Default: 2019-08-15
The Instance Metadata Service is used during the OAuth flow to obtain an access token. This API is versioned and specifying the version is mandatory.
See [here](https://docs.microsoft.com/en-us/azure/virtual-machines/linux/instance-metadata-service#versioning) for more details.
### `azure_token_refresh_interval` (Optional, only for MSI)
Default: 60 (1 hour)
When using MSI, the initial access token needs to be refreshed periodically.
### `azure_container` (Required)
Azure Storage Container name
@@ -87,48 +98,41 @@ The default format is "%{path}%{time_slice}-%{index}.log".
For instance, using the example configuration above, actual object keys on Azure Storage will be something like:
```
"logs/20130111-22-0.log"
"logs/20130111-23-0.log"
"logs/20130112-00-0.log"
```
"logs/20130111-22-0.log"
"logs/20130111-23-0.log"
"logs/20130112-00-0.log"
With the configuration:
```
azure_object_key_format %{path}/events/ts=%{time_slice}/events.log
path log
time_slice_format %Y%m%d-%H
```
azure_object_key_format %{path}/events/ts=%{time_slice}/events.log
path log
time_slice_format %Y%m%d-%H
You get:
```
"log/events/ts=20130111-22/events.log"
"log/events/ts=20130111-23/events.log"
"log/events/ts=20130112-00/events.log"
```
"log/events/ts=20130111-22/events.log"
"log/events/ts=20130111-23/events.log"
"log/events/ts=20130112-00/events.log"
The [fluent-mixin-config-placeholders](https://github.com/tagomoris/fluent-mixin-config-placeholders) mixin is also incorporated, so additional variables such as %{hostname}, etc. can be used in the `azure_object_key_format`. This is useful in preventing filename conflicts when writing from multiple servers.
```
azure_object_key_format %{path}/events/ts=%{time_slice}/events-%{hostname}.log
```
azure_object_key_format %{path}/events/ts=%{time_slice}/events-%{hostname}.log
### `time_slice_format`
Format of the time used in the file name. Default is '%Y%m%d'. Use '%Y%m%d%H' to split files hourly.
### Run tests
$ gem install bundler
$ bundle install
$ bundle exec rake test
# Contributing
gem install bundler
bundle install
bundle exec rake test
## Contributing
This project welcomes contributions and suggestions. Most contributions require you to agree to a
Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us
the rights to use your contribution. For details, visit https://cla.microsoft.com.
the rights to use your contribution. For details, visit [https://cla.microsoft.com](https://cla.microsoft.com).
When you submit a pull request, a CLA-bot will automatically determine whether you need to provide
a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions

Просмотреть файл

@@ -1,11 +1,11 @@
require "bundler"
require 'bundler'
Bundler::GemHelper.install_tasks
require "rake/testtask"
require 'rake/testtask'
Rake::TestTask.new(:test) do |t|
t.libs.push("lib", "test")
t.test_files = FileList["test/**/test_*.rb"]
t.libs.push('lib', 'test')
t.test_files = FileList['test/**/test_*.rb']
t.verbose = true
t.warning = true
end

Просмотреть файл

@@ -2,27 +2,27 @@ lib = File.expand_path("../lib", __FILE__)
$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib)
Gem::Specification.new do |spec|
spec.name = "fluent-plugin-azure-storage-append-blob"
spec.version = "0.1.1"
spec.authors = ["Microsoft Corporation"]
spec.email = [""]
spec.name = 'fluent-plugin-azure-storage-append-blob'
spec.version = '0.2.0'
spec.authors = ['Microsoft Corporation']
spec.email = ['']
spec.summary = "Azure Storage Append Blob output plugin for Fluentd event collector"
spec.description = "Fluentd plugin to upload logs to Azure Storage append blobs."
spec.homepage = "https://github.com/Microsoft/fluent-plugin-azure-storage-append-blob"
spec.license = "MIT"
spec.summary = 'Azure Storage Append Blob output plugin for Fluentd event collector'
spec.description = 'Fluentd plugin to upload logs to Azure Storage append blobs.'
spec.homepage = 'https://github.com/Microsoft/fluent-plugin-azure-storage-append-blob'
spec.license = 'MIT'
test_files, files = `git ls-files -z`.split("\x0").partition do |f|
test_files, files = `git ls-files -z`.split("\x0").partition do |f|
f.match(%r{^(test|spec|features)/})
end
spec.files = files
spec.executables = files.grep(%r{^bin/}) { |f| File.basename(f) }
spec.test_files = test_files
spec.require_paths = ["lib"]
spec.files = files
spec.executables = files.grep(%r{^bin/}) { |f| File.basename(f) }
spec.test_files = test_files
spec.require_paths = ['lib']
spec.add_development_dependency "bundler", "~> 1.14"
spec.add_development_dependency "rake", "~> 12.0"
spec.add_development_dependency "test-unit", "~> 3.0"
spec.add_runtime_dependency "fluentd", [">= 0.14.10", "< 2"]
spec.add_runtime_dependency "azure-storage-blob", "~> 1.0"
spec.add_development_dependency 'bundler', '~> 2.0'
spec.add_development_dependency 'rake', '~> 13.0'
spec.add_development_dependency 'test-unit', '~> 3.0'
spec.add_runtime_dependency 'azure-storage-blob', '~> 2.0'
spec.add_runtime_dependency 'fluentd', ['>= 0.14.10', '< 2']
end

Просмотреть файл

@@ -3,103 +3,133 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
#--------------------------------------------------------------------------------------------*/
require 'fluent/plugin/output'
require 'azure/storage/common'
require 'azure/storage/blob'
require 'time'
require 'tempfile'
require 'faraday'
require 'fluent/plugin/output'
require 'json'
module Fluent
module Plugin
class AzureStorageAppendBlobOut < Fluent::Plugin::Output
Fluent::Plugin.register_output("azure-storage-append-blob", self)
Fluent::Plugin.register_output('azure-storage-append-blob', self)
helpers :formatter, :inject
DEFAULT_FORMAT_TYPE = "out_file"
DEFAULT_FORMAT_TYPE = 'out_file'.freeze
AZURE_BLOCK_SIZE_LIMIT = 4 * 1024 * 1024 - 1
config_param :path, :string, :default => ""
config_param :azure_storage_account, :string, :default => nil
config_param :azure_storage_access_key, :string, :default => nil, :secret => true
config_param :azure_storage_sas_token, :string, :default => nil, :secret => true
config_param :azure_container, :string, :default => nil
config_param :azure_object_key_format, :string, :default => "%{path}%{time_slice}-%{index}.log"
config_param :auto_create_container, :bool, :default => true
config_param :format, :string, :default => DEFAULT_FORMAT_TYPE
config_param :time_slice_format, :string, :default => '%Y%m%d'
config_param :path, :string, default: ''
config_param :azure_storage_account, :string, default: nil
config_param :azure_storage_access_key, :string, default: nil, secret: true
config_param :azure_storage_sas_token, :string, default: nil, secret: true
config_param :azure_container, :string, default: nil
config_param :azure_imds_api_version, :string, default: '2019-08-15'
config_param :azure_token_refresh_interval, :integer, default: 60
config_param :use_msi, :bool, default: false
config_param :azure_object_key_format, :string, default: '%{path}%{time_slice}-%{index}.log'
config_param :auto_create_container, :bool, default: true
config_param :format, :string, default: DEFAULT_FORMAT_TYPE
config_param :time_slice_format, :string, default: '%Y%m%d'
config_param :localtime, :bool, default: false
config_section :format do
config_set_default :@type, DEFAULT_FORMAT_TYPE
end
config_section :buffer do
config_set_default :chunk_keys, ['time']
config_set_default :timekey, (60 * 60 * 24)
end
attr_reader :bs
def configure(conf)
super
@formatter = formatter_create
if @localtime
@path_slicer = Proc.new {|path|
Time.now.strftime(path)
}
else
@path_slicer = Proc.new {|path|
Time.now.utc.strftime(path)
}
end
if @azure_container.nil?
raise ConfigError, 'azure_container is needed'
end
@path_slicer = if @localtime
proc do |path|
Time.now.strftime(path)
end
else
proc do |path|
Time.now.utc.strftime(path)
end
end
raise ConfigError, 'azure_storage_account needs to be specified' if @azure_storage_account.nil?
raise ConfigError, 'azure_container needs to be specified' if @azure_container.nil?
if @azure_storage_access_key.nil? && @azure_storage_sas_token.nil?
raise ConfigError, "either 'azure_storage_access_key' or 'azure_storage_sas_token' parameter must be provided"
log.info 'Using MSI since neither azure_storage_access_key nor azure_storage_sas_token was provided.'
@use_msi = true
end
end
def multi_workers_ready?
true
end
def get_access_token
access_key_request = Faraday.new('http://169.254.169.254/metadata/identity/oauth2/token?' \
"api-version=#{@azure_imds_api_version}" \
'&resource=https://storage.azure.com/',
headers: { 'Metadata' => 'true' })
.get
.body
JSON.parse(access_key_request)['access_token']
end
def start
super
@bs_params = {storage_account_name: @azure_storage_account}
if @use_msi
token_credential = Azure::Storage::Common::Core::TokenCredential.new get_access_token
token_signer = Azure::Storage::Common::Core::Auth::TokenSigner.new token_credential
@bs = Azure::Storage::Blob::BlobService.new(storage_account_name: @azure_storage_account, signer: token_signer)
if !@azure_storage_access_key.nil?
@bs_params.merge!({storage_access_key: @azure_storage_access_key})
refresh_interval = @azure_token_refresh_interval * 60
cancelled = false
renew_token = Thread.new do
Thread.stop
until cancelled
sleep(refresh_interval)
token_credential.renew_token get_access_token
end
end
sleep 0.1 while renew_token.status != 'sleep'
renew_token.run
else
@bs_params = { storage_account_name: @azure_storage_account }
if !@azure_storage_access_key.nil?
@bs_params.merge!({ storage_access_key: @azure_storage_access_key })
elsif !@azure_storage_sas_token.nil?
@bs_params.merge!({ storage_sas_token: @azure_storage_sas_token })
end
@bs = Azure::Storage::Blob::BlobService.create(@bs_params)
end
if !@azure_storage_sas_token.nil?
@bs_params.merge!({storage_sas_token: @azure_storage_sas_token})
end
@bs = Azure::Storage::Blob::BlobService.create(@bs_params)
ensure_container
@azure_storage_path = ''
@last_azure_storage_path = ''
@current_index = 0
end
def format(tag, time, record)
r = inject_values_to_record(tag, time, record)
@formatter.format(tag, time, r)
end
def write(chunk)
metadata = chunk.metadata
tmp = Tempfile.new("azure-")
tmp = Tempfile.new('azure-')
begin
chunk.write_to(tmp)
tmp.close
generate_log_name(metadata, @current_index)
if @last_azure_storage_path != @azure_storage_path
@@ -107,18 +137,23 @@ module Fluent
generate_log_name(metadata, @current_index)
end
content = File.open(tmp.path, 'rb') { |file| file.read }
content = File.open(tmp.path, 'rb', &:read)
append_blob(content, metadata)
@last_azure_storage_path = @azure_storage_path
ensure
tmp.unlink
begin
tmp.close(true)
rescue StandardError
nil
end
end
end
private
def ensure_container
if ! @bs.list_containers.find { |c| c.name == @azure_container }
unless @bs.list_containers.find { |c| c.name == @azure_container }
if @auto_create_container
@bs.create_container(@azure_container)
else
@@ -128,24 +163,26 @@ module Fluent
end
private
def generate_log_name(metadata, index)
time_slice = if metadata.timekey.nil?
''.freeze
else
Time.at(metadata.timekey).utc.strftime(@time_slice_format)
end
end
path = @path_slicer.call(@path)
values_for_object_key = {
"%{path}" => path,
"%{time_slice}" => time_slice,
"%{index}" => index
'%{path}' => path,
'%{time_slice}' => time_slice,
'%{index}' => index
}
storage_path = @azure_object_key_format.gsub(%r(%{[^}]+}), values_for_object_key)
storage_path = @azure_object_key_format.gsub(/%{[^}]+}/, values_for_object_key)
@azure_storage_path = extract_placeholders(storage_path, metadata)
end
private
def append_blob(content, metadata)
position = 0
log.debug "azure_storage_append_blob: append_blob.start: Content size: #{content.length}"
@@ -156,8 +193,8 @@ module Fluent
@bs.append_blob_block(@azure_container, @azure_storage_path, content[position..position + size])
position += size
break if position >= content.length
rescue Azure::Core::Http::HTTPError => ex
status_code = ex.status_code
rescue Azure::Core::Http::HTTPError => e
status_code = e.status_code
if status_code == 409 # exceeds azure block limit
@current_index += 1
@@ -166,7 +203,7 @@ module Fluent
# If index is not a part of format, rethrow exception.
if old_azure_storage_path == @azure_storage_path
log.warn "azure_storage_append_blob: append_blob: blocks limit reached, you need to use %{index} for the format."
log.warn 'azure_storage_append_blob: append_blob: blocks limit reached, you need to use %{index} for the format.'
raise
end
@@ -180,9 +217,8 @@ module Fluent
end
end
end
log.debug "azure_storage_append_blob: append_blob.complete"
log.debug 'azure_storage_append_blob: append_blob.complete'
end
end
end
end

Просмотреть файл

@@ -1,8 +1,8 @@
$LOAD_PATH.unshift(File.expand_path("../../", __FILE__))
require "test-unit"
require "fluent/test"
require "fluent/test/driver/output"
require "fluent/test/helpers"
$LOAD_PATH.unshift(File.expand_path('..', __dir__))
require 'test-unit'
require 'fluent/test'
require 'fluent/test/driver/output'
require 'fluent/test/helpers'
Test::Unit::TestCase.include(Fluent::Test::Helpers)
Test::Unit::TestCase.extend(Fluent::Test::Helpers)

Просмотреть файл

@@ -8,32 +8,41 @@ class AzureStorageAppendBlobOutTest < Test::Unit::TestCase
Fluent::Test.setup
end
CONFIG = %[
CONFIG = %(
azure_storage_account test_storage_account
azure_storage_access_key MY_FAKE_SECRET
azure_container test_container
time_slice_format %Y%m%d-%H
path log
]
).freeze
def create_driver(conf=CONFIG)
MSI_CONFIG = %(
azure_storage_account test_storage_account
azure_container test_container
azure_imds_api_version 1970-01-01
azure_token_refresh_interval 120
time_slice_format %Y%m%d-%H
path log
).freeze
def create_driver(conf = CONFIG)
Fluent::Test::Driver::Output.new(Fluent::Plugin::AzureStorageAppendBlobOut).configure(conf)
end
sub_test_case 'test config' do
test 'config should reject with no azure container' do
assert_raise Fluent::ConfigError do
create_driver(%[
create_driver(%(
azure_storage_account test_storage_account
azure_storage_access_key MY_FAKE_SECRET
time_slice_format %Y%m%d-%H
time_slice_wait 10m
path log
])
))
end
end
test 'config should set instance variables' do
test 'config with access key should set instance variables' do
d = create_driver
assert_equal 'test_storage_account', d.instance.azure_storage_account
assert_equal 'MY_FAKE_SECRET', d.instance.azure_storage_access_key
@@ -41,26 +50,37 @@ class AzureStorageAppendBlobOutTest < Test::Unit::TestCase
assert_equal true, d.instance.auto_create_container
assert_equal '%{path}%{time_slice}-%{index}.log', d.instance.azure_object_key_format
end
test 'config with managed identity enabled should set instance variables' do
d = create_driver(MSI_CONFIG)
assert_equal 'test_storage_account', d.instance.azure_storage_account
assert_equal 'test_container', d.instance.azure_container
assert_equal true, d.instance.use_msi
assert_equal true, d.instance.auto_create_container
assert_equal '%{path}%{time_slice}-%{index}.log', d.instance.azure_object_key_format
assert_equal 120, d.instance.azure_token_refresh_interval
assert_equal '1970-01-01', d.instance.azure_imds_api_version
end
end
sub_test_case 'test path slicing' do
test 'test path_slicing' do
config = CONFIG.clone.gsub(/path\slog/, "path log/%Y/%m/%d")
config = CONFIG.clone.gsub(/path\slog/, 'path log/%Y/%m/%d')
d = create_driver(config)
path_slicer = d.instance.instance_variable_get(:@path_slicer)
path = d.instance.instance_variable_get(:@path)
slice = path_slicer.call(path)
assert_equal slice, Time.now.utc.strftime("log/%Y/%m/%d")
assert_equal slice, Time.now.utc.strftime('log/%Y/%m/%d')
end
test 'path slicing utc' do
config = CONFIG.clone.gsub(/path\slog/, "path log/%Y/%m/%d")
config = CONFIG.clone.gsub(/path\slog/, 'path log/%Y/%m/%d')
config << "\nutc\n"
d = create_driver(config)
path_slicer = d.instance.instance_variable_get(:@path_slicer)
path = d.instance.instance_variable_get(:@path)
slice = path_slicer.call(path)
assert_equal slice, Time.now.utc.strftime("log/%Y/%m/%d")
assert_equal slice, Time.now.utc.strftime('log/%Y/%m/%d')
end
end
end