Initial commit of the application insights plugin

This commit is contained in:
yantang 2018-05-01 09:27:31 -07:00
Parent 1bb4412acb
Commit 8641f2228a
8 changed files: 817 additions and 9 deletions

Gemfile
source "https://rubygems.org"
gemspec

README.md
# fluent-plugin-application-insights
This is the [Fluentd](https://fluentd.org/) output plugin for [Azure Application Insights](https://docs.microsoft.com/en-us/azure/application-insights/).
Application Insights is an extensible Application Performance Management (APM) service for web developers on multiple platforms.
Use it to monitor your live web application. It will automatically detect performance anomalies. It includes powerful analytics
tools to help you diagnose issues and to understand what users actually do with your app.
It's designed to help you continuously improve performance and usability.
## Installation
```
$ gem install fluent-plugin-application-insights
```
## Configuration
To send data to Application Insights, add the following snippet to your Fluentd configuration file:
```
<match **>
  @type application_insights
  instrumentation_key <your instrumentation key>
</match>
```
Here are the configuration options for this plugin:
* `instrumentation_key` - Required, the Application Insights instrumentation key.
* `send_buffer_size` - The batch size for sending data to the Application Insights service (default `1000`). A larger buffer usually yields better output throughput.
* `standard_schema` - Whether the record is in standard schema, i.e., the format recognized by the Application Insights backend (default `false`).
If the record is not in standard schema, it is tracked as Application Insights trace telemetry; otherwise, the record is forwarded to the backend as-is. See [Standard Schema](#standard-schema) for more info.
* `message_property` - The property name for the trace message (default `message`).
* `time_property` - The property name for the timestamp (default `nil`). Fluentd will assign a timestamp for each record by default, use this property only if you want a custom timestamp.
* `severity_property` - The property name for severity level (default `severity`). If the severity property doesn't exist, the record will be treated as information level. See [Severity Level](https://docs.microsoft.com/en-us/azure/application-insights/application-insights-data-model-trace-telemetry#severity-level) for more info.
* `severity_level_verbose` - The value of severity property that maps to Application Insights' verbose severity level (default `verbose`).
* `severity_level_information` - The value of severity property that maps to Application Insights' information severity level (default `information`).
* `severity_level_warning` - The value of severity property that maps to Application Insights' warning severity level (default `warning`).
* `severity_level_error` - The value of severity property that maps to Application Insights' error severity level (default `error`).
* `severity_level_critical` - The value of severity property that maps to Application Insights' critical severity level (default `critical`).
* `context_tag_sources` - The dictionary that instructs the Application Insights plugin to set Application Insights context tags using record properties. In this dictionary keys are Application Insights context tags to set, and values are names of properties to use as source of data. For example:
```
context_tag_sources {
  "ai.cloud.role": "kubernetes_container_name",
  "ai.cloud.roleInstance": "kubernetes_pod_name"
}
```
Here is the list of all [context tag keys](https://github.com/Microsoft/ApplicationInsights-dotnet/blob/develop/Schema/PublicSchema/ContextTagKeys.bond).
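For example, a configuration combining several of these options might look like the following sketch (the `level` and `warn` values and the `kubernetes_container_name` property are illustrative, not defaults):
```
<match **>
  @type application_insights
  instrumentation_key <your instrumentation key>
  send_buffer_size 2000
  severity_property level
  severity_level_warning warn
  context_tag_sources ai.cloud.role:kubernetes_container_name
</match>
```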
## Standard Schema
The standard schema for Application Insights telemetry is defined [here](https://github.com/Microsoft/ApplicationInsights-dotnet/tree/develop/Schema/PublicSchema).
Below is an example of a Request telemetry in standard schema format. `name`, `time`, `data`, `data.baseType` and `data.baseData` are required properties. Different telemetry types will have different properties associated with the `baseData` object.
```
{
  "name": "Microsoft.ApplicationInsights.Request",
  "time": "2018-02-28T00:24:00.5676240Z",
  "tags": {
    "ai.cloud.role": "webfront",
    "ai.cloud.roleInstance": "85a1e424491d07b6c1ed032f"
  },
  "data": {
    "baseType": "RequestData",
    "baseData": {
      "ver": 2,
      "id": "|85a1e424-491d07b6c1ed032f.",
      "name": "PUT Flights/StartNewFlightAsync",
      "duration": "00:00:01.0782934",
      "success": true,
      "responseCode": "204",
      "url": "http://localhost:5023/api/flights"
    }
  }
}
```
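The required-property rule above can be sketched in Ruby. This is a minimal illustration of the check, not the plugin's exact code; `standard_schema?` is a hypothetical helper name:

```ruby
# Hypothetical helper illustrating the standard-schema rule:
# a record must have "name", a "data" hash, and data["baseType"]/["baseData"].
def standard_schema?(record)
  data = record["data"]
  !!(record["name"] && data.is_a?(Hash) && data["baseType"] && data["baseData"])
end

valid = {
  "name" => "Microsoft.ApplicationInsights.Request",
  "data" => { "baseType" => "RequestData", "baseData" => {} }
}
puts standard_schema?(valid)               # prints true
puts standard_schema?({ "name" => "n/a" }) # prints false
```

Records failing this check are still delivered, but as trace telemetry rather than the typed telemetry the record intended.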
## Development
### Build Gem
Run `gem build fluent-plugin-application-insights.gemspec`.
### Run Test
Make sure you have bundler installed; you can install it with `sudo gem install bundler`. Run `bundle install` once to install all dependencies.
Then run `rake test`.
## Contributing
This project welcomes contributions and suggestions. Most contributions require you to
agree to a Contributor License Agreement (CLA) declaring that you have the right to,
and actually do, grant us the rights to use your contribution. For details, visit
https://cla.microsoft.com.
When you submit a pull request, a CLA-bot will automatically determine whether you need
to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the
instructions provided by the bot. You will only need to do this once across all repositories using our CLA.
This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/).
For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or
contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments.

Rakefile
require "bundler"
Bundler::GemHelper.install_tasks
require "rake/testtask"
Rake::TestTask.new(:test) do |t|
  t.libs.push("lib", "test")
  t.test_files = FileList["test/**/test_*.rb"]
  t.verbose = true
  t.warning = true
end
task default: [:test]

fluent-plugin-application-insights.gemspec
lib = File.expand_path("../lib", __FILE__)
$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib)
Gem::Specification.new do |spec|
  spec.name    = "fluent-plugin-application-insights"
  spec.version = "0.1.0"
  spec.authors = ["Microsoft Corporation"]
  spec.email   = ["azdiag@microsoft.com"]

  spec.summary     = "This is the fluentd output plugin for Azure Application Insights."
  spec.description = <<-eos
    Fluentd output plugin for Azure Application Insights.
    Application Insights is an extensible Application Performance Management (APM) service for web developers on multiple platforms.
    Use it to monitor your live web application. It will automatically detect performance anomalies. It includes powerful analytics
    tools to help you diagnose issues and to understand what users actually do with your app.
    It's designed to help you continuously improve performance and usability.
  eos
  spec.homepage = "https://github.com/Microsoft/fluent-plugin-application-insights"
  spec.license  = "MIT"

  test_files, files = `git ls-files -z`.split("\x0").partition do |f|
    f.match(%r{^(test|spec|features)/})
  end
  spec.files         = files.push('lib/fluent/plugin/out_application_insights.rb')
  spec.executables   = files.grep(%r{^bin/}) { |f| File.basename(f) }
  spec.test_files    = test_files
  spec.require_paths = ["lib"]

  spec.add_development_dependency "bundler", "~> 1.14"
  spec.add_development_dependency "rake", "~> 12.0"
  spec.add_development_dependency "test-unit", "~> 3.0"
  spec.add_runtime_dependency "fluentd", [">= 0.14.10", "< 2"]
  spec.add_runtime_dependency "application_insights", "~> 0.5.5"
end

lib/fluent/plugin/out_application_insights.rb
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
require 'json'
require 'fluent/plugin/output'
require 'application_insights'
module Fluent::Plugin
  class ApplicationInsightsOutput < Output
    Fluent::Plugin.register_output("application_insights", self)

    # Make the application_insights SDK namespaces (Channel, TelemetryClient) resolvable without the ApplicationInsights:: prefix.
    include ApplicationInsights

    attr_accessor :tc

    # The Application Insights instrumentation key
    config_param :instrumentation_key, :string
    # The batch size to send data to Application Insights service.
    config_param :send_buffer_size, :integer, default: 1000
    # The parameter indicating whether the record is in standard schema, i.e., the format that is recognized by the Application Insights backend.
    config_param :standard_schema, :bool, default: false
    # The property name for the message. It will be ignored if the record is in standard schema.
    config_param :message_property, :string, default: 'message'
    # The property name for the timestamp. It will be ignored if the record is in standard schema.
    config_param :time_property, :string, default: nil
    # The property name for severity level. It will be ignored if the record is in standard schema.
    config_param :severity_property, :string, default: 'severity'
    # The value of severity property that maps to Application Insights' verbose severity level.
    config_param :severity_level_verbose, :string, default: 'verbose'
    # The value of severity property that maps to Application Insights' information severity level.
    config_param :severity_level_information, :string, default: 'information'
    # The value of severity property that maps to Application Insights' warning severity level.
    config_param :severity_level_warning, :string, default: 'warning'
    # The value of severity property that maps to Application Insights' error severity level.
    config_param :severity_level_error, :string, default: 'error'
    # The value of severity property that maps to Application Insights' critical severity level.
    config_param :severity_level_critical, :string, default: 'critical'
    # The dictionary that instructs the Application Insights plugin to set Application Insights context tags using record properties.
    # In this dictionary keys are Application Insights context tags to set, and values are names of properties to use as source of data.
    config_param :context_tag_sources, :hash, default: {}, value_type: :string

    TELEMETRY_TYPES = ["RequestData", "RemoteDependencyData", "MessageData", "ExceptionData", "EventData", "MetricData", "PageViewData", "AvailabilityData"]
    def configure(conf)
      super

      @severity_level_mapping = {}
      @severity_level_mapping[@severity_level_verbose.downcase] = Channel::Contracts::SeverityLevel::VERBOSE
      @severity_level_mapping[@severity_level_information.downcase] = Channel::Contracts::SeverityLevel::INFORMATION
      @severity_level_mapping[@severity_level_warning.downcase] = Channel::Contracts::SeverityLevel::WARNING
      @severity_level_mapping[@severity_level_error.downcase] = Channel::Contracts::SeverityLevel::ERROR
      @severity_level_mapping[@severity_level_critical.downcase] = Channel::Contracts::SeverityLevel::CRITICAL

      context_tag_keys = []
      context_tag_keys.concat Channel::Contracts::Application.json_mappings.values
      context_tag_keys.concat Channel::Contracts::Cloud.json_mappings.values
      context_tag_keys.concat Channel::Contracts::Device.json_mappings.values
      context_tag_keys.concat Channel::Contracts::Internal.json_mappings.values
      context_tag_keys.concat Channel::Contracts::Location.json_mappings.values
      context_tag_keys.concat Channel::Contracts::Operation.json_mappings.values
      context_tag_keys.concat Channel::Contracts::Session.json_mappings.values
      context_tag_keys.concat Channel::Contracts::User.json_mappings.values

      context_tag_sources.keys.each do |tag|
        raise ArgumentError.new("Context tag '#{tag}' is invalid!") unless context_tag_keys.include?(tag)
      end
    end

    def start
      super
      sender = Channel::AsynchronousSender.new
      queue = Channel::AsynchronousQueue.new sender
      channel = Channel::TelemetryChannel.new nil, queue
      @tc = TelemetryClient.new @instrumentation_key, channel
      @tc.channel.queue.max_queue_length = @send_buffer_size
      @tc.channel.sender.send_buffer_size = @send_buffer_size
    end

    def shutdown
      super
      # Drain the events in the queue.
      # We need to make sure the work thread has finished. Otherwise, it's possible that the queue is empty, but the http request to send the data has not finished.
      # However, a drawback of waiting for the work thread to finish is that even if the events have been drained, it will still poll the queue for some time (default is 3 seconds, set by sender.send_time).
      # This can be improved if the SDK exposes another variable indicating whether the work thread is sending data or just polling the queue.
      while !@tc.channel.queue.empty? || @tc.channel.sender.work_thread != nil
        # It's possible that the work thread has already exited but there are still items in the queue.
        # https://github.com/Microsoft/ApplicationInsights-Ruby/blob/master/lib/application_insights/channel/asynchronous_sender.rb#L115
        # Trigger a flush to wake the work thread in this case.
        if @tc.channel.sender.work_thread == nil && !@tc.channel.queue.empty?
          @tc.flush
        end

        sleep(1)
      end
    end
    def process(tag, es)
      es.each do |time, record|
        # Convert the fluentd EventTime object to a Ruby Time object
        time_ruby = Time.at(time.sec, time.nsec / 1000).utc
        if @standard_schema
          process_standard_schema_log record, time_ruby
        else
          process_non_standard_schema_log record, time_ruby
        end
      end
    end

    def process_standard_schema_log(record, time)
      if record["name"] && record["data"] && record["data"].is_a?(Hash) && record["data"]["baseType"] && record["data"]["baseData"]
        base_type = record["data"]["baseType"]

        if TELEMETRY_TYPES.include? base_type
          # If the record was processed by a json parser plugin (e.g., in_http uses one by default), the time property will have been removed. Add it back in this case.
          record["time"] ||= time.iso8601(7)
          record["iKey"] = @instrumentation_key
          set_context_standard_schema record

          envelope = Channel::Contracts::Envelope.new
          Channel::Contracts::Envelope.json_mappings.each do |attr, name|
            envelope.send(:"#{attr}=", record[name]) if record[name]
          end

          @tc.channel.queue.push(envelope)
        else
          log.warn "Unknown telemetry type #{base_type}. Event will be treated as a non standard schema event."
          process_non_standard_schema_log record, time
        end
      else
        log.warn "The event does not meet the standard schema of Application Insights output. Missing name, data, baseType or baseData property. Event will be treated as a non standard schema event."
        process_non_standard_schema_log record, time
      end
    end
    def set_context_standard_schema(record)
      return if @context_tag_sources.length == 0

      record["tags"] = record["tags"] || {}
      @context_tag_sources.each do |context_tag, source_property|
        context_value = record.delete source_property
        record["tags"][context_tag] = context_value if context_value
      end
    end

    def process_non_standard_schema_log(record, time)
      time = record.delete(@time_property) || time
      context = get_context_non_standard_schema(record)
      message = record.delete @message_property
      severity_level_value = record.delete @severity_property
      severity_level = severity_level_value ? @severity_level_mapping[severity_level_value.to_s.downcase] : nil
      props = stringify_properties(record)

      data = Channel::Contracts::MessageData.new(
        :message => message || 'Null',
        :severity_level => severity_level || Channel::Contracts::SeverityLevel::INFORMATION,
        :properties => props || {}
      )

      @tc.channel.write(data, context, time)
    end

    def get_context_non_standard_schema(record)
      context = Channel::TelemetryContext.new
      context.instrumentation_key = @instrumentation_key
      return context if @context_tag_sources.length == 0

      @context_tag_sources.each do |context_tag, source_property|
        if record[source_property]
          set_context_tag context, context_tag, record[source_property]
        end
      end

      return context
    end

    def set_context_tag(context, tag_name, tag_value)
      context_set = [context.application, context.cloud, context.device, context.location, context.operation, context.session, context.user]
      context_set.each do |c|
        c.class.json_mappings.each do |attr, name|
          if (name == tag_name)
            c.send(:"#{attr}=", tag_value)
            return
          end
        end
      end
    end

    def stringify_properties(record)
      # If the property value is a json object or array, e.g., {"prop": {"inner_prop": value}}, it needs to be serialized.
      # Otherwise, the property will become {"prop": "[object Object]"} in the final telemetry.
      # The stringified property can be queried as described here: https://docs.loganalytics.io/docs/Language-Reference/Scalar-functions/parse_json()
      record.each do |key, value|
        if value.is_a?(Hash) || value.is_a?(Array)
          record[key] = JSON.generate(value)
        end
      end
      record
    end
  end
end

test/helper.rb
$LOAD_PATH.unshift(File.expand_path("../../", __FILE__))
require "test-unit"
require "fluent/test"
require "fluent/test/driver/output"
require "fluent/test/helpers"
Test::Unit::TestCase.include(Fluent::Test::Helpers)
Test::Unit::TestCase.extend(Fluent::Test::Helpers)

test/plugin/mock_client.rb
require "application_insights"
# Make the SDK's Channel namespace resolvable unqualified (the mocks subclass Channel::SenderBase).
include ApplicationInsights

class MockSender < Channel::SenderBase
  def initialize
  end

  def start
  end
end

class MockQueue
  attr_accessor :sender
  attr_accessor :queue

  def initialize(sender)
    @sender = sender
    @queue = []
  end

  def push(item)
    return unless item
    @queue << item
  end

  def empty?
    @queue.empty?
  end

  def [](index)
    @queue[index]
  end

  def flush
    @queue = []
  end
end

test/plugin/test_out_application_insights.rb
require "helper"
require "fluent/plugin/out_application_insights.rb"
require_relative "mock_client.rb"

class ApplicationInsightsOutputTest < Test::Unit::TestCase
  setup do
    Fluent::Test.setup
  end

  CONFIG = %[
    instrumentation_key ikey
  ]

  def create_driver(conf = CONFIG)
    driver = Fluent::Test::Driver::Output.new(Fluent::Plugin::ApplicationInsightsOutput).configure(conf)
    driver.instance.start
    sender = MockSender.new
    queue = MockQueue.new sender
    channel = ApplicationInsights::Channel::TelemetryChannel.new nil, queue
    driver.instance.tc = ApplicationInsights::TelemetryClient.new "iKey", channel
    return driver
  end

  sub_test_case 'configure' do
    test 'invalid context tag key' do
      config = %[
        instrumentation_key ikey
        context_tag_sources invalid_tag_name:kubernetes_container_name
      ]
      assert_raise ArgumentError.new("Context tag 'invalid_tag_name' is invalid!") do
        create_driver config
      end
    end
  end
  sub_test_case 'process standard schema event' do
    setup do
      config = %[
        instrumentation_key ikey
        standard_schema true
      ]
      @d = create_driver config
    end

    test 'ikey and timestamps are added if empty' do
      time = event_time("2011-01-02 13:14:15 UTC")
      @d.run(default_tag: 'test', shutdown: false) do
        @d.feed(time, {"name" => "telemetry name", "data" => { "baseType" => "RequestData", "baseData" => {} }})
      end

      envelope = @d.instance.tc.channel.queue[0]
      assert_equal "ikey", envelope.i_key
      assert_equal "2011-01-02T13:14:15.0000000Z", envelope.time
    end

    test 'event missing required properties is treated as non standard schema' do
      time = event_time("2011-01-02 13:14:15 UTC")
      @d.run(default_tag: 'test', shutdown: false) do
        @d.feed(time, {"data" => {"baseType" => "RequestData", "baseData" => "data"}})
        @d.feed(time, {"name" => "telemetry name"})
        @d.feed(time, {"name" => "telemetry name", "data" => 2})
        @d.feed(time, {"name" => "telemetry name", "data" => {}})
        @d.feed(time, {"name" => "telemetry name", "data" => {"someprop" => "value"}})
        @d.feed(time, {"name" => "telemetry name", "data" => {"baseType" => "type"}})
        @d.feed(time, {"name" => "telemetry name", "data" => {"baseData" => "data"}})
      end

      logs = @d.instance.log.out.logs
      assert_equal 7, logs.length
      assert_true logs.all?{ |log| log.include?("The event does not meet the standard schema of Application Insights output. Missing name, data, baseType or baseData property.") }
    end

    test 'event with unknown data type is treated as non standard schema' do
      time = event_time("2011-01-02 13:14:15 UTC")
      @d.run(default_tag: 'test', shutdown: false) do
        @d.feed(time, {"name" => "telemetry name", "data" => {"baseType" => "unknown", "baseData" => {}}})
      end

      logs = @d.instance.log.out.logs
      assert_true logs.all?{ |log| log.include?("Unknown telemetry type unknown") }
    end
  end
  sub_test_case 'set context on standard schema event' do
    test 'context tag sources is empty' do
      config = %[
        instrumentation_key ikey
        standard_schema true
        context_tag_sources {}
      ]
      d = create_driver config
      time = event_time("2011-01-02 13:14:15 UTC")
      d.run(default_tag: 'test', shutdown: false) do
        d.feed(time, {
          "name" => "telemetry name",
          "data" => { "baseType" => "RequestData", "baseData" => {} },
          "kubernetes_container_name" => "frontend"
        })
      end

      envelope = d.instance.tc.channel.queue[0]
      assert_true envelope.tags.length == 0
    end

    test 'context tag sources does not exist on record' do
      config = %[
        instrumentation_key ikey
        standard_schema true
        context_tag_sources ai.cloud.role:kubernetes_container_name
      ]
      d = create_driver config
      time = event_time("2011-01-02 13:14:15 UTC")
      d.run(default_tag: 'test', shutdown: false) do
        d.feed(time, {
          "name" => "telemetry name",
          "data" => { "baseType" => "RequestData", "baseData" => {} }
        })
      end

      envelope = d.instance.tc.channel.queue[0]
      assert_not_nil envelope.tags
      assert_nil envelope.tags["ai.cloud.role"]
    end

    test 'context is updated according to context tag keys' do
      config = %[
        instrumentation_key ikey
        standard_schema true
        context_tag_sources ai.cloud.role:kubernetes_container_name
      ]
      d = create_driver config
      time = event_time("2011-01-02 13:14:15 UTC")
      d.run(default_tag: 'test', shutdown: false) do
        d.feed(time, {
          "name" => "telemetry name",
          "data" => { "baseType" => "RequestData", "baseData" => {} },
          "kubernetes_container_name" => "frontend",
          "other_prop" => "prop value"
        })
      end

      envelope = d.instance.tc.channel.queue[0]
      assert_not_nil envelope.tags
      assert_equal "frontend", envelope.tags["ai.cloud.role"]
      # Extra properties that are not part of Envelope (kubernetes_container_name, other_prop) are ignored
      assert_nil envelope.data["baseData"]["properties"]
    end

    test 'multiple context tag keys' do
      config = %[
        instrumentation_key ikey
        standard_schema true
        context_tag_sources {
          "ai.cloud.role": "kubernetes_container_name",
          "ai.cloud.roleInstance": "kubernetes_container_id"
        }
      ]
      d = create_driver config
      time = event_time("2011-01-02 13:14:15 UTC")
      d.run(default_tag: 'test', shutdown: false) do
        d.feed(time, {
          "name" => "telemetry name",
          "data" => { "baseType" => "RequestData", "baseData" => {} },
          "kubernetes_container_name" => "frontend",
          "kubernetes_container_id" => "c42c557e1615511dd3a3cb1d6e8f14984464bb0f"
        })
      end

      envelope = d.instance.tc.channel.queue[0]
      assert_not_nil envelope.tags
      assert_equal "frontend", envelope.tags["ai.cloud.role"]
      assert_equal "c42c557e1615511dd3a3cb1d6e8f14984464bb0f", envelope.tags["ai.cloud.roleInstance"]
    end
  end
  sub_test_case 'process non standard schema event' do
    test 'empty message' do
      d = create_driver
      time = event_time("2011-01-02 13:14:15 UTC")
      d.run(default_tag: 'test', shutdown: false) do
        d.feed(time, {"prop" => "value"})
      end

      assert_equal 1, d.instance.tc.channel.queue.queue.length
      envelope = d.instance.tc.channel.queue[0]
      assert_equal "Null", envelope.data.base_data.message
      assert_equal envelope.data.base_data.properties, {"prop" => "value"}
    end

    test 'custom timestamp takes precedence over fluentd timestamp' do
      config = %[
        instrumentation_key ikey
        time_property time
      ]
      d = create_driver config
      time = event_time("2011-01-02 13:14:15 UTC")
      d.run(default_tag: 'test', shutdown: false) do
        d.feed(time, {"time" => "2010-10-01"})
      end

      envelope = d.instance.tc.channel.queue[0]
      assert_equal "2010-10-01", envelope.time
    end

    test 'custom timestamp format is not ensured to be valid' do
      config = %[
        instrumentation_key ikey
        time_property time
      ]
      d = create_driver config
      time = event_time("2011-01-02 13:14:15 UTC")
      d.run(default_tag: 'test', shutdown: false) do
        d.feed(time, {"time" => "custom time"})
      end

      envelope = d.instance.tc.channel.queue[0]
      assert_equal "custom time", envelope.time
    end

    test 'timestamp is in iso8601 format' do
      d = create_driver
      time = event_time("2011-01-02 13:14:15 UTC")
      d.run(default_tag: 'test', shutdown: false) do
        d.feed(time, {"message" => "log message"})
      end

      envelope = d.instance.tc.channel.queue[0]
      assert_equal "2011-01-02T13:14:15.0000000Z", envelope.time
    end

    test 'custom message property' do
      config = %[
        instrumentation_key ikey
        message_property custom_message_property
      ]
      d = create_driver config
      time = event_time("2011-01-02 13:14:15 UTC")
      d.run(default_tag: 'test', shutdown: false) do
        d.feed(time, {"custom_message_property" => "custom message", "message" => "my message"})
      end

      envelope = d.instance.tc.channel.queue[0]
      assert_equal "custom message", envelope.data.base_data.message
    end

    test 'custom severity level mapping' do
      config = %[
        instrumentation_key ikey
        severity_property custom_severity_property
        severity_level_error 100
      ]
      d = create_driver config
      time = event_time("2011-01-02 13:14:15 UTC")
      d.run(default_tag: 'test', shutdown: false) do
        d.feed(time, {"custom_severity_property" => 100, "message" => "my message"})
      end

      envelope = d.instance.tc.channel.queue[0]
      assert_equal ApplicationInsights::Channel::Contracts::SeverityLevel::ERROR, envelope.data.base_data.severity_level
    end

    test 'properties are stringified' do
      d = create_driver
      time = event_time("2011-01-02 13:14:15 UTC")
      d.run(default_tag: 'test', shutdown: false) do
        d.feed(time, {"prop" => {"inner_prop1" => "value1", "inner_prop2" => "value2"}})
      end

      envelope = d.instance.tc.channel.queue[0]
      assert_equal 1, envelope.data.base_data.properties.length
      assert_equal "{\"inner_prop1\":\"value1\",\"inner_prop2\":\"value2\"}", envelope.data.base_data.properties["prop"]
    end
  end
  sub_test_case 'set context on non standard schema event' do
    test 'context tag sources is empty' do
      config = %[
        instrumentation_key ikey
        context_tag_sources {}
      ]
      d = create_driver config
      time = event_time("2011-01-02 13:14:15 UTC")
      d.run(default_tag: 'test', shutdown: false) do
        d.feed(time, {
          "message" => "my message",
          "kubernetes_container_name" => "frontend"
        })
      end

      envelope = d.instance.tc.channel.queue[0]
      # The only tag is "ai.internal.sdkVersion", which is irrelevant to context_tag_sources
      assert_true envelope.tags.length == 1
      assert_equal "ai.internal.sdkVersion", envelope.tags.keys[0]
      assert_equal "ikey", envelope.i_key
    end

    test 'context tag sources does not exist on record' do
      config = %[
        instrumentation_key ikey
        context_tag_sources ai.cloud.role:kubernetes_container_name
      ]
      d = create_driver config
      time = event_time("2011-01-02 13:14:15 UTC")
      d.run(default_tag: 'test', shutdown: false) do
        d.feed(time, {
          "message" => "my message"
        })
      end

      envelope = d.instance.tc.channel.queue[0]
      assert_not_nil envelope.tags
      assert_nil envelope.tags["ai.cloud.role"]
    end

    test 'context is updated according to context tag keys' do
      config = %[
        instrumentation_key ikey
        context_tag_sources ai.cloud.role:kubernetes_container_name
      ]
      d = create_driver config
      time = event_time("2011-01-02 13:14:15 UTC")
      d.run(default_tag: 'test', shutdown: false) do
        d.feed(time, {
          "message" => "my message",
          "kubernetes_container_name" => "frontend",
          "other_prop" => "prop value"
        })
      end

      envelope = d.instance.tc.channel.queue[0]
      assert_not_nil envelope.tags
      assert_equal "frontend", envelope.tags["ai.cloud.role"]
      assert_not_nil envelope.data.base_data.properties["kubernetes_container_name"]
      assert_not_nil envelope.data.base_data.properties["other_prop"]
    end

    test 'multiple context tag keys' do
      config = %[
        instrumentation_key ikey
        context_tag_sources {
          "ai.cloud.role": "kubernetes_container_name",
          "ai.cloud.roleInstance": "kubernetes_container_id"
        }
      ]
      d = create_driver config
      time = event_time("2011-01-02 13:14:15 UTC")
      d.run(default_tag: 'test', shutdown: false) do
        d.feed(time, {
          "message" => "my message",
          "kubernetes_container_name" => "frontend",
          "kubernetes_container_id" => "c42c557e1615511dd3a3cb1d6e8f14984464bb0f"
        })
      end

      envelope = d.instance.tc.channel.queue[0]
      assert_not_nil envelope.tags
      assert_true envelope.tags.length == 3
      assert_equal "frontend", envelope.tags["ai.cloud.role"]
      assert_equal "c42c557e1615511dd3a3cb1d6e8f14984464bb0f", envelope.tags["ai.cloud.roleInstance"]
    end
  end
  sub_test_case 'stringify_properties' do
    test 'simple data types are not stringified' do
      plugin = create_driver.instance
      record = {prop1: 1, prop2: true, prop3: "value"}
      actual = plugin.stringify_properties(record)
      expected = {prop1: 1, prop2: true, prop3: "value"}
      assert_equal expected, actual
    end

    test 'json and array property values are stringified' do
      plugin = create_driver.instance
      record = {prop1: 1, prop2: [1, 2, 3], prop3: {inner_prop: "value"}}
      actual = plugin.stringify_properties(record)
      expected = {prop1: 1, prop2: "[1,2,3]", prop3: "{\"inner_prop\":\"value\"}"}
      assert_equal expected, actual
    end

    test 'stringify complicated property value' do
      plugin = create_driver.instance
      record = {
        arr: [1, [2, [3, {inner: "value"}]]],
        obj: {
          arr: [1, {inarr: "inarr"}],
          inobj: {
            ininobj: {
              prop: "value"
            },
            num: 3.14
          }
        }
      }

      actual = plugin.stringify_properties(record)
      expected = {
        :arr => "[1,[2,[3,{\"inner\":\"value\"}]]]",
        :obj => "{\"arr\":[1,{\"inarr\":\"inarr\"}],\"inobj\":{\"ininobj\":{\"prop\":\"value\"},\"num\":3.14}}"
      }
      assert_equal expected, actual
    end
  end
end