This commit is contained in:
JJ Jordan 2017-08-08 19:06:41 -07:00
Родитель 7f3707f3c7
Коммит f0d29ebfa9
9 изменённых файлов: 114 добавлений и 23 удалений

3
.gitmodules поставляемый Normal file
Просмотреть файл

@ -0,0 +1,3 @@
[submodule "appinsights/vendor/code.cloudfoundry.org/clock"]
path = vendor/code.cloudfoundry.org/clock
url = https://github.com/cloudfoundry/clock

Просмотреть файл

@ -3,11 +3,10 @@ package appinsights
import "testing"
func TestClientBurstPerformance(t *testing.T) {
config := NewTelemetryConfiguration("")
config.EndpointUrl = ""
telemetryClient := NewTelemetryClientFromConfig(config)
client := NewTelemetryClient("")
client.(*telemetryClient).channel.(*InMemoryChannel).transmitter = &nullTransmitter{}
for i := 0; i < 1000000; i++ {
telemetryClient.TrackTrace("A message")
client.TrackTrace("A message")
}
}

11
appinsights/clock.go Normal file
Просмотреть файл

@ -0,0 +1,11 @@
package appinsights
// We need to mock out the clock for tests; we'll use this to do it.
import "code.cloudfoundry.org/clock"
var currentClock clock.Clock
func init() {
currentClock = clock.NewClock()
}

Просмотреть файл

