Refactor long, complicated loops into functions
This commit is contained in:
Родитель
74990f3d0c
Коммит
0a399a61dc
|
@ -3,6 +3,8 @@ package appinsights
|
|||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"code.cloudfoundry.org/clock"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -97,167 +99,206 @@ func (channel *InMemoryChannel) Close(flush bool, retry bool, timeout time.Durat
|
|||
}
|
||||
|
||||
func (channel *InMemoryChannel) acceptLoop() {
|
||||
buffer := make(TelemetryBufferItems, 0, 16)
|
||||
stopping := false
|
||||
channelState := newInMemoryChannelState(channel)
|
||||
|
||||
mainLoop:
|
||||
for !stopping {
|
||||
if len(buffer) > 16 {
|
||||
// Start out with the size of the previous buffer
|
||||
buffer = make(TelemetryBufferItems, 0, cap(buffer))
|
||||
} else if len(buffer) > 0 {
|
||||
// Start out with at least 16 slots
|
||||
buffer = make(TelemetryBufferItems, 0, 16)
|
||||
for !channelState.stopping {
|
||||
channelState.start()
|
||||
}
|
||||
|
||||
channelState.stop()
|
||||
}
|
||||
|
||||
// Data shared between parts of a channel
|
||||
type inMemoryChannelState struct {
|
||||
channel *InMemoryChannel
|
||||
stopping bool
|
||||
buffer TelemetryBufferItems
|
||||
retry bool
|
||||
retryTimeout time.Duration
|
||||
callback chan bool
|
||||
timer clock.Timer
|
||||
}
|
||||
|
||||
func newInMemoryChannelState(channel *InMemoryChannel) *inMemoryChannelState {
|
||||
return &inMemoryChannelState{
|
||||
channel: channel,
|
||||
buffer: make(TelemetryBufferItems, 0, 16),
|
||||
stopping: false,
|
||||
timer: currentClock.NewTimer(channel.batchInterval),
|
||||
}
|
||||
}
|
||||
|
||||
// Part of channel accept loop: Initialize buffer and accept first message, handle controls.
|
||||
func (state *inMemoryChannelState) start() bool {
|
||||
if len(state.buffer) > 16 {
|
||||
// Start out with the size of the previous buffer
|
||||
state.buffer = make(TelemetryBufferItems, 0, cap(state.buffer))
|
||||
} else if len(state.buffer) > 0 {
|
||||
// Start out with at least 16 slots
|
||||
state.buffer = make(TelemetryBufferItems, 0, 16)
|
||||
}
|
||||
|
||||
// Wait for an event
|
||||
select {
|
||||
case event := <-state.channel.collectChan:
|
||||
if event == nil {
|
||||
// Channel closed? Not intercepted by Send()?
|
||||
panic("Received nil event")
|
||||
}
|
||||
|
||||
// Wait for an event
|
||||
state.buffer = append(state.buffer, event)
|
||||
|
||||
case ctl := <-state.channel.controlChan:
|
||||
// The buffer is empty, so there would be no point in flushing
|
||||
state.channel.signalWhenDone(ctl.callback)
|
||||
|
||||
if ctl.stop {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
if len(state.buffer) == 0 {
|
||||
return true
|
||||
}
|
||||
|
||||
return state.waitToSend()
|
||||
}
|
||||
|
||||
// Part of channel accept loop: Wait for buffer to fill, timeout to expire, or flush
|
||||
func (state *inMemoryChannelState) waitToSend() bool {
|
||||
// Things that are used by the sender if we receive a control message
|
||||
state.retryTimeout = 0
|
||||
state.retry = true
|
||||
state.callback = nil
|
||||
|
||||
// Delay until timeout passes or buffer fills up
|
||||
state.timer.Reset(state.channel.batchInterval)
|
||||
for {
|
||||
select {
|
||||
case event := <-channel.collectChan:
|
||||
case event := <-state.channel.collectChan:
|
||||
if event == nil {
|
||||
// Channel closed? Not intercepted by Send()?
|
||||
panic("Received nil event")
|
||||
}
|
||||
|
||||
buffer = append(buffer, event)
|
||||
state.buffer = append(state.buffer, event)
|
||||
if len(state.buffer) >= state.channel.batchSize {
|
||||
return state.send()
|
||||
}
|
||||
|
||||
case ctl := <-channel.controlChan:
|
||||
// The buffer is empty, so there would be no point in flushing
|
||||
case ctl := <-state.channel.controlChan:
|
||||
if ctl.stop {
|
||||
stopping = true
|
||||
}
|
||||
|
||||
channel.signalWhenDone(ctl.callback)
|
||||
}
|
||||
|
||||
if len(buffer) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// Things that are used by the sender if we receive a control message
|
||||
var retryTimeout time.Duration = 0
|
||||
var retry bool = true
|
||||
var callback chan bool
|
||||
|
||||
// Delay until timeout passes or buffer fills up
|
||||
timer := currentClock.NewTimer(channel.batchInterval)
|
||||
waitLoop:
|
||||
for {
|
||||
select {
|
||||
case event := <-channel.collectChan:
|
||||
if event == nil {
|
||||
// Channel closed? Not intercepted by Send()?
|
||||
panic("Received nil event")
|
||||
}
|
||||
|
||||
buffer = append(buffer, event)
|
||||
if len(buffer) >= channel.batchSize {
|
||||
break waitLoop
|
||||
}
|
||||
|
||||
case ctl := <-channel.controlChan:
|
||||
if ctl.stop {
|
||||
stopping = true
|
||||
retry = ctl.retry
|
||||
if !ctl.flush {
|
||||
// No flush? Just exit.
|
||||
channel.signalWhenDone(ctl.callback)
|
||||
break mainLoop
|
||||
}
|
||||
}
|
||||
|
||||
if ctl.flush {
|
||||
retryTimeout = ctl.timeout
|
||||
callback = ctl.callback
|
||||
break waitLoop
|
||||
}
|
||||
|
||||
case _ = <-timer.C():
|
||||
// Timeout expired
|
||||
timer = nil
|
||||
break waitLoop
|
||||
}
|
||||
}
|
||||
|
||||
if timer != nil && !timer.Stop() {
|
||||
<-timer.C()
|
||||
}
|
||||
|
||||
// Hold up transmission if we're being throttled
|
||||
if !stopping && channel.throttle.IsThrottled() {
|
||||
// Channel is currently throttled. Once the buffer fills, messages will
|
||||
// be lost... If we're exiting, then we'll just try to submit anyway. That
|
||||
// request may be throttled and transmitRetry will perform the backoff correctly.
|
||||
|
||||
diagnosticsWriter.Write("Channel is throttled, events may be dropped.")
|
||||
throttleDone := channel.throttle.NotifyWhenReady()
|
||||
dropped := 0
|
||||
|
||||
throttledLoop:
|
||||
for {
|
||||
select {
|
||||
case <-throttleDone:
|
||||
close(throttleDone)
|
||||
break throttledLoop
|
||||
|
||||
case event := <-channel.collectChan:
|
||||
// If there's still room in the buffer, then go ahead and add it.
|
||||
if len(buffer) < channel.batchSize {
|
||||
buffer = append(buffer, event)
|
||||
} else {
|
||||
if dropped == 0 {
|
||||
diagnosticsWriter.Write("Buffer is full, dropping further events.")
|
||||
}
|
||||
|
||||
dropped++
|
||||
}
|
||||
|
||||
case ctl := <-channel.controlChan:
|
||||
if ctl.stop {
|
||||
stopping = true
|
||||
retry = ctl.retry
|
||||
if !ctl.flush {
|
||||
channel.signalWhenDone(ctl.callback)
|
||||
break mainLoop
|
||||
} else {
|
||||
// Make an exception when stopping
|
||||
break throttledLoop
|
||||
}
|
||||
}
|
||||
|
||||
// Cannot flush
|
||||
// TODO: Figure out what to do about callback?
|
||||
if ctl.flush {
|
||||
channel.signalWhenDone(ctl.callback)
|
||||
}
|
||||
state.stopping = true
|
||||
state.retry = ctl.retry
|
||||
if !ctl.flush {
|
||||
// No flush? Just exit.
|
||||
state.channel.signalWhenDone(ctl.callback)
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
diagnosticsWriter.Printf("Channel dropped %d events while throttled", dropped)
|
||||
if ctl.flush {
|
||||
state.retryTimeout = ctl.timeout
|
||||
state.callback = ctl.callback
|
||||
return state.send()
|
||||
}
|
||||
|
||||
case _ = <-state.timer.C():
|
||||
// Timeout expired
|
||||
return state.send()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Send
|
||||
if len(buffer) > 0 {
|
||||
channel.waitgroup.Add(1)
|
||||
go func(buffer TelemetryBufferItems, callback chan bool, retry bool, retryTimeout time.Duration) {
|
||||
defer channel.waitgroup.Done()
|
||||
|
||||
// If we have a callback, wait on the waitgroup now that it's
|
||||
// incremented.
|
||||
channel.signalWhenDone(callback)
|
||||
|
||||
channel.transmitRetry(buffer, retry, retryTimeout)
|
||||
}(buffer, callback, retry, retryTimeout)
|
||||
} else if callback != nil {
|
||||
channel.signalWhenDone(callback)
|
||||
// Part of channel accept loop: Check and wait on throttle, submit pending telemetry
|
||||
func (state *inMemoryChannelState) send() bool {
|
||||
// Hold up transmission if we're being throttled
|
||||
if !state.stopping && state.channel.throttle.IsThrottled() {
|
||||
if !state.waitThrottle() {
|
||||
// Stopped
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
close(channel.collectChan)
|
||||
close(channel.controlChan)
|
||||
// Send
|
||||
if len(state.buffer) > 0 {
|
||||
state.channel.waitgroup.Add(1)
|
||||
|
||||
// If we have a callback, wait on the waitgroup now that it's
|
||||
// incremented.
|
||||
state.channel.signalWhenDone(state.callback)
|
||||
|
||||
go func(buffer TelemetryBufferItems, retry bool, retryTimeout time.Duration) {
|
||||
defer state.channel.waitgroup.Done()
|
||||
state.channel.transmitRetry(buffer, retry, retryTimeout)
|
||||
}(state.buffer, state.retry, state.retryTimeout)
|
||||
} else if state.callback != nil {
|
||||
state.channel.signalWhenDone(state.callback)
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// Part of channel accept loop: Wait for throttle to expire while dropping messages
|
||||
func (state *inMemoryChannelState) waitThrottle() bool {
|
||||
// Channel is currently throttled. Once the buffer fills, messages will
|
||||
// be lost... If we're exiting, then we'll just try to submit anyway. That
|
||||
// request may be throttled and transmitRetry will perform the backoff correctly.
|
||||
|
||||
diagnosticsWriter.Write("Channel is throttled, events may be dropped.")
|
||||
throttleDone := state.channel.throttle.NotifyWhenReady()
|
||||
dropped := 0
|
||||
|
||||
defer diagnosticsWriter.Printf("Channel dropped %d events while throttled", dropped)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-throttleDone:
|
||||
close(throttleDone)
|
||||
return true
|
||||
|
||||
case event := <-state.channel.collectChan:
|
||||
// If there's still room in the buffer, then go ahead and add it.
|
||||
if len(state.buffer) < state.channel.batchSize {
|
||||
state.buffer = append(state.buffer, event)
|
||||
} else {
|
||||
if dropped == 0 {
|
||||
diagnosticsWriter.Write("Buffer is full, dropping further events.")
|
||||
}
|
||||
|
||||
dropped++
|
||||
}
|
||||
|
||||
case ctl := <-state.channel.controlChan:
|
||||
if ctl.stop {
|
||||
state.stopping = true
|
||||
state.retry = ctl.retry
|
||||
if !ctl.flush {
|
||||
state.channel.signalWhenDone(ctl.callback)
|
||||
return false
|
||||
} else {
|
||||
// Make an exception when stopping
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// Cannot flush
|
||||
// TODO: Figure out what to do about callback?
|
||||
if ctl.flush {
|
||||
state.channel.signalWhenDone(ctl.callback)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Part of channel accept loop: Clean up and close telemetry channel
|
||||
func (state *inMemoryChannelState) stop() {
|
||||
close(state.channel.collectChan)
|
||||
close(state.channel.controlChan)
|
||||
|
||||
// Throttle can't close until transmitters are done using it.
|
||||
channel.waitgroup.Wait()
|
||||
channel.throttle.Stop()
|
||||
state.channel.waitgroup.Wait()
|
||||
state.channel.throttle.Stop()
|
||||
}
|
||||
|
||||
func (channel *InMemoryChannel) transmitRetry(items TelemetryBufferItems, retry bool, retryTimeout time.Duration) {
|
||||
|
|
|
@ -67,72 +67,78 @@ func (throttle *throttleManager) Stop() {
|
|||
}
|
||||
|
||||
func (throttle *throttleManager) run() {
|
||||
mainLoop:
|
||||
for {
|
||||
// --- Not throttled ---
|
||||
var throttledUntil time.Time
|
||||
|
||||
notThrottledLoop:
|
||||
for {
|
||||
msg := <-throttle.msgs
|
||||
if msg.query {
|
||||
msg.result <- false
|
||||
} else if msg.wait {
|
||||
msg.result <- true
|
||||
} else if msg.stop {
|
||||
break mainLoop
|
||||
} else if msg.throttle {
|
||||
throttledUntil = msg.timestamp
|
||||
break notThrottledLoop
|
||||
}
|
||||
throttledUntil, ok := throttle.waitForThrottle()
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
|
||||
duration := throttledUntil.Sub(currentClock.Now())
|
||||
if duration < 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
var notify []chan bool
|
||||
|
||||
// --- Throttled and waiting ---
|
||||
t := currentClock.NewTimer(duration)
|
||||
|
||||
throttleLoop:
|
||||
for {
|
||||
select {
|
||||
case <-t.C():
|
||||
for _, n := range notify {
|
||||
n <- true
|
||||
}
|
||||
|
||||
break throttleLoop
|
||||
case msg := <-throttle.msgs:
|
||||
if msg.query {
|
||||
msg.result <- true
|
||||
} else if msg.wait {
|
||||
notify = append(notify, msg.result)
|
||||
} else if msg.stop {
|
||||
for _, n := range notify {
|
||||
n <- false
|
||||
}
|
||||
|
||||
msg.result <- true
|
||||
|
||||
break mainLoop
|
||||
} else if msg.throttle {
|
||||
if msg.timestamp.After(throttledUntil) {
|
||||
throttledUntil = msg.timestamp
|
||||
|
||||
if !t.Stop() {
|
||||
<-t.C()
|
||||
}
|
||||
|
||||
t.Reset(throttledUntil.Sub(currentClock.Now()))
|
||||
}
|
||||
}
|
||||
}
|
||||
if !throttle.waitForReady(throttledUntil) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
close(throttle.msgs)
|
||||
}
|
||||
|
||||
func (throttle *throttleManager) waitForThrottle() (time.Time, bool) {
|
||||
for {
|
||||
msg := <-throttle.msgs
|
||||
if msg.query {
|
||||
msg.result <- false
|
||||
} else if msg.wait {
|
||||
msg.result <- true
|
||||
} else if msg.stop {
|
||||
return time.Time{}, false
|
||||
} else if msg.throttle {
|
||||
return msg.timestamp, true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (throttle *throttleManager) waitForReady(throttledUntil time.Time) bool {
|
||||
duration := throttledUntil.Sub(currentClock.Now())
|
||||
if duration <= 0 {
|
||||
return true
|
||||
}
|
||||
|
||||
var notify []chan bool
|
||||
|
||||
// --- Throttled and waiting ---
|
||||
t := currentClock.NewTimer(duration)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-t.C():
|
||||
for _, n := range notify {
|
||||
n <- true
|
||||
}
|
||||
|
||||
return true
|
||||
case msg := <-throttle.msgs:
|
||||
if msg.query {
|
||||
msg.result <- true
|
||||
} else if msg.wait {
|
||||
notify = append(notify, msg.result)
|
||||
} else if msg.stop {
|
||||
for _, n := range notify {
|
||||
n <- false
|
||||
}
|
||||
|
||||
msg.result <- true
|
||||
|
||||
return false
|
||||
} else if msg.throttle {
|
||||
if msg.timestamp.After(throttledUntil) {
|
||||
throttledUntil = msg.timestamp
|
||||
|
||||
if !t.Stop() {
|
||||
<-t.C()
|
||||
}
|
||||
|
||||
t.Reset(throttledUntil.Sub(currentClock.Now()))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче