ack settings frame before applying the settings.
This commit is contained in:
Родитель
91c8b79535
Коммит
57c62fd0e0
|
@ -61,8 +61,8 @@ func (windowUpdate) isItem() bool {
|
|||
}
|
||||
|
||||
type settings struct {
|
||||
ack bool
|
||||
setting []http2.Setting
|
||||
ack bool
|
||||
settings []http2.Setting
|
||||
}
|
||||
|
||||
func (settings) isItem() bool {
|
||||
|
|
|
@ -567,40 +567,13 @@ func (t *http2Client) handleSettings(f *http2.SettingsFrame) {
|
|||
if f.IsAck() {
|
||||
return
|
||||
}
|
||||
var ss []http2.Setting
|
||||
f.ForeachSetting(func(s http2.Setting) error {
|
||||
if v, ok := f.Value(s.ID); ok {
|
||||
switch s.ID {
|
||||
case http2.SettingMaxConcurrentStreams:
|
||||
// TODO(zhaoq): This is a hack to avoid significant refactoring of the
|
||||
// code to deal with the unrealistic int32 overflow. Probably will try
|
||||
// to find a better way to handle this later.
|
||||
if v > math.MaxInt32 {
|
||||
v = math.MaxInt32
|
||||
}
|
||||
t.mu.Lock()
|
||||
reset := t.streamsQuota != nil
|
||||
if !reset {
|
||||
t.streamsQuota = newQuotaPool(int(v))
|
||||
}
|
||||
ms := t.maxStreams
|
||||
t.maxStreams = int(v)
|
||||
t.mu.Unlock()
|
||||
if reset {
|
||||
t.streamsQuota.reset(int(v) - ms)
|
||||
}
|
||||
case http2.SettingInitialWindowSize:
|
||||
t.mu.Lock()
|
||||
for _, s := range t.activeStreams {
|
||||
// Adjust the sending quota for each s.
|
||||
s.sendQuotaPool.reset(int(v - t.streamSendQuota))
|
||||
}
|
||||
t.streamSendQuota = v
|
||||
t.mu.Unlock()
|
||||
}
|
||||
}
|
||||
ss = append(ss, s)
|
||||
return nil
|
||||
})
|
||||
t.controlBuf.put(&settings{ack: true})
|
||||
// The settings will be applied once the ack is sent.
|
||||
t.controlBuf.put(&settings{ack: true, settings: ss})
|
||||
}
|
||||
|
||||
func (t *http2Client) handlePing(f *http2.PingFrame) {
|
||||
|
@ -608,7 +581,7 @@ func (t *http2Client) handlePing(f *http2.PingFrame) {
|
|||
}
|
||||
|
||||
func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
|
||||
// TODO(zhaoq): GoAwayFrame handler to be implemented"
|
||||
// TODO(zhaoq): GoAwayFrame handler to be implemented
|
||||
}
|
||||
|
||||
func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
|
||||
|
@ -728,6 +701,39 @@ func (t *http2Client) reader() {
|
|||
}
|
||||
}
|
||||
|
||||
func (t *http2Client) applySettings(ss []http2.Setting) {
|
||||
for _, s := range ss {
|
||||
switch s.ID {
|
||||
case http2.SettingMaxConcurrentStreams:
|
||||
// TODO(zhaoq): This is a hack to avoid significant refactoring of the
|
||||
// code to deal with the unrealistic int32 overflow. Probably will try
|
||||
// to find a better way to handle this later.
|
||||
if s.Val > math.MaxInt32 {
|
||||
s.Val = math.MaxInt32
|
||||
}
|
||||
t.mu.Lock()
|
||||
reset := t.streamsQuota != nil
|
||||
if !reset {
|
||||
t.streamsQuota = newQuotaPool(int(s.Val))
|
||||
}
|
||||
ms := t.maxStreams
|
||||
t.maxStreams = int(s.Val)
|
||||
t.mu.Unlock()
|
||||
if reset {
|
||||
t.streamsQuota.reset(int(s.Val) - ms)
|
||||
}
|
||||
case http2.SettingInitialWindowSize:
|
||||
t.mu.Lock()
|
||||
for _, stream := range t.activeStreams {
|
||||
// Adjust the sending quota for each stream.
|
||||
stream.sendQuotaPool.reset(int(s.Val - t.streamSendQuota))
|
||||
}
|
||||
t.streamSendQuota = s.Val
|
||||
t.mu.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// controller running in a separate goroutine takes charge of sending control
|
||||
// frames (e.g., window update, reset stream, setting, etc.) to the server.
|
||||
func (t *http2Client) controller() {
|
||||
|
@ -743,8 +749,9 @@ func (t *http2Client) controller() {
|
|||
case *settings:
|
||||
if i.ack {
|
||||
t.framer.writeSettingsAck(true)
|
||||
t.applySettings(i.settings)
|
||||
} else {
|
||||
t.framer.writeSettings(true, i.setting...)
|
||||
t.framer.writeSettings(true, i.settings...)
|
||||
}
|
||||
case *resetStream:
|
||||
t.framer.writeRSTStream(true, i.streamID, i.code)
|
||||
|
|
|
@ -367,18 +367,13 @@ func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
|
|||
if f.IsAck() {
|
||||
return
|
||||
}
|
||||
var ss []http2.Setting
|
||||
f.ForeachSetting(func(s http2.Setting) error {
|
||||
if v, ok := f.Value(http2.SettingInitialWindowSize); ok {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
for _, s := range t.activeStreams {
|
||||
s.sendQuotaPool.reset(int(v - t.streamSendQuota))
|
||||
}
|
||||
t.streamSendQuota = v
|
||||
}
|
||||
ss = append(ss, s)
|
||||
return nil
|
||||
})
|
||||
t.controlBuf.put(&settings{ack: true})
|
||||
// The settings will be applied once the ack is sent.
|
||||
t.controlBuf.put(&settings{ack: true, settings: ss})
|
||||
}
|
||||
|
||||
func (t *http2Server) handlePing(f *http2.PingFrame) {
|
||||
|
@ -584,6 +579,20 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
|
|||
|
||||
}
|
||||
|
||||
func (t *http2Server) applySettings(ss []http2.Setting) {
|
||||
for _, s := range ss {
|
||||
if s.ID == http2.SettingInitialWindowSize {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
for _, stream := range t.activeStreams {
|
||||
stream.sendQuotaPool.reset(int(s.Val - t.streamSendQuota))
|
||||
}
|
||||
t.streamSendQuota = s.Val
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// controller running in a separate goroutine takes charge of sending control
|
||||
// frames (e.g., window update, reset stream, setting, etc.) to the server.
|
||||
func (t *http2Server) controller() {
|
||||
|
@ -599,8 +608,9 @@ func (t *http2Server) controller() {
|
|||
case *settings:
|
||||
if i.ack {
|
||||
t.framer.writeSettingsAck(true)
|
||||
t.applySettings(i.settings)
|
||||
} else {
|
||||
t.framer.writeSettings(true, i.setting...)
|
||||
t.framer.writeSettings(true, i.settings...)
|
||||
}
|
||||
case *resetStream:
|
||||
t.framer.writeRSTStream(true, i.streamID, i.code)
|
||||
|
|
Загрузка…
Ссылка в новой задаче