Fix IO::DiscardAll buffer overrun and improve cgroup CPU limits

- Fixed a bug in IO::DiscardAll that could lead to a buffer overrun
- Added a unit test covering the oversized-message path
- Fixed PriorityQueue::Put to enforce the maximum item size (oversized items are rejected)
- Modified auoms.service to include CPU limits
- Modified auoms and auomscollect so that they continue to monitor
  and re-apply their cgroup membership
- Cleaned up return value handling: Put/Commit/EndEvent now return 1 on
  success, 0 when closed, and -1 when an item is too large (see the sketch below)
Tad Glines 2020-11-14 14:25:40 -08:00
Parent 2c23438b5d
Commit 066448c1c4
25 changed files with 296 additions and 152 deletions
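
For context, a minimal caller-side sketch of the return convention referenced above and documented in the hunks below (1 = success, 0 = closed, -1 = item too large). Only PriorityQueue::Put's signature is taken from the diff; the wrapper name and the drop-on-oversize policy are illustrative assumptions, not part of this commit.

    // Illustrative only: reacting to the tri-state result of PriorityQueue::Put.
    #include <cstdint>
    #include "PriorityQueue.h"   // assumed project header declaring PriorityQueue

    bool TryEnqueue(PriorityQueue& queue, uint32_t priority,
                    const void* data, size_t size) {
        switch (queue.Put(priority, data, size)) {
            case 1:  return true;    // item stored
            case 0:  return false;   // queue closed: stop producing
            default: return true;    // -1: item too large, drop it and keep going
        }
    }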

View file

@ -48,34 +48,41 @@ uint64_t ReadUint64(const std::string& path) {
}
void CGroupCPU::AddSelf() {
AppendUint64(_dir + CGROUP_PROCS_FILE, 0);
auto self_pid = getpid();
auto pids = GetProcs();
if (pids.count(self_pid) == 0) {
AppendUint64(_dir + CGROUP_PROCS_FILE, 0);
}
}
void CGroupCPU::AddSelfThread() {
auto tid = syscall(SYS_gettid);
AppendUint64(_dir + CGROUP_TASKS_FILE, tid);
auto tid = CGroups::GetSelfThreadId();
AddThread(tid);
}
void CGroupCPU::AddThread(long tid) {
AppendUint64(_dir + CGROUP_TASKS_FILE, tid);
auto tids = GetTasks();
if (tids.count(tid) == 0) {
AppendUint64(_dir + CGROUP_TASKS_FILE, tid);
}
}
std::vector<uint64_t> CGroupCPU::GetProcs() {
std::unordered_set<uint64_t> CGroupCPU::GetProcs() {
auto lines = ReadFile(_dir + CGROUP_PROCS_FILE);
std::vector<uint64_t> pids;
std::unordered_set<uint64_t> pids;
pids.reserve(lines.size());
for (auto& line : lines) {
pids.emplace_back(stoll(line));
pids.emplace(stoll(line));
}
return pids;
}
std::vector<uint64_t> CGroupCPU::GetTasks() {
std::unordered_set<uint64_t> CGroupCPU::GetTasks() {
auto lines = ReadFile(_dir + CGROUP_TASKS_FILE);
std::vector<uint64_t> tids;
std::unordered_set<uint64_t> tids;
tids.reserve(lines.size());
for (auto& line : lines) {
tids.emplace_back(stoll(line));
tids.emplace(stoll(line));
}
return tids;
}
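
One detail worth noting from the hunk above: AddSelf() appends 0 (not getpid()) to the cgroup's procs file. The kernel treats a written value of 0 as "the process doing the write", so the membership check uses the real PID from GetProcs() while the write itself can stay 0. A minimal standalone sketch of that idea, using a hypothetical helper rather than the project's AppendUint64:

    // Hypothetical sketch (not the project's helper): writing "0" to
    // cgroup.procs moves the calling process into that cgroup.
    #include <fstream>
    #include <stdexcept>
    #include <string>

    void MoveSelfIntoCGroup(const std::string& cgroup_dir) {
        std::ofstream procs(cgroup_dir + "/cgroup.procs", std::ios::app);
        if (!procs) {
            throw std::runtime_error("cannot open " + cgroup_dir + "/cgroup.procs");
        }
        procs << 0 << "\n";  // 0 means "the writing process"
    }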

View file

@ -17,7 +17,7 @@
#ifndef AUOMS_CGROUPS_H
#define AUOMS_CGROUPS_H
#include <vector>
#include <unordered_set>
#include <memory>
class CGroupCPU {
@ -28,8 +28,8 @@ public:
void AddSelfThread();
void AddThread(long tid);
std::vector<uint64_t> GetProcs();
std::vector<uint64_t> GetTasks();
std::unordered_set<uint64_t> GetProcs();
std::unordered_set<uint64_t> GetTasks();
uint64_t GetShares();
void SetShares(uint64_t val);

View file

@ -328,7 +328,7 @@ int32_t EventBuilder::GetEventPid() {
return EVENT_PID(_data);
}
bool EventBuilder::EndEvent() {
int EventBuilder::EndEvent() {
if (_data == nullptr) {
throw std::runtime_error("Event not started!");
}

View file

@ -239,8 +239,11 @@ private:
class IEventBuilderAllocator {
public:
// Return true on success, false if closed
virtual bool Allocate(void** data, size_t size) = 0;
virtual bool Commit() = 0;
// Return 1 on success, 0 on closed, and -1 if item too large
virtual int Commit() = 0;
// Return true on success, false if closed
virtual bool Rollback() = 0;
};
@ -279,7 +282,7 @@ public:
uint16_t GetEventFlags();
void SetEventPid(int32_t pid);
int32_t GetEventPid();
bool EndEvent();
int EndEvent();
bool CancelEvent();
bool BeginRecord(uint32_t record_type, const char* record_name, const char* record_text, uint16_t num_fields);
bool BeginRecord(uint32_t record_type, const std::string_view& record_name, const std::string_view& record_text, uint16_t num_fields);

View file

@ -59,10 +59,10 @@ public:
return true;
}
bool Commit() override {
int Commit() override {
_proc->ProcessData(_buffer.data(), _size);
_size = 0;
return true;
return 1;
}
bool Rollback() override {

View file

@ -34,7 +34,8 @@ public:
return true;
}
bool Commit() override {
// Return 1 on success, 0 on queue closed, and -1 if item was too large
int Commit() override {
Event event(_buffer.data(), _size);
auto ret = _queue->Put(event.Priority(), _buffer.data(), _size);
_size = 0;

View file

@ -35,44 +35,44 @@ BOOST_AUTO_TEST_CASE( test )
EventBuilder builder(event_queue, DefaultPrioritizer::Create(0));
int ret = builder.BeginEvent(1, 3, 4, 2);
if (ret != 1) {
bool ret = builder.BeginEvent(1, 3, 4, 2);
if (!ret) {
BOOST_FAIL("BeginEvent failed: " + std::to_string(ret));
}
ret = builder.BeginRecord(1, "test1", "raw record text1", 3);
if (ret != 1) {
if (!ret) {
BOOST_FAIL("BeginRecord failed: " + std::to_string(ret));
}
ret = builder.AddField("field1", "raw1", "interp1", field_type_t::UNCLASSIFIED);
if (ret != 1) {
if (!ret) {
BOOST_FAIL("AddField failed: " + std::to_string(ret));
}
ret = builder.AddField("field2", "2", "user2", field_type_t::UID);
if (ret != 1) {
if (!ret) {
BOOST_FAIL("AddField failed: " + std::to_string(ret));
}
ret = builder.AddField("field3", "raw3", "interp3", field_type_t::UNCLASSIFIED);
if (ret != 1) {
if (!ret) {
BOOST_FAIL("AddField failed: " + std::to_string(ret));
}
ret = builder.EndRecord();
if (ret != 1) {
if (!ret) {
BOOST_FAIL("EndRecord failed: " + std::to_string(ret));
}
ret = builder.BeginRecord(2, "test2", "raw record text2", 2);
if (ret != 1) {
if (!ret) {
BOOST_FAIL("BeginRecord failed: " + std::to_string(ret));
}
ret = builder.AddField("field1", "raw1", nullptr, field_type_t::UNCLASSIFIED);
if (ret != 1) {
if (!ret) {
BOOST_FAIL("AddField failed: " + std::to_string(ret));
}
ret = builder.AddField("field2", "raw2", "interp2", field_type_t::UNCLASSIFIED);
if (ret != 1) {
if (!ret) {
BOOST_FAIL("AddField failed: " + std::to_string(ret));
}
ret = builder.EndRecord();
if (ret != 1) {
if (!ret) {
BOOST_FAIL("EndRecord failed: " + std::to_string(ret));
}
builder.SetEventFlags(5);
@ -180,39 +180,39 @@ BOOST_AUTO_TEST_CASE( test )
BOOST_CHECK_EQUAL(rec, event.end());
ret = builder.BeginEvent(1, 3, 4, 1);
if (ret != 1) {
if (!ret) {
BOOST_FAIL("BeginEvent failed: " + std::to_string(ret));
}
ret = builder.BeginRecord(1, "test1", "raw text", 6);
if (ret != 1) {
if (!ret) {
BOOST_FAIL("BeginRecord failed: " + std::to_string(ret));
}
ret = builder.AddField("field3", "raw3", "interp3", field_type_t::UNCLASSIFIED);
if (ret != 1) {
if (!ret) {
BOOST_FAIL("AddField failed: " + std::to_string(ret));
}
ret = builder.AddField("field6", "raw6", "interp6", field_type_t::UNCLASSIFIED);
if (ret != 1) {
if (!ret) {
BOOST_FAIL("AddField failed: " + std::to_string(ret));
}
ret = builder.AddField("field1", "raw1", "interp1", field_type_t::UNCLASSIFIED);
if (ret != 1) {
if (!ret) {
BOOST_FAIL("AddField failed: " + std::to_string(ret));
}
ret = builder.AddField("field4", "raw4", "interp4", field_type_t::UNCLASSIFIED);
if (ret != 1) {
if (!ret) {
BOOST_FAIL("AddField failed: " + std::to_string(ret));
}
ret = builder.AddField("field5", "raw5", "interp5", field_type_t::UNCLASSIFIED);
if (ret != 1) {
if (!ret) {
BOOST_FAIL("AddField failed: " + std::to_string(ret));
}
ret = builder.AddField("field2", "raw2", "interp2", field_type_t::UNCLASSIFIED);
if (ret != 1) {
if (!ret) {
BOOST_FAIL("AddField failed: " + std::to_string(ret));
}
ret = builder.EndRecord();
if (ret != 1) {
if (!ret) {
BOOST_FAIL("EndRecord failed: " + std::to_string(ret));
}
ret = builder.EndEvent();

View file

@ -48,7 +48,7 @@ public:
return true;
}
bool Commit() override {
int Commit() override {
Event event(_buffer.data(), _size);
std::vector<EventRecord> recs;
for(auto& rec :event) {
@ -59,7 +59,7 @@ public:
_converter.Convert(recs, _cmdline);
_cmdlines.emplace_back(_cmdline);
_size = 0;
return true;
return 1;
}
bool Rollback() override {

IO.cpp
View file

@ -212,7 +212,7 @@ ssize_t IOBase::ReadAll(void *buf, size_t size, const std::function<bool()>& fn)
ssize_t IOBase::DiscardAll(size_t size, const std::function<bool()>& fn)
{
uint8_t buffer[4096];
uint8_t buffer[1024*32];
size_t nleft = size;
do {
int fd = _fd.load();
@ -221,7 +221,7 @@ ssize_t IOBase::DiscardAll(size_t size, const std::function<bool()>& fn)
}
errno = 0;
size_t n = nleft;
if (nleft < sizeof(buffer)) {
if (n > sizeof(buffer)) {
n = sizeof(buffer);
}
ssize_t nr = read(fd, buffer, n);
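
To spell out the overrun fixed above: the old clamp was inverted (if (nleft < sizeof(buffer)) n = sizeof(buffer);), so whenever more than sizeof(buffer) bytes remained to be discarded, read() was asked to fill nleft bytes of a fixed 4 KiB stack buffer. A minimal sketch of the corrected loop, assuming plain POSIX read(); the real DiscardAll also re-checks _fd and consults a caller-supplied callback on partial reads:

    #include <unistd.h>   // read(), ssize_t
    #include <cstddef>
    #include <cstdint>

    // Sketch: discard exactly `size` bytes from fd using a bounded buffer.
    ssize_t DiscardAllSketch(int fd, size_t size) {
        uint8_t buffer[1024 * 32];
        size_t nleft = size;
        while (nleft > 0) {
            size_t n = nleft;
            if (n > sizeof(buffer)) {   // clamp to the buffer; the old inverted
                n = sizeof(buffer);     // test let n exceed sizeof(buffer)
            }
            ssize_t nr = read(fd, buffer, n);
            if (nr <= 0) {
                return nr;              // EOF or error
            }
            nleft -= static_cast<size_t>(nr);
        }
        return static_cast<ssize_t>(size);
    }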

View file

@ -69,7 +69,7 @@ public:
_cond.notify_all();
return true;
}
return _close;
return false;
}
void Close() {

View file

@ -131,7 +131,7 @@ bool Metrics::send_metrics() {
if (!_builder->EndRecord()) {
return false;
}
if (!_builder->EndEvent()) {
if (!_builder->EndEvent() != 0) {
return false;
}
}

View file

@ -214,5 +214,5 @@ bool OperationalStatus::send_status() {
if(!_builder.EndRecord()) {
return false;
}
return _builder.EndEvent();
return _builder.EndEvent() != 0;
}

View file

@ -30,20 +30,21 @@
#include "Gate.h"
#include "Signals.h"
#include "StringUtils.h"
#include "UnixDomainWriter.h"
bool BuildEvent(std::shared_ptr<EventBuilder>& builder, uint64_t sec, uint32_t msec, uint64_t serial, int seq) {
if (builder->BeginEvent(sec, msec, serial, 1) != 1) {
if (!builder->BeginEvent(sec, msec, serial, 1)) {
return false;
}
if (builder->BeginRecord(1, "TEST", "", 1) != 1) {
if (!builder->BeginRecord(1, "TEST", "", 1)) {
builder->CancelEvent();
return false;
}
if (builder->AddField("seq", std::to_string(seq), nullptr, field_type_t::UNCLASSIFIED) != 1) {
if (!builder->AddField("seq", std::to_string(seq), nullptr, field_type_t::UNCLASSIFIED)) {
builder->CancelEvent();
return false;
}
if(builder->EndRecord() != 1) {
if(!builder->EndRecord()) {
builder->CancelEvent();
return false;
}
@ -515,3 +516,90 @@ BOOST_AUTO_TEST_CASE( dropped_conn_test ) {
BOOST_REQUIRE_EQUAL(i, event_seq);
}
}
BOOST_AUTO_TEST_CASE( oversized_event_test ) {
TempDir dir("/tmp/OutputInputTests");
std::string socket_path = dir.Path() + "/input.socket";
std::string status_socket_path = dir.Path() + "/status.socket";
std::mutex log_mutex;
std::vector<std::string> log_lines;
Logger::SetLogFunction([&log_mutex,&log_lines](const char* ptr, size_t size){
std::lock_guard<std::mutex> lock(log_mutex);
log_lines.emplace_back(ptr, size);
});
Signals::Init();
Signals::Start();
auto operational_status = std::make_shared<OperationalStatus>("", nullptr);
Inputs inputs(socket_path, operational_status);
if (!inputs.Initialize()) {
BOOST_FAIL("Failed to initialize inputs");
}
Gate done_gate;
std::vector<std::string> _outputs;
std::thread input_thread([&]() {
Signals::InitThread();
while (!Signals::IsExit()) {
if (!inputs.HandleData([&_outputs](void* ptr, size_t size) {
_outputs.emplace_back(reinterpret_cast<char*>(ptr), size);
})) {
break;
};
}
done_gate.Open();
});
inputs.Start();
UnixDomainWriter udw(socket_path);
if (!udw.Open()) {
BOOST_FAIL("Failed to open inputs socket");
}
std::array<uint8_t, InputBuffer::MAX_DATA_SIZE+1> _data;
_data.fill(0);
uint32_t header;
header = static_cast<uint32_t>(1) << 24;
header |= static_cast<uint32_t>(InputBuffer::MAX_DATA_SIZE+1);
reinterpret_cast<uint32_t*>(_data.data())[0] = header;
if (dynamic_cast<IWriter*>(&udw)->WriteAll(_data.data(), _data.size()) != IO::OK) {
BOOST_FAIL("Failed write data to input socket");
}
if (dynamic_cast<IWriter*>(&udw)->WriteAll(_data.data(), _data.size()) != IO::OK) {
BOOST_FAIL("Failed write data to input socket");
}
if (dynamic_cast<IWriter*>(&udw)->WriteAll(_data.data(), _data.size()) != IO::OK) {
BOOST_FAIL("Failed write data to input socket");
}
udw.Close();
inputs.Stop();
if (!done_gate.Wait(Gate::OPEN, 1000)) {
BOOST_FAIL("Time out waiting for inputs thread to exit");
}
input_thread.join();
int lcnt = 0;
for (auto& msg : log_lines) {
if (msg == "RawEventReader: Message size (262145) in header is too large (> 262144), reading and discarding message contents\n") {
lcnt += 1;
}
}
if (lcnt != 3) {
BOOST_FAIL("Expected 3 'header it too large' messages");
}
}
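
A note on the frame the test writes: as constructed above, the 4-byte header packs the payload size into the low 24 bits and the constant 1 into the top byte; what that top byte means (version or flags) is not shown in this diff. A tiny helper equivalent to the test's construction, with a hypothetical name and a defensive mask the test itself does not need:

    #include <cstdint>

    // Low 24 bits: payload size; top byte: 1 (meaning assumed, not confirmed here).
    constexpr uint32_t MakeTestHeader(uint32_t payload_size) {
        return (static_cast<uint32_t>(1) << 24) | (payload_size & 0x00FFFFFFu);
    }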

View file

@ -655,11 +655,15 @@ void PriorityQueue::Close(const std::shared_ptr<QueueCursorHandle>& cursor_handl
_cursor_handles.erase(cursor_handle->_id);
}
bool PriorityQueue::Put(uint32_t priority, const void* data, size_t size) {
int PriorityQueue::Put(uint32_t priority, const void* data, size_t size) {
std::unique_lock<std::mutex> lock(_mutex);
if (size > MAX_ITEM_SIZE) {
return -1;
}
if (_closed) {
return false;
return 0;
}
if (priority >= _num_priorities) {
@ -692,7 +696,7 @@ bool PriorityQueue::Put(uint32_t priority, const void* data, size_t size) {
c->notify(priority, item->Sequence());
}
return true;
return 1;
}
void PriorityQueue::Save(long save_delay, bool final_save) {

View file

@ -368,7 +368,8 @@ public:
void Commit(const std::shared_ptr<QueueCursorHandle>& cursor_handle, uint32_t priority, uint64_t seq);
void Close(const std::shared_ptr<QueueCursorHandle>& cursor_handle);
bool Put(uint32_t priority, const void* data, size_t size);
// Return 1 on success, 0 on queue closed, and -1 if item too large
int Put(uint32_t priority, const void* data, size_t size);
void Save(long save_delay, bool final_save = false);
void Saver(long save_delay);

View file

@ -66,7 +66,7 @@ BOOST_AUTO_TEST_CASE( queue_simple ) {
for (uint8_t i = 1; i <= 10; i++) {
data[0] = i;
if (!queue->Put(0, data.data(), data.size())) {
if (queue->Put(0, data.data(), data.size()) != 1) {
BOOST_FAIL("queue->Put() failed!");
}
@ -109,6 +109,29 @@ BOOST_AUTO_TEST_CASE( queue_simple ) {
BOOST_CHECK_EQUAL(stats._total._bytes_written, 0);
}
BOOST_AUTO_TEST_CASE( queue_oversided_item ) {
TempDir dir("/tmp/PriorityQueueTests");
auto queue = PriorityQueue::Open(dir.Path(), 8, 1024*1024, 16, 0, 0, 0);
if (!queue) {
BOOST_FAIL("Failed to open queue");
}
auto cursor_handle = queue->OpenCursor("test");
std::array<uint8_t, PriorityQueue::MAX_ITEM_SIZE+1> data;
data.fill(0);
data[0] = 1;
if (queue->Put(0, data.data(), data.size()) != -1) {
BOOST_FAIL("queue->Put() failed to reject oversided item!");
}
if (queue->Put(0, data.data(), data.size()-1) != 1) {
BOOST_FAIL("queue->Put() failed add item!");
}
}
BOOST_AUTO_TEST_CASE( queue_cursor_rollback ) {
TempDir dir("/tmp/PriorityQueueTests");
@ -124,7 +147,7 @@ BOOST_AUTO_TEST_CASE( queue_cursor_rollback ) {
for (uint8_t i = 1; i <= 10; i++) {
data[0] = i;
if (!queue->Put(0, data.data(), data.size())) {
if (queue->Put(0, data.data(), data.size()) != 1) {
BOOST_FAIL("queue->Put() failed!");
}
}
@ -199,7 +222,7 @@ BOOST_AUTO_TEST_CASE( queue_simple_multi_cursor ) {
for (uint8_t i = 1; i < 6; i++) {
data[0] = i;
if (!queue->Put(0, data.data(), data.size())) {
if (queue->Put(0, data.data(), data.size()) != 1) {
BOOST_FAIL("queue->Put() failed!");
}
}
@ -208,7 +231,7 @@ BOOST_AUTO_TEST_CASE( queue_simple_multi_cursor ) {
for (uint8_t i = 6; i <= 10; i++) {
data[0] = i;
if (!queue->Put(0, data.data(), data.size())) {
if (queue->Put(0, data.data(), data.size()) != 1) {
BOOST_FAIL("queue->Put() failed!");
}
}
@ -300,7 +323,7 @@ BOOST_AUTO_TEST_CASE( queue_simple_multi_cursor_reopen ) {
for (uint8_t i = 1; i < 6; i++) {
data[0] = i;
if (!queue->Put(0, data.data(), data.size())) {
if (queue->Put(0, data.data(), data.size()) != 1) {
BOOST_FAIL("queue->Put() failed!");
}
}
@ -309,7 +332,7 @@ BOOST_AUTO_TEST_CASE( queue_simple_multi_cursor_reopen ) {
for (uint8_t i = 6; i <= 10; i++) {
data[0] = i;
if (!queue->Put(0, data.data(), data.size())) {
if (queue->Put(0, data.data(), data.size()) != 1) {
BOOST_FAIL("queue->Put() failed!");
}
}
@ -455,7 +478,7 @@ BOOST_AUTO_TEST_CASE( queue_simple_priority ) {
for (auto& in: input_pairs) {
data[0] = in.first;
if (!queue->Put(in.second, data.data(), data.size())) {
if (queue->Put(in.second, data.data(), data.size()) != 1) {
BOOST_FAIL("queue->Put() failed!");
}
}
@ -508,7 +531,7 @@ BOOST_AUTO_TEST_CASE( queue_simple_priority2 ) {
for (uint8_t i = 0; i < 2; i++) {
auto expected = (p*2)+i;
data[0] = expected;
if (!queue->Put(p, data.data(), data.size())) {
if (queue->Put(p, data.data(), data.size()) != 1) {
BOOST_FAIL("queue->Put() failed!");
}
@ -749,7 +772,7 @@ BOOST_AUTO_TEST_CASE( queue_max_unsaved_files ) {
// This first set of inputs should reach the max mem limit
for (auto& in: input_pairs) {
data[0] = in.first;
if (!queue->Put(in.second, data.data(), data.size())) {
if (queue->Put(in.second, data.data(), data.size()) != 1) {
BOOST_FAIL("queue->Put() failed!");
}
}
@ -769,7 +792,7 @@ BOOST_AUTO_TEST_CASE( queue_max_unsaved_files ) {
// This set of inputs should exceed the max mem limits
for (auto& in: input2_pairs) {
data[0] = in.first;
if (!queue->Put(in.second, data.data(), data.size())) {
if (queue->Put(in.second, data.data(), data.size()) != 1) {
BOOST_FAIL("queue->Put() failed!");
}
}
@ -1010,7 +1033,7 @@ BOOST_AUTO_TEST_CASE( queue_multi_cursor_fs_loss ) {
// This first set of inputs should reach the max mem limit
for (auto& in: input_pairs) {
data[0] = in.first;
if (!queue->Put(in.second, data.data(), data.size())) {
if (queue->Put(in.second, data.data(), data.size()) != 1) {
BOOST_FAIL("queue->Put() failed!");
}
}
@ -1120,7 +1143,7 @@ BOOST_AUTO_TEST_CASE( queue_multi_cursor_concurrent ) {
return c1_ready && c2_ready;
});
reinterpret_cast<int*>(data.data())[0] = i;
if (!queue->Put(0, data.data(), data.size())) {
if (queue->Put(0, data.data(), data.size()) != 1) {
BOOST_FAIL("queue->Put() failed!");
}
}
@ -1162,7 +1185,7 @@ BOOST_AUTO_TEST_CASE( queue_fs_clean_multi_cursor ) {
for (int i = 0; i < num_items; i++) {
reinterpret_cast<int*>(data.data())[0] = i;
if (!queue->Put(0, data.data(), data.size())) {
if (queue->Put(0, data.data(), data.size()) != 1) {
BOOST_FAIL("queue->Put() failed!");
}
}
@ -1257,7 +1280,7 @@ BOOST_AUTO_TEST_CASE( queue_fs_clean_remove_cursor ) {
for (int i = 0; i < num_items; i++) {
reinterpret_cast<int*>(data.data())[0] = i;
if (!queue->Put(0, data.data(), data.size())) {
if (queue->Put(0, data.data(), data.size()) != 1) {
BOOST_FAIL("queue->Put() failed!");
}
}
@ -1326,7 +1349,7 @@ BOOST_AUTO_TEST_CASE( queue_fs_clean_delete_cursor ) {
for (int i = 0; i < num_items; i++) {
reinterpret_cast<int *>(data.data())[0] = i;
if (!queue->Put(0, data.data(), data.size())) {
if (queue->Put(0, data.data(), data.size()) != 1) {
BOOST_FAIL("queue->Put() failed!");
}
}
@ -1407,7 +1430,7 @@ BOOST_AUTO_TEST_CASE( queue_max_fs_bytes ) {
for (int i = 0; i < num_items; i++) {
reinterpret_cast<int*>(data.data())[0] = i;
if (!queue->Put(0, data.data(), data.size())) {
if (queue->Put(0, data.data(), data.size()) != 1) {
BOOST_FAIL("queue->Put() failed!");
}
}
@ -1467,7 +1490,7 @@ BOOST_AUTO_TEST_CASE( queue_max_fs_pct ) {
for (int i = 0; i < num_items; i++) {
reinterpret_cast<int*>(data.data())[0] = i;
if (!queue->Put(0, data.data(), data.size())) {
if (queue->Put(0, data.data(), data.size()) != 1) {
BOOST_FAIL("queue->Put() failed!");
}
}
@ -1521,7 +1544,7 @@ BOOST_AUTO_TEST_CASE( queue_min_fs_free_pct ) {
for (int i = 0; i < num_items; i++) {
reinterpret_cast<int*>(data.data())[0] = i;
if (!queue->Put(0, data.data(), data.size())) {
if (queue->Put(0, data.data(), data.size()) != 1) {
BOOST_FAIL("queue->Put() failed!");
}
}
@ -1555,7 +1578,7 @@ BOOST_AUTO_TEST_CASE( queue_save_delay ) {
for (int i = 0; i < 9; i++) {
reinterpret_cast<int*>(data.data())[0] = i;
if (!queue->Put(0, data.data(), data.size())) {
if (queue->Put(0, data.data(), data.size()) != 1) {
BOOST_FAIL("queue->Put() failed!");
}
}
@ -1597,7 +1620,7 @@ BOOST_AUTO_TEST_CASE( queue_cursor_commit ) {
for (int i = 0; i < 32; i++) {
reinterpret_cast<int *>(data.data())[0] = i;
if (!queue->Put(0, data.data(), data.size())) {
if (queue->Put(0, data.data(), data.size()) != 1) {
BOOST_FAIL("queue->Put() failed!");
}
}
@ -1755,7 +1778,7 @@ BOOST_AUTO_TEST_CASE( queue_fs_force_clean ) {
for (auto& in: input_pairs1) {
data[0] = in.first;
if (!queue->Put(in.second, data.data(), data.size())) {
if (queue->Put(in.second, data.data(), data.size()) != 1) {
BOOST_FAIL("queue->Put() failed!");
}
}
@ -1782,7 +1805,7 @@ BOOST_AUTO_TEST_CASE( queue_fs_force_clean ) {
for (auto& in: input_pairs2) {
data[0] = in.first;
if (!queue->Put(in.second, data.data(), data.size())) {
if (queue->Put(in.second, data.data(), data.size()) != 1) {
BOOST_FAIL("queue->Put() failed!");
}
}
@ -1839,7 +1862,7 @@ BOOST_AUTO_TEST_CASE( queue_empty_cursor_reset ) {
for (int i = 0; i < 32; i++) {
reinterpret_cast<int *>(data.data())[0] = i;
if (!queue->Put(0, data.data(), data.size())) {
if (queue->Put(0, data.data(), data.size()) != 1) {
BOOST_FAIL("queue->Put() failed!");
}
}
@ -1872,7 +1895,7 @@ BOOST_AUTO_TEST_CASE( queue_empty_cursor_reset ) {
for (int i = 0; i < 32; i++) {
reinterpret_cast<int *>(data.data())[0] = i;
if (!queue->Put(0, data.data(), data.size())) {
if (queue->Put(0, data.data(), data.size()) != 1) {
BOOST_FAIL("queue->Put() failed!");
}
}

View file

@ -73,16 +73,14 @@ int RawEvent::AddEvent(EventBuilder& builder) {
if (_num_dropped_records > 0 && _drop_count.size() > 0) {
num_records += 1;
}
auto ret = builder.BeginEvent(_event_id.Seconds(), _event_id.Milliseconds(), _event_id.Serial(), num_records);
if (ret != 1) {
return ret;
if (!builder.BeginEvent(_event_id.Seconds(), _event_id.Milliseconds(), _event_id.Serial(), num_records)) {
return 0;
}
if (_syscall_rec_idx > -1) {
ret = _records[_syscall_rec_idx]->AddRecord(builder);
if (ret != 1) {
if (!_records[_syscall_rec_idx]->AddRecord(builder)) {
builder.CancelEvent();
return ret;
return 0;
}
_records[_syscall_rec_idx].reset(nullptr);
}
@ -91,44 +89,39 @@ int RawEvent::AddEvent(EventBuilder& builder) {
if (!rec) {
continue;
}
ret = rec->AddRecord(builder);
if (ret != 1) {
if (!rec->AddRecord(builder)) {
builder.CancelEvent();
return ret;
return 0;
}
if (rec->GetRecordType() == RecordType::EXECVE) {
for (std::unique_ptr<RawEventRecord>& rec: _execve_records) {
ret = rec->AddRecord(builder);
if (ret != 1) {
if (!rec->AddRecord(builder)) {
builder.CancelEvent();
return ret;
return 0;
}
}
}
}
if (_num_dropped_records > 0 && _drop_count.size() > 0) {
ret = builder.BeginRecord(static_cast<uint32_t>(RecordType::AUOMS_DROPPED_RECORDS), std::string_view(RecordTypeToName(RecordType::AUOMS_DROPPED_RECORDS)), std::string_view(""), static_cast<uint16_t>(_drop_count.size()));
if (ret != 1) {
if (!builder.BeginRecord(static_cast<uint32_t>(RecordType::AUOMS_DROPPED_RECORDS), std::string_view(RecordTypeToName(RecordType::AUOMS_DROPPED_RECORDS)), std::string_view(""), static_cast<uint16_t>(_drop_count.size()))) {
builder.CancelEvent();
return ret;
return 0;
}
for (auto& e: _drop_count) {
ret = builder.AddField(RecordTypeToName(e.first), std::to_string(e.second), "", field_type_t::UNCLASSIFIED);
if (ret != 1) {
if (!builder.AddField(RecordTypeToName(e.first), std::to_string(e.second), "", field_type_t::UNCLASSIFIED)) {
builder.CancelEvent();
return ret;
return 0;
}
}
ret = builder.EndRecord();
if (ret != 1) {
if (!builder.EndRecord()) {
builder.CancelEvent();
return ret;
return 0;
}
}
return builder.EndEvent();
}
int RawEventAccumulator::AddRecord(std::unique_ptr<RawEventRecord> record) {
bool RawEventAccumulator::AddRecord(std::unique_ptr<RawEventRecord> record) {
std::lock_guard<std::mutex> lock(_mutex);
_bytes_metric->Update(static_cast<double>(record->GetSize()));
@ -136,14 +129,15 @@ int RawEventAccumulator::AddRecord(std::unique_ptr<RawEventRecord> record) {
// Drop empty records unless it is the EOE record.
if (record->IsEmpty() && record->GetRecordType() != RecordType::EOE) {
return 0;
return false;
}
auto event_id = record->GetEventId();
int ret = 0;
auto found = _events.on(event_id, [this,&record,&ret](size_t entry_count, const std::chrono::steady_clock::time_point& last_touched, std::shared_ptr<RawEvent>& event) {
auto found = _events.on(event_id, [this,&record](size_t entry_count, const std::chrono::steady_clock::time_point& last_touched, std::shared_ptr<RawEvent>& event) {
if (event->AddRecord(std::move(record))) {
ret = event->AddEvent(*_builder);
if (event->AddEvent(*_builder) == -1) {
_dropped_event_metric->Update(1.0);
}
return CacheEntryOP::REMOVE;
} else {
return CacheEntryOP::TOUCH;
@ -153,7 +147,10 @@ int RawEventAccumulator::AddRecord(std::unique_ptr<RawEventRecord> record) {
auto event = std::make_shared<RawEvent>(record->GetEventId());
if (event->AddRecord(std::move(record))) {
_event_metric->Update(1.0);
return event->AddEvent(*_builder);
if (event->AddEvent(*_builder) == -1) {
_dropped_event_metric->Update(1.0);
}
return true;
} else {
_events.add(event_id, event);
}
@ -161,13 +158,15 @@ int RawEventAccumulator::AddRecord(std::unique_ptr<RawEventRecord> record) {
// Don't wait for Flush to be called, preemptively flush oldest if the cache size limit is exceeded
_events.for_all_oldest_first([this](size_t entry_count, const std::chrono::steady_clock::time_point& last_touched, const EventId& key, std::shared_ptr<RawEvent>& event) {
if (entry_count > MAX_CACHE_ENTRY) {
event->AddEvent(*_builder);
if (event->AddEvent(*_builder) == -1) {
_dropped_event_metric->Update(1.0);
}
_event_metric->Update(1.0);
return CacheEntryOP::REMOVE;
}
return CacheEntryOP::STOP;
});
return 1;
return true;
}
void RawEventAccumulator::Flush(long milliseconds) {
@ -177,7 +176,9 @@ void RawEventAccumulator::Flush(long milliseconds) {
_events.for_all_oldest_first([this,now,milliseconds](size_t entry_count, const std::chrono::steady_clock::time_point& last_touched, const EventId& key, std::shared_ptr<RawEvent>& event) {
if (entry_count > MAX_CACHE_ENTRY || std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()-last_touched.time_since_epoch()) > std::chrono::milliseconds(milliseconds)) {
event->AddEvent(*_builder);
if (event->AddEvent(*_builder) == -1) {
_dropped_event_metric->Update(1.0);
}
_event_metric->Update(1.0);
return CacheEntryOP::REMOVE;
}
@ -185,7 +186,9 @@ void RawEventAccumulator::Flush(long milliseconds) {
});
} else {
_events.for_all_oldest_first([this](size_t entry_count, const std::chrono::steady_clock::time_point& last_touched, const EventId& key, std::shared_ptr<RawEvent>& event) {
event->AddEvent(*_builder);
if (event->AddEvent(*_builder) == -1) {
_dropped_event_metric->Update(1.0);
}
_event_metric->Update(1.0);
return CacheEntryOP::REMOVE;
});

View file

@ -58,9 +58,10 @@ public:
_bytes_metric = _metrics->AddMetric(MetricType::METRIC_BY_ACCUMULATION, "raw_data", "bytes", MetricPeriod::SECOND, MetricPeriod::HOUR);
_record_metric = _metrics->AddMetric(MetricType::METRIC_BY_ACCUMULATION, "raw_data", "records", MetricPeriod::SECOND, MetricPeriod::HOUR);
_event_metric = _metrics->AddMetric(MetricType::METRIC_BY_ACCUMULATION, "raw_data", "events", MetricPeriod::SECOND, MetricPeriod::HOUR);
_dropped_event_metric = _metrics->AddMetric(MetricType::METRIC_BY_ACCUMULATION, "raw_data", "dropped_events", MetricPeriod::SECOND, MetricPeriod::HOUR);
}
int AddRecord(std::unique_ptr<RawEventRecord> record);
bool AddRecord(std::unique_ptr<RawEventRecord> record);
void Flush(long milliseconds);
private:
@ -71,6 +72,7 @@ private:
std::shared_ptr<Metric> _bytes_metric;
std::shared_ptr<Metric> _record_metric;
std::shared_ptr<Metric> _event_metric;
std::shared_ptr<Metric> _dropped_event_metric;
Cache<EventId, std::shared_ptr<RawEvent>> _events;
};

View file

@ -161,7 +161,7 @@ bool RawEventRecord::Parse(RecordType record_type, size_t size) {
return false;
}
int RawEventRecord::AddRecord(EventBuilder& builder) {
bool RawEventRecord::AddRecord(EventBuilder& builder) {
static auto SV_NODE = "node"sv;
static auto SV_UNPARSED_TEXT = "unparsed_text"sv;
@ -170,36 +170,34 @@ int RawEventRecord::AddRecord(EventBuilder& builder) {
num_fields++;
}
auto ret = builder.BeginRecord(static_cast<uint32_t>(_record_type), _type_name, std::string_view(_data.data(), _size), num_fields);
if (ret != 1) {
return ret;
if (!builder.BeginRecord(static_cast<uint32_t>(_record_type), _type_name, std::string_view(_data.data(), _size), num_fields)) {
return false;
}
if (!_node.empty()) {
ret = builder.AddField(SV_NODE, _node, nullptr, field_type_t::UNCLASSIFIED);
if (ret != 1) {
return ret;
if (!builder.AddField(SV_NODE, _node, nullptr, field_type_t::UNCLASSIFIED)) {
return false;
}
}
// If record is marked as unparsable, then the text (after the 'audit():' section) is included as the only value in
// _record_fields
if (_unparsable) {
ret = builder.AddField(SV_UNPARSED_TEXT, _record_fields[0], nullptr, field_type_t::UNESCAPED);
if (ret != 1) {
return ret;
if (!builder.AddField(SV_UNPARSED_TEXT, _record_fields[0], nullptr, field_type_t::UNESCAPED)) {
return false;
}
return builder.EndRecord();
}
for (auto f: _record_fields) {
auto idx = f.find_first_of('=');
bool ret;
if (idx == std::string_view::npos) {
ret = builder.AddField(f, std::string_view(), nullptr, field_type_t::UNCLASSIFIED);
} else {
ret = builder.AddField(f.substr(0, idx), f.substr(idx + 1), nullptr, field_type_t::UNCLASSIFIED);
}
if (ret != 1) {
if (!ret) {
return ret;
}
}

View file

@ -34,7 +34,7 @@ public:
inline char* Data() { return _data.data(); };
bool Parse(RecordType record_type, size_t size);
int AddRecord(EventBuilder& builder);
bool AddRecord(EventBuilder& builder);
inline EventId GetEventId() { return _event_id; }
inline RecordType GetRecordType() { return _record_type; }

View file

@ -30,9 +30,9 @@ public:
return true;
}
virtual bool Commit() {
virtual int Commit() {
_events.emplace_back(std::make_shared<std::vector<uint8_t>>(_buffer.begin(), _buffer.end()));
return true;
return 1;
}
virtual bool Rollback() {

View file

@ -99,7 +99,7 @@ int main(int argc, char**argv) {
bool netlink_only = false;
int opt;
while ((opt = getopt(argc, argv, "nc:")) != -1) {
while ((opt = getopt(argc, argv, "c:ns")) != -1) {
switch (opt) {
case 'c':
config_file = optarg;
@ -339,12 +339,18 @@ int main(int argc, char**argv) {
// systemd may not have put auoms into the default cgroup at this point
// Wait a few seconds before moving into the right cgroup so we avoid getting moved back out by systemd
std::thread cg_thread([&cgcpu]() {
sleep(5);
try {
cgcpu->AddSelf();
} catch (const std::exception &ex) {
Logger::Error("Failed to configure cpu cgroup: %s", ex.what());
Logger::Warn("CPU Limits cannot be enforced");
int sleep_time = 5;
// Loop forever to make sure we stay in our cgroup
while (!Signals::IsExit()) {
sleep(sleep_time);
sleep_time = 60;
try {
cgcpu->AddSelf();
} catch (const std::exception &ex) {
Logger::Error("Failed to configure cpu cgroup: %s", ex.what());
Logger::Warn("CPU Limits cannot be enforced");
return;
}
}
});
cg_thread.detach();

View file

@ -7,7 +7,7 @@
AUOMS_BUILDVERSION_MAJOR=2
AUOMS_BUILDVERSION_MINOR=3
AUOMS_BUILDVERSION_PATCH=0
AUOMS_BUILDVERSION_PATCH=1
AUOMS_BUILDVERSION_BUILDNR=0
AUOMS_BUILDVERSION_DATE=20200903
AUOMS_BUILDVERSION_DATE=20201113
AUOMS_BUILDVERSION_STATUS=Developer_Build

View file

@ -511,6 +511,7 @@ int main(int argc, char**argv) {
}
Logger::Info("Acquire singleton lock");
std::atomic_long ingest_thread_id(0);
std::shared_ptr<CGroupCPU> cgcpu_root;
std::shared_ptr<CGroupCPU> cgcpu;
if (!disable_cgroups) {
@ -519,13 +520,29 @@ int main(int argc, char**argv) {
cgcpu = CPULimits::CGFromConfig(config, "auomscollect");
// systemd may not have put auomscollect into the default cgroup at this point
// Wait a few seconds before moving into the right cgroup so we avoid getting moved back out by systemd
std::thread cg_thread([&cgcpu]() {
sleep(5);
try {
cgcpu->AddSelf();
} catch (const std::exception &ex) {
Logger::Error("Failed to configure cpu cgroup: %s", ex.what());
Logger::Warn("CPU Limits cannot be enforced");
std::thread cg_thread([&cgcpu_root,&cgcpu,&ingest_thread_id]() {
int sleep_time = 10;
// Loop forever to make sure we stay in our cgroup
while (!Signals::IsExit()) {
sleep(sleep_time);
sleep_time = 60;
try {
cgcpu->AddSelf();
} catch (const std::exception &ex) {
Logger::Error("Failed to configure cpu cgroup: %s", ex.what());
Logger::Warn("CPU Limits cannot be enforced");
return;
}
long tid = ingest_thread_id.load();
if (tid != 0) {
try {
cgcpu_root->AddThread(tid);
} catch (std::runtime_error &ex) {
Logger::Error("Failed to move ingest thread to root cgroup: %s", ex.what());
// Set the id back to 0 so we don't keep trying.
ingest_thread_id.store(0);
}
}
}
});
cg_thread.detach();
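
Distilled from the hunk above and the ingest-thread hunk that follows: the monitor thread now owns all cgroup writes, and the ingest thread merely publishes its TID through an atomic so the monitor can move it into the root CPU cgroup, presumably so audit ingestion is not throttled along with the rest of auomscollect. A compact sketch of that handshake; the cgroup calls are stubbed out as comments since CGroupCPU is the project's own type:

    #include <atomic>
    #include <sys/syscall.h>
    #include <unistd.h>

    static std::atomic_long ingest_thread_id(0);

    void IngestThreadEntry() {
        // publish our kernel thread id for the cgroup monitor thread
        ingest_thread_id.store(syscall(SYS_gettid));
        // ... ingest audit records ...
    }

    void CGroupMonitorTick() {
        // cgcpu->AddSelf();                 // keep the process in its limited cgroup
        long tid = ingest_thread_id.load();
        if (tid != 0) {
            // cgcpu_root->AddThread(tid);   // keep the ingest thread unthrottled
        }
    }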
@ -622,18 +639,7 @@ int main(int argc, char**argv) {
std::thread ingest_thread([&]() {
auto thread_id = CGroups::GetSelfThreadId();
Logger::Info("Starting ingest thead (%ld)", thread_id);
// Move this thread back to the root cgroup (thus outside the auomscollect specific cgroup
if (!disable_cgroups && cgcpu_root) {
std::thread cg_thread([&cgcpu_root, thread_id]() {
sleep(10);
try {
cgcpu_root->AddThread(thread_id);
} catch (std::runtime_error &ex) {
Logger::Error("Failed to move ingest thread to root cgroup: %s", ex.what());
}
});
cg_thread.detach();
}
ingest_thread_id.store(thread_id);
if (netlink_mode) {
bool restart;
do {

View file

@ -11,6 +11,8 @@ KillMode=process
ExecStart=/etc/init.d/auoms start
ExecStop=/etc/init.d/auoms stop
ExecReload=/etc/init.d/auoms reload
CPUWeight=5
CPUQuota=25%
[Install]
WantedBy=multi-user.target