From d359daaa487e68d187cc30c9da8fc08a158c7f79 Mon Sep 17 00:00:00 2001 From: Silvan Jegen Date: Sat, 15 Oct 2016 16:12:15 +0200 Subject: [PATCH] Clean up journald logger We clean up the journald logger with these four changes. 1. Make field array static 2. Make function name more appropriate 3. Initialize the file descriptors only once 4. Avoid copying the journald cursor Point 4 is the most significant change: instead of treating the journald cursor like a Go string we use it as a raw C.char pointer. That way we avoid the copying by the C.CString and C.GoString functions. Signed-off-by: Silvan Jegen --- daemon/logger/journald/read.go | 56 ++++++++++++++++++---------------- 1 file changed, 29 insertions(+), 27 deletions(-) diff --git a/daemon/logger/journald/read.go b/daemon/logger/journald/read.go index 04370fdbc0..d91eb809bc 100644 --- a/daemon/logger/journald/read.go +++ b/daemon/logger/journald/read.go @@ -56,7 +56,7 @@ package journald //} //static int is_attribute_field(const char *msg, size_t length) //{ -// const struct known_field { +// static const struct known_field { // const char *name; // size_t length; // } fields[] = { @@ -101,21 +101,23 @@ package journald // } // return rc; //} -//static int wait_for_data_or_close(sd_journal *j, int pipefd) +//static int wait_for_data_cancelable(sd_journal *j, int pipefd) //{ // struct pollfd fds[2]; // uint64_t when = 0; // int timeout, jevents, i; // struct timespec ts; // uint64_t now; +// +// memset(&fds, 0, sizeof(fds)); +// fds[0].fd = pipefd; +// fds[0].events = POLLHUP; +// fds[1].fd = sd_journal_get_fd(j); +// if (fds[1].fd < 0) { +// return fds[1].fd; +// } +// // do { -// memset(&fds, 0, sizeof(fds)); -// fds[0].fd = pipefd; -// fds[0].events = POLLHUP; -// fds[1].fd = sd_journal_get_fd(j); -// if (fds[1].fd < 0) { -// return fds[1].fd; -// } // jevents = sd_journal_get_events(j); // if (jevents < 0) { // return jevents; @@ -167,7 +169,7 @@ func (s *journald) Close() error { return nil } -func (s *journald) drainJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, oldCursor string) string { +func (s *journald) drainJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, oldCursor *C.char) *C.char { var msg, data, cursor *C.char var length C.size_t var stamp C.uint64_t @@ -177,10 +179,8 @@ func (s *journald) drainJournal(logWatcher *logger.LogWatcher, config logger.Rea drain: for { // Try not to send a given entry twice. - if oldCursor != "" { - ccursor := C.CString(oldCursor) - defer C.free(unsafe.Pointer(ccursor)) - for C.sd_journal_test_cursor(j, ccursor) > 0 { + if oldCursor != nil { + for C.sd_journal_test_cursor(j, oldCursor) > 0 { if C.sd_journal_next(j) <= 0 { break drain } @@ -234,25 +234,24 @@ drain: break } } - retCursor := "" - if C.sd_journal_get_cursor(j, &cursor) == 0 { - retCursor = C.GoString(cursor) - C.free(unsafe.Pointer(cursor)) - } - return retCursor + + // free(NULL) is safe + C.free(unsafe.Pointer(oldCursor)) + C.sd_journal_get_cursor(j, &cursor) + return cursor } -func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, pfd [2]C.int, cursor string) { +func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, pfd [2]C.int, cursor *C.char) *C.char { s.readers.mu.Lock() s.readers.readers[logWatcher] = logWatcher s.readers.mu.Unlock() go func() { // Keep copying journal data out until we're notified to stop // or we hit an error. - status := C.wait_for_data_or_close(j, pfd[0]) + status := C.wait_for_data_cancelable(j, pfd[0]) for status == 1 { cursor = s.drainJournal(logWatcher, config, j, cursor) - status = C.wait_for_data_or_close(j, pfd[0]) + status = C.wait_for_data_cancelable(j, pfd[0]) } if status < 0 { cerrstr := C.strerror(C.int(-status)) @@ -274,15 +273,16 @@ func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.Re // Notify the other goroutine that its work is done. C.close(pfd[1]) } + + return cursor } func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) { var j *C.sd_journal - var cmatch *C.char + var cmatch, cursor *C.char var stamp C.uint64_t var sinceUnixMicro uint64 var pipes [2]C.int - cursor := "" // Get a handle to the journal. rc := C.sd_journal_open(&j, C.int(0)) @@ -370,7 +370,7 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon return } } - cursor = s.drainJournal(logWatcher, config, j, "") + cursor = s.drainJournal(logWatcher, config, j, nil) if config.Follow { // Allocate a descriptor for following the journal, if we'll // need one. Do it here so that we can report if it fails. @@ -382,13 +382,15 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon if C.pipe(&pipes[0]) == C.int(-1) { logWatcher.Err <- fmt.Errorf("error opening journald close notification pipe") } else { - s.followJournal(logWatcher, config, j, pipes, cursor) + cursor = s.followJournal(logWatcher, config, j, pipes, cursor) // Let followJournal handle freeing the journal context // object and closing the channel. following = true } } } + + C.free(unsafe.Pointer(cursor)) return }