From 066448c1c4418501b4d2159c47deb93cb628ac01 Mon Sep 17 00:00:00 2001 From: Tad Glines Date: Sat, 14 Nov 2020 14:25:40 -0800 Subject: [PATCH] Fix IO:DiscardAll bug and cgroup limits - Fixed bug in IO::DiscardAll that could lead to buffer overrun - Added unit test - Fixed PriorityQueue::Put to enforce item size limit - Modified auoms.service to include CPU limits - Modified so that auoms/auomscollect will continue to monitor and re-apply cgroup membership - Cleaned up some return value handling --- CGroups.cpp | 27 ++++++----- CGroups.h | 6 +-- Event.cpp | 2 +- Event.h | 7 ++- EventProcessorTests.cpp | 4 +- EventQueue.h | 3 +- EventTests.cpp | 40 ++++++++--------- ExecveConverterTests.cpp | 4 +- IO.cpp | 4 +- InputBuffer.h | 2 +- Metrics.cpp | 2 +- OperationalStatus.cpp | 2 +- OutputInputTests.cpp | 96 ++++++++++++++++++++++++++++++++++++++-- PriorityQueue.cpp | 10 +++-- PriorityQueue.h | 3 +- PriorityQueueTests.cpp | 71 +++++++++++++++++++---------- RawEventAccumulator.cpp | 65 ++++++++++++++------------- RawEventAccumulator.h | 4 +- RawEventRecord.cpp | 20 ++++----- RawEventRecord.h | 2 +- TestEventQueue.h | 4 +- auoms.cpp | 20 ++++++--- auoms.version | 4 +- auomscollect.cpp | 44 ++++++++++-------- installer/auoms.service | 2 + 25 files changed, 296 insertions(+), 152 deletions(-) diff --git a/CGroups.cpp b/CGroups.cpp index eefea0d..bb98288 100644 --- a/CGroups.cpp +++ b/CGroups.cpp @@ -48,34 +48,41 @@ uint64_t ReadUint64(const std::string& path) { } void CGroupCPU::AddSelf() { - AppendUint64(_dir + CGROUP_PROCS_FILE, 0); + auto self_pid = getpid(); + auto pids = GetProcs(); + if (pids.count(self_pid) == 0) { + AppendUint64(_dir + CGROUP_PROCS_FILE, 0); + } } void CGroupCPU::AddSelfThread() { - auto tid = syscall(SYS_gettid); - AppendUint64(_dir + CGROUP_TASKS_FILE, tid); + auto tid = CGroups::GetSelfThreadId(); + AddThread(tid); } void CGroupCPU::AddThread(long tid) { - AppendUint64(_dir + CGROUP_TASKS_FILE, tid); + auto tids = GetTasks(); + if (tids.count(tid) == 0) { + AppendUint64(_dir + CGROUP_TASKS_FILE, tid); + } } -std::vector CGroupCPU::GetProcs() { +std::unordered_set CGroupCPU::GetProcs() { auto lines = ReadFile(_dir + CGROUP_PROCS_FILE); - std::vector pids; + std::unordered_set pids; pids.reserve(lines.size()); for (auto& line : lines) { - pids.emplace_back(stoll(line)); + pids.emplace(stoll(line)); } return pids; } -std::vector CGroupCPU::GetTasks() { +std::unordered_set CGroupCPU::GetTasks() { auto lines = ReadFile(_dir + CGROUP_TASKS_FILE); - std::vector tids; + std::unordered_set tids; tids.reserve(lines.size()); for (auto& line : lines) { - tids.emplace_back(stoll(line)); + tids.emplace(stoll(line)); } return tids; } diff --git a/CGroups.h b/CGroups.h index f950c2b..48a79b8 100644 --- a/CGroups.h +++ b/CGroups.h @@ -17,7 +17,7 @@ #ifndef AUOMS_CGROUPS_H #define AUOMS_CGROUPS_H -#include +#include #include class CGroupCPU { @@ -28,8 +28,8 @@ public: void AddSelfThread(); void AddThread(long tid); - std::vector GetProcs(); - std::vector GetTasks(); + std::unordered_set GetProcs(); + std::unordered_set GetTasks(); uint64_t GetShares(); void SetShares(uint64_t val); diff --git a/Event.cpp b/Event.cpp index d2a290c..2097d22 100644 --- a/Event.cpp +++ b/Event.cpp @@ -328,7 +328,7 @@ int32_t EventBuilder::GetEventPid() { return EVENT_PID(_data); } -bool EventBuilder::EndEvent() { +int EventBuilder::EndEvent() { if (_data == nullptr) { throw std::runtime_error("Event not started!"); } diff --git a/Event.h b/Event.h index a1a8386..af08f43 100644 --- a/Event.h +++ b/Event.h @@ -239,8 +239,11 @@ private: class IEventBuilderAllocator { public: + // Return true on success, false if closed virtual bool Allocate(void** data, size_t size) = 0; - virtual bool Commit() = 0; + // Return 1 on success, 0 on closed, and -1 if item too large + virtual int Commit() = 0; + // Return true on success, false if closed virtual bool Rollback() = 0; }; @@ -279,7 +282,7 @@ public: uint16_t GetEventFlags(); void SetEventPid(int32_t pid); int32_t GetEventPid(); - bool EndEvent(); + int EndEvent(); bool CancelEvent(); bool BeginRecord(uint32_t record_type, const char* record_name, const char* record_text, uint16_t num_fields); bool BeginRecord(uint32_t record_type, const std::string_view& record_name, const std::string_view& record_text, uint16_t num_fields); diff --git a/EventProcessorTests.cpp b/EventProcessorTests.cpp index 6f20fcb..7940cce 100644 --- a/EventProcessorTests.cpp +++ b/EventProcessorTests.cpp @@ -59,10 +59,10 @@ public: return true; } - bool Commit() override { + int Commit() override { _proc->ProcessData(_buffer.data(), _size); _size = 0; - return true; + return 1; } bool Rollback() override { diff --git a/EventQueue.h b/EventQueue.h index d98e502..8067ddd 100644 --- a/EventQueue.h +++ b/EventQueue.h @@ -34,7 +34,8 @@ public: return true; } - bool Commit() override { + // Return 1 on success, 0 on queue closed, and -1 if item was too large + int Commit() override { Event event(_buffer.data(), _size); auto ret = _queue->Put(event.Priority(), _buffer.data(), _size); _size = 0; diff --git a/EventTests.cpp b/EventTests.cpp index 022f674..e96523e 100644 --- a/EventTests.cpp +++ b/EventTests.cpp @@ -35,44 +35,44 @@ BOOST_AUTO_TEST_CASE( test ) EventBuilder builder(event_queue, DefaultPrioritizer::Create(0)); - int ret = builder.BeginEvent(1, 3, 4, 2); - if (ret != 1) { + bool ret = builder.BeginEvent(1, 3, 4, 2); + if (!ret) { BOOST_FAIL("BeginEvent failed: " + std::to_string(ret)); } ret = builder.BeginRecord(1, "test1", "raw record text1", 3); - if (ret != 1) { + if (!ret) { BOOST_FAIL("BeginRecord failed: " + std::to_string(ret)); } ret = builder.AddField("field1", "raw1", "interp1", field_type_t::UNCLASSIFIED); - if (ret != 1) { + if (!ret) { BOOST_FAIL("AddField failed: " + std::to_string(ret)); } ret = builder.AddField("field2", "2", "user2", field_type_t::UID); - if (ret != 1) { + if (!ret) { BOOST_FAIL("AddField failed: " + std::to_string(ret)); } ret = builder.AddField("field3", "raw3", "interp3", field_type_t::UNCLASSIFIED); - if (ret != 1) { + if (!ret) { BOOST_FAIL("AddField failed: " + std::to_string(ret)); } ret = builder.EndRecord(); - if (ret != 1) { + if (!ret) { BOOST_FAIL("EndRecord failed: " + std::to_string(ret)); } ret = builder.BeginRecord(2, "test2", "raw record text2", 2); - if (ret != 1) { + if (!ret) { BOOST_FAIL("BeginRecord failed: " + std::to_string(ret)); } ret = builder.AddField("field1", "raw1", nullptr, field_type_t::UNCLASSIFIED); - if (ret != 1) { + if (!ret) { BOOST_FAIL("AddField failed: " + std::to_string(ret)); } ret = builder.AddField("field2", "raw2", "interp2", field_type_t::UNCLASSIFIED); - if (ret != 1) { + if (!ret) { BOOST_FAIL("AddField failed: " + std::to_string(ret)); } ret = builder.EndRecord(); - if (ret != 1) { + if (!ret) { BOOST_FAIL("EndRecord failed: " + std::to_string(ret)); } builder.SetEventFlags(5); @@ -180,39 +180,39 @@ BOOST_AUTO_TEST_CASE( test ) BOOST_CHECK_EQUAL(rec, event.end()); ret = builder.BeginEvent(1, 3, 4, 1); - if (ret != 1) { + if (!ret) { BOOST_FAIL("BeginEvent failed: " + std::to_string(ret)); } ret = builder.BeginRecord(1, "test1", "raw text", 6); - if (ret != 1) { + if (!ret) { BOOST_FAIL("BeginRecord failed: " + std::to_string(ret)); } ret = builder.AddField("field3", "raw3", "interp3", field_type_t::UNCLASSIFIED); - if (ret != 1) { + if (!ret) { BOOST_FAIL("AddField failed: " + std::to_string(ret)); } ret = builder.AddField("field6", "raw6", "interp6", field_type_t::UNCLASSIFIED); - if (ret != 1) { + if (!ret) { BOOST_FAIL("AddField failed: " + std::to_string(ret)); } ret = builder.AddField("field1", "raw1", "interp1", field_type_t::UNCLASSIFIED); - if (ret != 1) { + if (!ret) { BOOST_FAIL("AddField failed: " + std::to_string(ret)); } ret = builder.AddField("field4", "raw4", "interp4", field_type_t::UNCLASSIFIED); - if (ret != 1) { + if (!ret) { BOOST_FAIL("AddField failed: " + std::to_string(ret)); } ret = builder.AddField("field5", "raw5", "interp5", field_type_t::UNCLASSIFIED); - if (ret != 1) { + if (!ret) { BOOST_FAIL("AddField failed: " + std::to_string(ret)); } ret = builder.AddField("field2", "raw2", "interp2", field_type_t::UNCLASSIFIED); - if (ret != 1) { + if (!ret) { BOOST_FAIL("AddField failed: " + std::to_string(ret)); } ret = builder.EndRecord(); - if (ret != 1) { + if (!ret) { BOOST_FAIL("EndRecord failed: " + std::to_string(ret)); } ret = builder.EndEvent(); diff --git a/ExecveConverterTests.cpp b/ExecveConverterTests.cpp index db97a6f..f4651b9 100644 --- a/ExecveConverterTests.cpp +++ b/ExecveConverterTests.cpp @@ -48,7 +48,7 @@ public: return true; } - bool Commit() override { + int Commit() override { Event event(_buffer.data(), _size); std::vector recs; for(auto& rec :event) { @@ -59,7 +59,7 @@ public: _converter.Convert(recs, _cmdline); _cmdlines.emplace_back(_cmdline); _size = 0; - return true; + return 1; } bool Rollback() override { diff --git a/IO.cpp b/IO.cpp index 7ca37a7..945e3ac 100644 --- a/IO.cpp +++ b/IO.cpp @@ -212,7 +212,7 @@ ssize_t IOBase::ReadAll(void *buf, size_t size, const std::function& fn) ssize_t IOBase::DiscardAll(size_t size, const std::function& fn) { - uint8_t buffer[4096]; + uint8_t buffer[1024*32]; size_t nleft = size; do { int fd = _fd.load(); @@ -221,7 +221,7 @@ ssize_t IOBase::DiscardAll(size_t size, const std::function& fn) } errno = 0; size_t n = nleft; - if (nleft < sizeof(buffer)) { + if (n > sizeof(buffer)) { n = sizeof(buffer); } ssize_t nr = read(fd, buffer, n); diff --git a/InputBuffer.h b/InputBuffer.h index 8e84188..c7e68f0 100644 --- a/InputBuffer.h +++ b/InputBuffer.h @@ -69,7 +69,7 @@ public: _cond.notify_all(); return true; } - return _close; + return false; } void Close() { diff --git a/Metrics.cpp b/Metrics.cpp index bc28e89..a3fc012 100644 --- a/Metrics.cpp +++ b/Metrics.cpp @@ -131,7 +131,7 @@ bool Metrics::send_metrics() { if (!_builder->EndRecord()) { return false; } - if (!_builder->EndEvent()) { + if (!_builder->EndEvent() != 0) { return false; } } diff --git a/OperationalStatus.cpp b/OperationalStatus.cpp index 6805593..72080f8 100644 --- a/OperationalStatus.cpp +++ b/OperationalStatus.cpp @@ -214,5 +214,5 @@ bool OperationalStatus::send_status() { if(!_builder.EndRecord()) { return false; } - return _builder.EndEvent(); + return _builder.EndEvent() != 0; } \ No newline at end of file diff --git a/OutputInputTests.cpp b/OutputInputTests.cpp index 3ed1201..45e3542 100644 --- a/OutputInputTests.cpp +++ b/OutputInputTests.cpp @@ -30,20 +30,21 @@ #include "Gate.h" #include "Signals.h" #include "StringUtils.h" +#include "UnixDomainWriter.h" bool BuildEvent(std::shared_ptr& builder, uint64_t sec, uint32_t msec, uint64_t serial, int seq) { - if (builder->BeginEvent(sec, msec, serial, 1) != 1) { + if (!builder->BeginEvent(sec, msec, serial, 1)) { return false; } - if (builder->BeginRecord(1, "TEST", "", 1) != 1) { + if (!builder->BeginRecord(1, "TEST", "", 1)) { builder->CancelEvent(); return false; } - if (builder->AddField("seq", std::to_string(seq), nullptr, field_type_t::UNCLASSIFIED) != 1) { + if (!builder->AddField("seq", std::to_string(seq), nullptr, field_type_t::UNCLASSIFIED)) { builder->CancelEvent(); return false; } - if(builder->EndRecord() != 1) { + if(!builder->EndRecord()) { builder->CancelEvent(); return false; } @@ -515,3 +516,90 @@ BOOST_AUTO_TEST_CASE( dropped_conn_test ) { BOOST_REQUIRE_EQUAL(i, event_seq); } } + +BOOST_AUTO_TEST_CASE( oversized_event_test ) { + TempDir dir("/tmp/OutputInputTests"); + + std::string socket_path = dir.Path() + "/input.socket"; + std::string status_socket_path = dir.Path() + "/status.socket"; + + std::mutex log_mutex; + std::vector log_lines; + Logger::SetLogFunction([&log_mutex,&log_lines](const char* ptr, size_t size){ + std::lock_guard lock(log_mutex); + log_lines.emplace_back(ptr, size); + }); + + Signals::Init(); + Signals::Start(); + + auto operational_status = std::make_shared("", nullptr); + + Inputs inputs(socket_path, operational_status); + if (!inputs.Initialize()) { + BOOST_FAIL("Failed to initialize inputs"); + } + + Gate done_gate; + std::vector _outputs; + + std::thread input_thread([&]() { + Signals::InitThread(); + while (!Signals::IsExit()) { + if (!inputs.HandleData([&_outputs](void* ptr, size_t size) { + _outputs.emplace_back(reinterpret_cast(ptr), size); + })) { + break; + }; + } + done_gate.Open(); + }); + + inputs.Start(); + + UnixDomainWriter udw(socket_path); + + if (!udw.Open()) { + BOOST_FAIL("Failed to open inputs socket"); + } + + std::array _data; + _data.fill(0); + uint32_t header; + header = static_cast(1) << 24; + header |= static_cast(InputBuffer::MAX_DATA_SIZE+1); + reinterpret_cast(_data.data())[0] = header; + + if (dynamic_cast(&udw)->WriteAll(_data.data(), _data.size()) != IO::OK) { + BOOST_FAIL("Failed write data to input socket"); + } + + if (dynamic_cast(&udw)->WriteAll(_data.data(), _data.size()) != IO::OK) { + BOOST_FAIL("Failed write data to input socket"); + } + + if (dynamic_cast(&udw)->WriteAll(_data.data(), _data.size()) != IO::OK) { + BOOST_FAIL("Failed write data to input socket"); + } + + udw.Close(); + + inputs.Stop(); + + if (!done_gate.Wait(Gate::OPEN, 1000)) { + BOOST_FAIL("Time out waiting for inputs thread to exit"); + } + + input_thread.join(); + + int lcnt = 0; + for (auto& msg : log_lines) { + if (msg == "RawEventReader: Message size (262145) in header is too large (> 262144), reading and discarding message contents\n") { + lcnt += 1; + } + } + + if (lcnt != 3) { + BOOST_FAIL("Expected 3 'header it too large' messages"); + } +} diff --git a/PriorityQueue.cpp b/PriorityQueue.cpp index 90ca94f..1dea4f6 100644 --- a/PriorityQueue.cpp +++ b/PriorityQueue.cpp @@ -655,11 +655,15 @@ void PriorityQueue::Close(const std::shared_ptr& cursor_handl _cursor_handles.erase(cursor_handle->_id); } -bool PriorityQueue::Put(uint32_t priority, const void* data, size_t size) { +int PriorityQueue::Put(uint32_t priority, const void* data, size_t size) { std::unique_lock lock(_mutex); + if (size > MAX_ITEM_SIZE) { + return -1; + } + if (_closed) { - return false; + return 0; } if (priority >= _num_priorities) { @@ -692,7 +696,7 @@ bool PriorityQueue::Put(uint32_t priority, const void* data, size_t size) { c->notify(priority, item->Sequence()); } - return true; + return 1; } void PriorityQueue::Save(long save_delay, bool final_save) { diff --git a/PriorityQueue.h b/PriorityQueue.h index 5246b39..b174373 100644 --- a/PriorityQueue.h +++ b/PriorityQueue.h @@ -368,7 +368,8 @@ public: void Commit(const std::shared_ptr& cursor_handle, uint32_t priority, uint64_t seq); void Close(const std::shared_ptr& cursor_handle); - bool Put(uint32_t priority, const void* data, size_t size); + // Return 1 on success, 0 on queue closed, and -1 if item too large + int Put(uint32_t priority, const void* data, size_t size); void Save(long save_delay, bool final_save = false); void Saver(long save_delay); diff --git a/PriorityQueueTests.cpp b/PriorityQueueTests.cpp index 2992db6..33c9197 100644 --- a/PriorityQueueTests.cpp +++ b/PriorityQueueTests.cpp @@ -66,7 +66,7 @@ BOOST_AUTO_TEST_CASE( queue_simple ) { for (uint8_t i = 1; i <= 10; i++) { data[0] = i; - if (!queue->Put(0, data.data(), data.size())) { + if (queue->Put(0, data.data(), data.size()) != 1) { BOOST_FAIL("queue->Put() failed!"); } @@ -109,6 +109,29 @@ BOOST_AUTO_TEST_CASE( queue_simple ) { BOOST_CHECK_EQUAL(stats._total._bytes_written, 0); } +BOOST_AUTO_TEST_CASE( queue_oversided_item ) { + TempDir dir("/tmp/PriorityQueueTests"); + + auto queue = PriorityQueue::Open(dir.Path(), 8, 1024*1024, 16, 0, 0, 0); + if (!queue) { + BOOST_FAIL("Failed to open queue"); + } + + auto cursor_handle = queue->OpenCursor("test"); + + std::array data; + data.fill(0); + + data[0] = 1; + if (queue->Put(0, data.data(), data.size()) != -1) { + BOOST_FAIL("queue->Put() failed to reject oversided item!"); + } + + if (queue->Put(0, data.data(), data.size()-1) != 1) { + BOOST_FAIL("queue->Put() failed add item!"); + } +} + BOOST_AUTO_TEST_CASE( queue_cursor_rollback ) { TempDir dir("/tmp/PriorityQueueTests"); @@ -124,7 +147,7 @@ BOOST_AUTO_TEST_CASE( queue_cursor_rollback ) { for (uint8_t i = 1; i <= 10; i++) { data[0] = i; - if (!queue->Put(0, data.data(), data.size())) { + if (queue->Put(0, data.data(), data.size()) != 1) { BOOST_FAIL("queue->Put() failed!"); } } @@ -199,7 +222,7 @@ BOOST_AUTO_TEST_CASE( queue_simple_multi_cursor ) { for (uint8_t i = 1; i < 6; i++) { data[0] = i; - if (!queue->Put(0, data.data(), data.size())) { + if (queue->Put(0, data.data(), data.size()) != 1) { BOOST_FAIL("queue->Put() failed!"); } } @@ -208,7 +231,7 @@ BOOST_AUTO_TEST_CASE( queue_simple_multi_cursor ) { for (uint8_t i = 6; i <= 10; i++) { data[0] = i; - if (!queue->Put(0, data.data(), data.size())) { + if (queue->Put(0, data.data(), data.size()) != 1) { BOOST_FAIL("queue->Put() failed!"); } } @@ -300,7 +323,7 @@ BOOST_AUTO_TEST_CASE( queue_simple_multi_cursor_reopen ) { for (uint8_t i = 1; i < 6; i++) { data[0] = i; - if (!queue->Put(0, data.data(), data.size())) { + if (queue->Put(0, data.data(), data.size()) != 1) { BOOST_FAIL("queue->Put() failed!"); } } @@ -309,7 +332,7 @@ BOOST_AUTO_TEST_CASE( queue_simple_multi_cursor_reopen ) { for (uint8_t i = 6; i <= 10; i++) { data[0] = i; - if (!queue->Put(0, data.data(), data.size())) { + if (queue->Put(0, data.data(), data.size()) != 1) { BOOST_FAIL("queue->Put() failed!"); } } @@ -455,7 +478,7 @@ BOOST_AUTO_TEST_CASE( queue_simple_priority ) { for (auto& in: input_pairs) { data[0] = in.first; - if (!queue->Put(in.second, data.data(), data.size())) { + if (queue->Put(in.second, data.data(), data.size()) != 1) { BOOST_FAIL("queue->Put() failed!"); } } @@ -508,7 +531,7 @@ BOOST_AUTO_TEST_CASE( queue_simple_priority2 ) { for (uint8_t i = 0; i < 2; i++) { auto expected = (p*2)+i; data[0] = expected; - if (!queue->Put(p, data.data(), data.size())) { + if (queue->Put(p, data.data(), data.size()) != 1) { BOOST_FAIL("queue->Put() failed!"); } @@ -749,7 +772,7 @@ BOOST_AUTO_TEST_CASE( queue_max_unsaved_files ) { // This first set of inputs should reach the max mem limit for (auto& in: input_pairs) { data[0] = in.first; - if (!queue->Put(in.second, data.data(), data.size())) { + if (queue->Put(in.second, data.data(), data.size()) != 1) { BOOST_FAIL("queue->Put() failed!"); } } @@ -769,7 +792,7 @@ BOOST_AUTO_TEST_CASE( queue_max_unsaved_files ) { // This set of inputs should exceed the max mem limits for (auto& in: input2_pairs) { data[0] = in.first; - if (!queue->Put(in.second, data.data(), data.size())) { + if (queue->Put(in.second, data.data(), data.size()) != 1) { BOOST_FAIL("queue->Put() failed!"); } } @@ -1010,7 +1033,7 @@ BOOST_AUTO_TEST_CASE( queue_multi_cursor_fs_loss ) { // This first set of inputs should reach the max mem limit for (auto& in: input_pairs) { data[0] = in.first; - if (!queue->Put(in.second, data.data(), data.size())) { + if (queue->Put(in.second, data.data(), data.size()) != 1) { BOOST_FAIL("queue->Put() failed!"); } } @@ -1120,7 +1143,7 @@ BOOST_AUTO_TEST_CASE( queue_multi_cursor_concurrent ) { return c1_ready && c2_ready; }); reinterpret_cast(data.data())[0] = i; - if (!queue->Put(0, data.data(), data.size())) { + if (queue->Put(0, data.data(), data.size()) != 1) { BOOST_FAIL("queue->Put() failed!"); } } @@ -1162,7 +1185,7 @@ BOOST_AUTO_TEST_CASE( queue_fs_clean_multi_cursor ) { for (int i = 0; i < num_items; i++) { reinterpret_cast(data.data())[0] = i; - if (!queue->Put(0, data.data(), data.size())) { + if (queue->Put(0, data.data(), data.size()) != 1) { BOOST_FAIL("queue->Put() failed!"); } } @@ -1257,7 +1280,7 @@ BOOST_AUTO_TEST_CASE( queue_fs_clean_remove_cursor ) { for (int i = 0; i < num_items; i++) { reinterpret_cast(data.data())[0] = i; - if (!queue->Put(0, data.data(), data.size())) { + if (queue->Put(0, data.data(), data.size()) != 1) { BOOST_FAIL("queue->Put() failed!"); } } @@ -1326,7 +1349,7 @@ BOOST_AUTO_TEST_CASE( queue_fs_clean_delete_cursor ) { for (int i = 0; i < num_items; i++) { reinterpret_cast(data.data())[0] = i; - if (!queue->Put(0, data.data(), data.size())) { + if (queue->Put(0, data.data(), data.size()) != 1) { BOOST_FAIL("queue->Put() failed!"); } } @@ -1407,7 +1430,7 @@ BOOST_AUTO_TEST_CASE( queue_max_fs_bytes ) { for (int i = 0; i < num_items; i++) { reinterpret_cast(data.data())[0] = i; - if (!queue->Put(0, data.data(), data.size())) { + if (queue->Put(0, data.data(), data.size()) != 1) { BOOST_FAIL("queue->Put() failed!"); } } @@ -1467,7 +1490,7 @@ BOOST_AUTO_TEST_CASE( queue_max_fs_pct ) { for (int i = 0; i < num_items; i++) { reinterpret_cast(data.data())[0] = i; - if (!queue->Put(0, data.data(), data.size())) { + if (queue->Put(0, data.data(), data.size()) != 1) { BOOST_FAIL("queue->Put() failed!"); } } @@ -1521,7 +1544,7 @@ BOOST_AUTO_TEST_CASE( queue_min_fs_free_pct ) { for (int i = 0; i < num_items; i++) { reinterpret_cast(data.data())[0] = i; - if (!queue->Put(0, data.data(), data.size())) { + if (queue->Put(0, data.data(), data.size()) != 1) { BOOST_FAIL("queue->Put() failed!"); } } @@ -1555,7 +1578,7 @@ BOOST_AUTO_TEST_CASE( queue_save_delay ) { for (int i = 0; i < 9; i++) { reinterpret_cast(data.data())[0] = i; - if (!queue->Put(0, data.data(), data.size())) { + if (queue->Put(0, data.data(), data.size()) != 1) { BOOST_FAIL("queue->Put() failed!"); } } @@ -1597,7 +1620,7 @@ BOOST_AUTO_TEST_CASE( queue_cursor_commit ) { for (int i = 0; i < 32; i++) { reinterpret_cast(data.data())[0] = i; - if (!queue->Put(0, data.data(), data.size())) { + if (queue->Put(0, data.data(), data.size()) != 1) { BOOST_FAIL("queue->Put() failed!"); } } @@ -1755,7 +1778,7 @@ BOOST_AUTO_TEST_CASE( queue_fs_force_clean ) { for (auto& in: input_pairs1) { data[0] = in.first; - if (!queue->Put(in.second, data.data(), data.size())) { + if (queue->Put(in.second, data.data(), data.size()) != 1) { BOOST_FAIL("queue->Put() failed!"); } } @@ -1782,7 +1805,7 @@ BOOST_AUTO_TEST_CASE( queue_fs_force_clean ) { for (auto& in: input_pairs2) { data[0] = in.first; - if (!queue->Put(in.second, data.data(), data.size())) { + if (queue->Put(in.second, data.data(), data.size()) != 1) { BOOST_FAIL("queue->Put() failed!"); } } @@ -1839,7 +1862,7 @@ BOOST_AUTO_TEST_CASE( queue_empty_cursor_reset ) { for (int i = 0; i < 32; i++) { reinterpret_cast(data.data())[0] = i; - if (!queue->Put(0, data.data(), data.size())) { + if (queue->Put(0, data.data(), data.size()) != 1) { BOOST_FAIL("queue->Put() failed!"); } } @@ -1872,7 +1895,7 @@ BOOST_AUTO_TEST_CASE( queue_empty_cursor_reset ) { for (int i = 0; i < 32; i++) { reinterpret_cast(data.data())[0] = i; - if (!queue->Put(0, data.data(), data.size())) { + if (queue->Put(0, data.data(), data.size()) != 1) { BOOST_FAIL("queue->Put() failed!"); } } diff --git a/RawEventAccumulator.cpp b/RawEventAccumulator.cpp index 67eb932..74e81de 100644 --- a/RawEventAccumulator.cpp +++ b/RawEventAccumulator.cpp @@ -73,16 +73,14 @@ int RawEvent::AddEvent(EventBuilder& builder) { if (_num_dropped_records > 0 && _drop_count.size() > 0) { num_records += 1; } - auto ret = builder.BeginEvent(_event_id.Seconds(), _event_id.Milliseconds(), _event_id.Serial(), num_records); - if (ret != 1) { - return ret; + if (!builder.BeginEvent(_event_id.Seconds(), _event_id.Milliseconds(), _event_id.Serial(), num_records)) { + return 0; } if (_syscall_rec_idx > -1) { - ret = _records[_syscall_rec_idx]->AddRecord(builder); - if (ret != 1) { + if (!_records[_syscall_rec_idx]->AddRecord(builder)) { builder.CancelEvent(); - return ret; + return 0; } _records[_syscall_rec_idx].reset(nullptr); } @@ -91,44 +89,39 @@ int RawEvent::AddEvent(EventBuilder& builder) { if (!rec) { continue; } - ret = rec->AddRecord(builder); - if (ret != 1) { + if (!rec->AddRecord(builder)) { builder.CancelEvent(); - return ret; + return 0; } if (rec->GetRecordType() == RecordType::EXECVE) { for (std::unique_ptr& rec: _execve_records) { - ret = rec->AddRecord(builder); - if (ret != 1) { + if (!rec->AddRecord(builder)) { builder.CancelEvent(); - return ret; + return 0; } } } } if (_num_dropped_records > 0 && _drop_count.size() > 0) { - ret = builder.BeginRecord(static_cast(RecordType::AUOMS_DROPPED_RECORDS), std::string_view(RecordTypeToName(RecordType::AUOMS_DROPPED_RECORDS)), std::string_view(""), static_cast(_drop_count.size())); - if (ret != 1) { + if (!builder.BeginRecord(static_cast(RecordType::AUOMS_DROPPED_RECORDS), std::string_view(RecordTypeToName(RecordType::AUOMS_DROPPED_RECORDS)), std::string_view(""), static_cast(_drop_count.size()))) { builder.CancelEvent(); - return ret; + return 0; } for (auto& e: _drop_count) { - ret = builder.AddField(RecordTypeToName(e.first), std::to_string(e.second), "", field_type_t::UNCLASSIFIED); - if (ret != 1) { + if (!builder.AddField(RecordTypeToName(e.first), std::to_string(e.second), "", field_type_t::UNCLASSIFIED)) { builder.CancelEvent(); - return ret; + return 0; } } - ret = builder.EndRecord(); - if (ret != 1) { + if (!builder.EndRecord()) { builder.CancelEvent(); - return ret; + return 0; } } return builder.EndEvent(); } -int RawEventAccumulator::AddRecord(std::unique_ptr record) { +bool RawEventAccumulator::AddRecord(std::unique_ptr record) { std::lock_guard lock(_mutex); _bytes_metric->Update(static_cast(record->GetSize())); @@ -136,14 +129,15 @@ int RawEventAccumulator::AddRecord(std::unique_ptr record) { // Drop empty records unless it is the EOE record. if (record->IsEmpty() && record->GetRecordType() != RecordType::EOE) { - return 0; + return false; } auto event_id = record->GetEventId(); - int ret = 0; - auto found = _events.on(event_id, [this,&record,&ret](size_t entry_count, const std::chrono::steady_clock::time_point& last_touched, std::shared_ptr& event) { + auto found = _events.on(event_id, [this,&record](size_t entry_count, const std::chrono::steady_clock::time_point& last_touched, std::shared_ptr& event) { if (event->AddRecord(std::move(record))) { - ret = event->AddEvent(*_builder); + if (event->AddEvent(*_builder) == -1) { + _dropped_event_metric->Update(1.0); + } return CacheEntryOP::REMOVE; } else { return CacheEntryOP::TOUCH; @@ -153,7 +147,10 @@ int RawEventAccumulator::AddRecord(std::unique_ptr record) { auto event = std::make_shared(record->GetEventId()); if (event->AddRecord(std::move(record))) { _event_metric->Update(1.0); - return event->AddEvent(*_builder); + if (event->AddEvent(*_builder) == -1) { + _dropped_event_metric->Update(1.0); + } + return true; } else { _events.add(event_id, event); } @@ -161,13 +158,15 @@ int RawEventAccumulator::AddRecord(std::unique_ptr record) { // Don't wait for Flush to be called, preemptively flush oldest if the cache size limit is exceeded _events.for_all_oldest_first([this](size_t entry_count, const std::chrono::steady_clock::time_point& last_touched, const EventId& key, std::shared_ptr& event) { if (entry_count > MAX_CACHE_ENTRY) { - event->AddEvent(*_builder); + if (event->AddEvent(*_builder) == -1) { + _dropped_event_metric->Update(1.0); + } _event_metric->Update(1.0); return CacheEntryOP::REMOVE; } return CacheEntryOP::STOP; }); - return 1; + return true; } void RawEventAccumulator::Flush(long milliseconds) { @@ -177,7 +176,9 @@ void RawEventAccumulator::Flush(long milliseconds) { _events.for_all_oldest_first([this,now,milliseconds](size_t entry_count, const std::chrono::steady_clock::time_point& last_touched, const EventId& key, std::shared_ptr& event) { if (entry_count > MAX_CACHE_ENTRY || std::chrono::duration_cast(now.time_since_epoch()-last_touched.time_since_epoch()) > std::chrono::milliseconds(milliseconds)) { - event->AddEvent(*_builder); + if (event->AddEvent(*_builder) == -1) { + _dropped_event_metric->Update(1.0); + } _event_metric->Update(1.0); return CacheEntryOP::REMOVE; } @@ -185,7 +186,9 @@ void RawEventAccumulator::Flush(long milliseconds) { }); } else { _events.for_all_oldest_first([this](size_t entry_count, const std::chrono::steady_clock::time_point& last_touched, const EventId& key, std::shared_ptr& event) { - event->AddEvent(*_builder); + if (event->AddEvent(*_builder) == -1) { + _dropped_event_metric->Update(1.0); + } _event_metric->Update(1.0); return CacheEntryOP::REMOVE; }); diff --git a/RawEventAccumulator.h b/RawEventAccumulator.h index 77a497e..6a430c1 100644 --- a/RawEventAccumulator.h +++ b/RawEventAccumulator.h @@ -58,9 +58,10 @@ public: _bytes_metric = _metrics->AddMetric(MetricType::METRIC_BY_ACCUMULATION, "raw_data", "bytes", MetricPeriod::SECOND, MetricPeriod::HOUR); _record_metric = _metrics->AddMetric(MetricType::METRIC_BY_ACCUMULATION, "raw_data", "records", MetricPeriod::SECOND, MetricPeriod::HOUR); _event_metric = _metrics->AddMetric(MetricType::METRIC_BY_ACCUMULATION, "raw_data", "events", MetricPeriod::SECOND, MetricPeriod::HOUR); + _dropped_event_metric = _metrics->AddMetric(MetricType::METRIC_BY_ACCUMULATION, "raw_data", "dropped_events", MetricPeriod::SECOND, MetricPeriod::HOUR); } - int AddRecord(std::unique_ptr record); + bool AddRecord(std::unique_ptr record); void Flush(long milliseconds); private: @@ -71,6 +72,7 @@ private: std::shared_ptr _bytes_metric; std::shared_ptr _record_metric; std::shared_ptr _event_metric; + std::shared_ptr _dropped_event_metric; Cache> _events; }; diff --git a/RawEventRecord.cpp b/RawEventRecord.cpp index 10846bb..2cde37a 100644 --- a/RawEventRecord.cpp +++ b/RawEventRecord.cpp @@ -161,7 +161,7 @@ bool RawEventRecord::Parse(RecordType record_type, size_t size) { return false; } -int RawEventRecord::AddRecord(EventBuilder& builder) { +bool RawEventRecord::AddRecord(EventBuilder& builder) { static auto SV_NODE = "node"sv; static auto SV_UNPARSED_TEXT = "unparsed_text"sv; @@ -170,36 +170,34 @@ int RawEventRecord::AddRecord(EventBuilder& builder) { num_fields++; } - auto ret = builder.BeginRecord(static_cast(_record_type), _type_name, std::string_view(_data.data(), _size), num_fields); - if (ret != 1) { - return ret; + if (!builder.BeginRecord(static_cast(_record_type), _type_name, std::string_view(_data.data(), _size), num_fields)) { + return false; } if (!_node.empty()) { - ret = builder.AddField(SV_NODE, _node, nullptr, field_type_t::UNCLASSIFIED); - if (ret != 1) { - return ret; + if (!builder.AddField(SV_NODE, _node, nullptr, field_type_t::UNCLASSIFIED)) { + return false; } } // If record is marked as unparsable, then the text (after the 'audit():' section is included as the only value in // _record_fields if (_unparsable) { - ret = builder.AddField(SV_UNPARSED_TEXT, _record_fields[0], nullptr, field_type_t::UNESCAPED); - if (ret != 1) { - return ret; + if (!builder.AddField(SV_UNPARSED_TEXT, _record_fields[0], nullptr, field_type_t::UNESCAPED)) { + return false; } return builder.EndRecord(); } for (auto f: _record_fields) { auto idx = f.find_first_of('='); + bool ret; if (idx == std::string_view::npos) { ret = builder.AddField(f, std::string_view(), nullptr, field_type_t::UNCLASSIFIED); } else { ret = builder.AddField(f.substr(0, idx), f.substr(idx + 1), nullptr, field_type_t::UNCLASSIFIED); } - if (ret != 1) { + if (!ret) { return ret; } } diff --git a/RawEventRecord.h b/RawEventRecord.h index f26ea22..21e4e4a 100644 --- a/RawEventRecord.h +++ b/RawEventRecord.h @@ -34,7 +34,7 @@ public: inline char* Data() { return _data.data(); }; bool Parse(RecordType record_type, size_t size); - int AddRecord(EventBuilder& builder); + bool AddRecord(EventBuilder& builder); inline EventId GetEventId() { return _event_id; } inline RecordType GetRecordType() { return _record_type; } diff --git a/TestEventQueue.h b/TestEventQueue.h index c2558ba..51050fd 100644 --- a/TestEventQueue.h +++ b/TestEventQueue.h @@ -30,9 +30,9 @@ public: return true; } - virtual bool Commit() { + virtual int Commit() { _events.emplace_back(std::make_shared>(_buffer.begin(), _buffer.end())); - return true; + return 1; } virtual bool Rollback() { diff --git a/auoms.cpp b/auoms.cpp index 21bcc36..cad4faf 100644 --- a/auoms.cpp +++ b/auoms.cpp @@ -99,7 +99,7 @@ int main(int argc, char**argv) { bool netlink_only = false; int opt; - while ((opt = getopt(argc, argv, "nc:")) != -1) { + while ((opt = getopt(argc, argv, "c:ns")) != -1) { switch (opt) { case 'c': config_file = optarg; @@ -339,12 +339,18 @@ int main(int argc, char**argv) { // systemd may not have put auoms into the default cgroup at this point // Wait a few seconds before moving into the right cgroup so we avoid getting moved back out by systemd std::thread cg_thread([&cgcpu]() { - sleep(5); - try { - cgcpu->AddSelf(); - } catch (const std::exception &ex) { - Logger::Error("Failed to configure cpu cgroup: %s", ex.what()); - Logger::Warn("CPU Limits cannot be enforced"); + int sleep_time = 5; + // Loop forever to make sure we stay in our cgroup + while (!Signals::IsExit()) { + sleep(sleep_time); + sleep_time = 60; + try { + cgcpu->AddSelf(); + } catch (const std::exception &ex) { + Logger::Error("Failed to configure cpu cgroup: %s", ex.what()); + Logger::Warn("CPU Limits cannot be enforced"); + return; + } } }); cg_thread.detach(); diff --git a/auoms.version b/auoms.version index 66fde98..ec76df7 100755 --- a/auoms.version +++ b/auoms.version @@ -7,7 +7,7 @@ AUOMS_BUILDVERSION_MAJOR=2 AUOMS_BUILDVERSION_MINOR=3 -AUOMS_BUILDVERSION_PATCH=0 +AUOMS_BUILDVERSION_PATCH=1 AUOMS_BUILDVERSION_BUILDNR=0 -AUOMS_BUILDVERSION_DATE=20200903 +AUOMS_BUILDVERSION_DATE=20201113 AUOMS_BUILDVERSION_STATUS=Developer_Build diff --git a/auomscollect.cpp b/auomscollect.cpp index ac09ad3..8db657b 100644 --- a/auomscollect.cpp +++ b/auomscollect.cpp @@ -511,6 +511,7 @@ int main(int argc, char**argv) { } Logger::Info("Acquire singleton lock"); + std::atomic_long ingest_thread_id(0); std::shared_ptr cgcpu_root; std::shared_ptr cgcpu; if (!disable_cgroups) { @@ -519,13 +520,29 @@ int main(int argc, char**argv) { cgcpu = CPULimits::CGFromConfig(config, "auomscollect"); // systemd may not have put auomscollect into the default cgroup at this point // Wait a few seconds before moving into the right cgroup so we avoid getting moved back out by systemd - std::thread cg_thread([&cgcpu]() { - sleep(5); - try { - cgcpu->AddSelf(); - } catch (const std::exception &ex) { - Logger::Error("Failed to configure cpu cgroup: %s", ex.what()); - Logger::Warn("CPU Limits cannot be enforced"); + std::thread cg_thread([&cgcpu_root,&cgcpu,&ingest_thread_id]() { + int sleep_time = 10; + // Loop forever to make sure we stay in our cgroup + while (!Signals::IsExit()) { + sleep(sleep_time); + sleep_time = 60; + try { + cgcpu->AddSelf(); + } catch (const std::exception &ex) { + Logger::Error("Failed to configure cpu cgroup: %s", ex.what()); + Logger::Warn("CPU Limits cannot be enforced"); + return; + } + long tid = ingest_thread_id.load(); + if (tid != 0) { + try { + cgcpu_root->AddThread(tid); + } catch (std::runtime_error &ex) { + Logger::Error("Failed to move ingest thread to root cgroup: %s", ex.what()); + // Set the id back to 0 so we don't keep trying. + ingest_thread_id.store(0); + } + } } }); cg_thread.detach(); @@ -622,18 +639,7 @@ int main(int argc, char**argv) { std::thread ingest_thread([&]() { auto thread_id = CGroups::GetSelfThreadId(); Logger::Info("Starting ingest thead (%ld)", thread_id); - // Move this thread back to the root cgroup (thus outside the auomscollect specific cgroup - if (!disable_cgroups && cgcpu_root) { - std::thread cg_thread([&cgcpu_root, thread_id]() { - sleep(10); - try { - cgcpu_root->AddThread(thread_id); - } catch (std::runtime_error &ex) { - Logger::Error("Failed to move ingest thread to root cgroup: %s", ex.what()); - } - }); - cg_thread.detach(); - } + ingest_thread_id.store(thread_id); if (netlink_mode) { bool restart; do { diff --git a/installer/auoms.service b/installer/auoms.service index eae0258..fd47b0d 100644 --- a/installer/auoms.service +++ b/installer/auoms.service @@ -11,6 +11,8 @@ KillMode=process ExecStart=/etc/init.d/auoms start ExecStop=/etc/init.d/auoms stop ExecReload=/etc/init.d/auoms reload +CPUWeight=5 +CPUQuota=25% [Install] WantedBy=multi-user.target