Merge pull request #8273 from vitessio/vtproto-mempool

proto: enable pooling for vreplication
This commit is contained in:
Vicent Martí 2021-06-08 18:31:51 +02:00 коммит произвёл GitHub
Родитель 18dbd43b77 5b5ccd8396
Коммит 4449aa9ccd
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
6 изменённых файлов: 91 добавлений и 14 удалений

Просмотреть файл

@ -232,7 +232,9 @@ $(PROTO_GO_OUTS): minimaltools install_protoc-gen-go proto/*.proto
--go_out=. --plugin protoc-gen-go="${GOBIN}/protoc-gen-go" \
--go-grpc_out=. --plugin protoc-gen-go-grpc="${GOBIN}/protoc-gen-go-grpc" \
--go-vtproto_out=. --plugin protoc-gen-go-vtproto="${GOBIN}/protoc-gen-go-vtproto" \
--go-vtproto_opt=features=marshal+unmarshal+size \
--go-vtproto_opt=features=marshal+unmarshal+size+pool \
--go-vtproto_opt=pool=vitess.io/vitess/go/vt/proto/query.Row \
--go-vtproto_opt=pool=vitess.io/vitess/go/vt/proto/binlogdata.VStreamRowsResponse \
-I${PWD}/dist/vt-protoc-3.6.1/include:proto proto/$${name}.proto; \
done
cp -Rf vitess.io/vitess/go/vt/proto/* go/vt/proto

Просмотреть файл

@ -9,6 +9,7 @@ import (
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
io "io"
bits "math/bits"
sync "sync"
query "vitess.io/vitess/go/vt/proto/query"
topodata "vitess.io/vitess/go/vt/proto/topodata"
vtrpc "vitess.io/vitess/go/vt/proto/vtrpc"
@ -1884,6 +1885,35 @@ func encodeVarint(dAtA []byte, offset int, v uint64) int {
dAtA[offset] = uint8(v)
return base
}
var vtprotoPool_VStreamRowsResponse = sync.Pool{
New: func() interface{} {
return &VStreamRowsResponse{}
},
}
func (m *VStreamRowsResponse) ResetVT() {
f0 := m.Fields[:0]
f1 := m.Pkfields[:0]
for _, mm := range m.Rows {
mm.ResetVT()
}
f2 := m.Rows[:0]
m.Lastpk.ReturnToVTPool()
m.Reset()
m.Fields = f0
m.Pkfields = f1
m.Rows = f2
}
func (m *VStreamRowsResponse) ReturnToVTPool() {
if m != nil {
m.ResetVT()
vtprotoPool_VStreamRowsResponse.Put(m)
}
}
func VStreamRowsResponseFromVTPool() *VStreamRowsResponse {
return vtprotoPool_VStreamRowsResponse.Get().(*VStreamRowsResponse)
}
func (m *Charset) SizeVT() (n int) {
if m == nil {
return 0
@ -6322,7 +6352,14 @@ func (m *VStreamRowsResponse) UnmarshalVT(dAtA []byte) error {
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Fields = append(m.Fields, &query.Field{})
if len(m.Fields) == cap(m.Fields) {
m.Fields = append(m.Fields, &query.Field{})
} else {
m.Fields = m.Fields[:len(m.Fields)+1]
if m.Fields[len(m.Fields)-1] == nil {
m.Fields[len(m.Fields)-1] = &query.Field{}
}
}
if err := m.Fields[len(m.Fields)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil {
return err
}
@ -6356,7 +6393,14 @@ func (m *VStreamRowsResponse) UnmarshalVT(dAtA []byte) error {
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Pkfields = append(m.Pkfields, &query.Field{})
if len(m.Pkfields) == cap(m.Pkfields) {
m.Pkfields = append(m.Pkfields, &query.Field{})
} else {
m.Pkfields = m.Pkfields[:len(m.Pkfields)+1]
if m.Pkfields[len(m.Pkfields)-1] == nil {
m.Pkfields[len(m.Pkfields)-1] = &query.Field{}
}
}
if err := m.Pkfields[len(m.Pkfields)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil {
return err
}
@ -6422,7 +6466,14 @@ func (m *VStreamRowsResponse) UnmarshalVT(dAtA []byte) error {
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Rows = append(m.Rows, &query.Row{})
if len(m.Rows) == cap(m.Rows) {
m.Rows = append(m.Rows, &query.Row{})
} else {
m.Rows = m.Rows[:len(m.Rows)+1]
if m.Rows[len(m.Rows)-1] == nil {
m.Rows[len(m.Rows)-1] = &query.Row{}
}
}
if err := m.Rows[len(m.Rows)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil {
return err
}
@ -6457,7 +6508,7 @@ func (m *VStreamRowsResponse) UnmarshalVT(dAtA []byte) error {
return io.ErrUnexpectedEOF
}
if m.Lastpk == nil {
m.Lastpk = &query.Row{}
m.Lastpk = query.RowFromVTPool()
}
if err := m.Lastpk.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil {
return err

Просмотреть файл

@ -11,6 +11,7 @@ import (
io "io"
math "math"
bits "math/bits"
sync "sync"
topodata "vitess.io/vitess/go/vt/proto/topodata"
vtrpc "vitess.io/vitess/go/vt/proto/vtrpc"
)
@ -4045,6 +4046,29 @@ func encodeVarint(dAtA []byte, offset int, v uint64) int {
dAtA[offset] = uint8(v)
return base
}
var vtprotoPool_Row = sync.Pool{
New: func() interface{} {
return &Row{}
},
}
func (m *Row) ResetVT() {
f0 := m.Lengths[:0]
f1 := m.Values[:0]
m.Reset()
m.Lengths = f0
m.Values = f1
}
func (m *Row) ReturnToVTPool() {
if m != nil {
m.ResetVT()
vtprotoPool_Row.Put(m)
}
}
func RowFromVTPool() *Row {
return vtprotoPool_Row.Get().(*Row)
}
func (m *Target) SizeVT() (n int) {
if m == nil {
return 0
@ -7100,7 +7124,7 @@ func (m *Row) UnmarshalVT(dAtA []byte) error {
}
}
elementCount = count
if elementCount != 0 && len(m.Lengths) == 0 {
if elementCount != 0 && len(m.Lengths) == 0 && cap(m.Lengths) < elementCount {
m.Lengths = make([]int64, 0, elementCount)
}
for iNdEx < postIndex {

Просмотреть файл

@ -17,10 +17,10 @@ limitations under the License.
package grpcqueryservice
import (
"google.golang.org/grpc"
"context"
"google.golang.org/grpc"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/callerid"
"vitess.io/vitess/go/vt/callinfo"

Просмотреть файл

@ -675,18 +675,18 @@ func (conn *gRPCQueryClient) VStreamRows(ctx context.Context, target *querypb.Ta
return err
}
for {
r, err := stream.Recv()
r := binlogdatapb.VStreamRowsResponseFromVTPool()
err := stream.RecvMsg(r)
if err != nil {
return tabletconn.ErrorFromGRPC(err)
}
select {
case <-ctx.Done():
if ctx.Err() != nil {
return ctx.Err()
default:
}
if err := send(r); err != nil {
return err
}
r.ReturnToVTPool()
}
}

Просмотреть файл

@ -244,13 +244,13 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma
}
fieldEvent := &binlogdatapb.FieldEvent{
TableName: initialPlan.SendRule.Match,
Fields: rows.Fields,
}
fieldEvent.Fields = append(fieldEvent.Fields, rows.Fields...)
vc.tablePlan, err = plan.buildExecutionPlan(fieldEvent)
if err != nil {
return err
}
pkfields = rows.Pkfields
pkfields = append(pkfields, rows.Pkfields...)
buf := sqlparser.NewTrackedBuffer(nil)
buf.Myprintf("update _vt.copy_state set lastpk=%a where vrepl_id=%s and table_name=%s", ":lastpk", strconv.Itoa(int(vc.vr.id)), encodeString(tableName))
updateCopyState = buf.ParsedQuery()