@ -20,6 +20,7 @@ type InMemoryChannel struct {
batchInterval time.Duration
waitgroup sync.WaitGroup
throttle *throttleManager
transmitter transmitter
}
type inMemoryChannelControl struct {
@ -47,6 +48,7 @@ func NewInMemoryChannel(config *TelemetryConfiguration) *InMemoryChannel {
batchSize: config.MaxBatchSize,
batchInterval: config.MaxBatchInterval,
throttle: newThrottleManager(),
transmitter: newTransmitter(config.EndpointUrl),
}
go channel.acceptLoop()
@ -137,7 +139,7 @@ mainLoop:
var callback chan bool
// Delay until timeout passes or buffer fills up
timer := time.NewTimer(channel.batchInterval)
timer := currentClock.NewTimer(channel.batchInterval)
waitLoop:
for {
select {
@ -169,7 +171,7 @@ mainLoop:
break waitLoop
}
case _ = <-timer.C:
case _ = <-timer.C():
// Timeout expired
timer = nil
break waitLoop
@ -177,7 +179,7 @@ mainLoop:
}
if timer != nil && !timer.Stop() {
<-timer.C
<-timer.C()
}
// Hold up transmission if we're being throttled
@ -262,7 +264,7 @@ func (channel *InMemoryChannel) transmitRetry(items TelemetryBufferItems, retry
retryTimeRemaining := retryTimeout
for _, wait := range submit_retries {
result, err := transmit(payload, items, channel.endpointAddress)
result, err := channel.transmitter.Transmit(payload, items)
if err == nil && result != nil && result.IsSuccess() {
return
}
@ -302,7 +304,7 @@ func (channel *InMemoryChannel) transmitRetry(items TelemetryBufferItems, retry
if retryTimeRemaining < wait {
// One more chance left -- we'll wait the max time we can
// and then retry on the way out.
time.Sleep(retryTimeRemaining)
currentClock.Sleep(retryTimeRemaining)
break
} else {
// Still have time left to go through the rest of the regular
@ -312,7 +314,7 @@ func (channel *InMemoryChannel) transmitRetry(items TelemetryBufferItems, retry
}
diagnosticsWriter.Printf("Waiting %s to retry submission", wait)
time.Sleep(wait)
currentClock.Sleep(wait)
// Wait if the channel is throttled and we're not on a schedule
if channel.IsThrottled() && retryTimeout == 0 {
@ -328,7 +330,7 @@ func (channel *InMemoryChannel) transmitRetry(items TelemetryBufferItems, retry
}
// One final try
_, err := transmit(payload, items, channel.endpointAddress)
_, err := channel.transmitter.Transmit(payload, items)
if err != nil {
diagnosticsWriter.Write("Gave up transmitting payload; exhausted retries")
}

Просмотреть файл

@ -0,0 +1 @@
package appinsights

Просмотреть файл

@ -87,7 +87,7 @@ mainLoop:
}
}
duration := throttledUntil.Sub(time.Now())
duration := throttledUntil.Sub(currentClock.Now())
if duration < 0 {
continue
}
@ -95,12 +95,12 @@ mainLoop:
var notify []chan bool
// --- Throttled and waiting ---
t := time.NewTimer(duration)
t := currentClock.NewTimer(duration)
throttleLoop:
for {
select {
case <-t.C:
case <-t.C():
for _, n := range notify {
n <- true
}
@ -124,10 +124,10 @@ mainLoop:
throttledUntil = msg.timestamp
if !t.Stop() {
<-t.C
<-t.C()
}
t.Reset(throttledUntil.Sub(time.Now()))
t.Reset(throttledUntil.Sub(currentClock.Now()))
}
}
}

Просмотреть файл

@ -9,6 +9,16 @@ import (
"time"
)
type transmitter interface {
Transmit(payload []byte, items TelemetryBufferItems) (*transmissionResult, error)
}
type httpTransmitter struct {
endpoint string
}
type nullTransmitter struct {}
type transmissionResult struct {
statusCode int
retryAfter *time.Time
@ -38,17 +48,15 @@ const (
serviceUnavailableResponse = 503
)
func transmit(payload []byte, items TelemetryBufferItems, endpoint string) (*transmissionResult, error) {
if endpoint == "" {
// Special case for tests: don't actually send telemetry to empty endpoint address
diagnosticsWriter.Write("Refusing to transmit telemetry to empty endpoint")
return &transmissionResult{statusCode: successResponse}, nil
}
func newTransmitter(endpointAddress string) transmitter {
return &httpTransmitter{endpointAddress}
}
func (transmitter *httpTransmitter) Transmit(payload []byte, items TelemetryBufferItems) (*transmissionResult, error) {
diagnosticsWriter.Printf("----------- Transmitting %d items ---------", len(items))
startTime := time.Now()
req, err := http.NewRequest("POST", endpoint, bytes.NewReader(payload))
req, err := http.NewRequest("POST", transmitter.endpoint, bytes.NewReader(payload))
if err != nil {
return nil, err
}
@ -107,6 +115,10 @@ func transmit(payload []byte, items TelemetryBufferItems, endpoint string) (*tra
return result, nil
}
func (transmitter *nullTransmitter) Transmit(payload []byte, items TelemetryBufferItems) (*transmissionResult, error) {
return &transmissionResult{statusCode: successResponse}, nil
}
func (result *transmissionResult) IsSuccess() bool {
return result.statusCode == successResponse ||
// Partial response but all items accepted

Просмотреть файл

@ -0,0 +1,62 @@
package appinsights
import (
"fmt"
"net/http"
"net/http/httptest"
)
type testServer struct {
server *httptest.Server
notify chan *testRequest
handler func(http.ResponseWriter, *http.Request)
responseData []byte
responseCode int
responseHeaders map[string]string
}
type testRequest struct {
request *http.Request
}
func (server *testServer) Close() {
server.server.Close()
close(server.notify)
}
func (server *testServer) ServeHTTP(writer http.ResponseWriter, req *http.Request) {
if server.handler != nil {
server.handler(writer, req)
} else {
server.defaultHandler(writer, req)
}
server.notify <- &testRequest{req}
}
func (server *testServer) defaultHandler(writer http.ResponseWriter, req *http.Request) {
hdr := writer.Header()
for k, v := range server.responseHeaders {
hdr[k] = []string{v}
}
writer.WriteHeader(server.responseCode)
writer.Write(server.responseData)
}
func newTestClientServer() (TelemetryClient, *testServer) {
server := &testServer{}
server.server = httptest.NewServer(server)
server.notify = make(chan *testRequest)
server.responseCode = 200
server.responseData = make([]byte, 0)
server.responseHeaders = make(map[string]string)
config := NewTelemetryConfiguration("00000000-0000-0000-000000000000")
config.EndpointUrl = fmt.Sprintf("http://%s/v2/track", server.server.Listener.Addr().String())
client := NewTelemetryClientFromConfig(config)
return client, server
}

1
vendor/code.cloudfoundry.org/clock сгенерированный поставляемый Submodule

@ -0,0 +1 @@
Subproject commit 2269160ae1757f96bbb8c6475e6fa36c805e73e0