fix: move bufio reader creation out of for loop to fix telemetry unmarshal errors (#2789)

* move bufio reader creation out of for loop

if the bufio reader is created in the for loop we get unmarshaling errors

* fix linter issue

* add fixed ut

* fix existing unit test flake due to closing pipe on error

a previous fix ensured the socket closed on error, but this caused an existing ut to nondeterministically fail
without the previous fix, the socket wouldn't have been closed on error

* make read inline
This commit is contained in:
QxBytes 2024-06-21 11:42:27 -07:00 коммит произвёл GitHub
Родитель 44fa6a0660
Коммит ff0c3ea315
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
2 изменённых файлов: 37 добавлений и 15 удалений

Просмотреть файл

@ -136,12 +136,14 @@ func (tb *TelemetryBuffer) StartServer() error {
tb.connections = remove(tb.connections, index)
}
}()
reader := bufio.NewReader(conn)
for {
reportStr, err := read(conn)
if err != nil {
reportStr, readErr := reader.ReadBytes(Delimiter)
if readErr != nil {
return
}
reportStr = reportStr[:len(reportStr)-1]
var tmp map[string]interface{}
err = json.Unmarshal(reportStr, &tmp)
if err != nil {
@ -228,16 +230,6 @@ func (tb *TelemetryBuffer) PushData(ctx context.Context) {
}
}
// read - read from the file descriptor
func read(conn net.Conn) (b []byte, err error) {
b, err = bufio.NewReader(conn).ReadBytes(Delimiter)
if err == nil {
b = b[:len(b)-1]
}
return
}
// Write - write to the file descriptor.
func (tb *TelemetryBuffer) Write(b []byte) (c int, err error) {
buf := make([]byte, len(b))

Просмотреть файл

@ -70,6 +70,36 @@ func TestClientConnClose(t *testing.T) {
tbClient.Close()
}
func TestCloseOnWriteError(t *testing.T) {
tbServer, closeTBServer := createTBServer(t)
defer closeTBServer()
tbClient := NewTelemetryBuffer(nil)
err := tbClient.Connect()
require.NoError(t, err)
defer tbClient.Close()
data := []byte("{\"good\":1}")
_, err = tbClient.Write(data)
require.NoError(t, err)
// need to wait for connection to populate in server
time.Sleep(1 * time.Second)
tbServer.mutex.Lock()
conns := tbServer.connections
tbServer.mutex.Unlock()
require.Len(t, conns, 1)
// the connection should be automatically closed on failure
badData := []byte("} malformed json }}}")
_, err = tbClient.Write(badData)
require.NoError(t, err)
time.Sleep(1 * time.Second)
tbServer.mutex.Lock()
conns = tbServer.connections
tbServer.mutex.Unlock()
require.Empty(t, conns)
}
func TestWrite(t *testing.T) {
_, closeTBServer := createTBServer(t)
defer closeTBServer()
@ -87,8 +117,8 @@ func TestWrite(t *testing.T) {
}{
{
name: "write",
data: []byte("testdata"),
want: len("testdata") + 1, // +1 due to Delimiter('\n)
data: []byte("{\"testdata\":1}"),
want: len("{\"testdata\":1}") + 1, // +1 due to Delimiter('\n)
wantErr: false,
},
{