/* * * Copyright 2014 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ //go:generate protoc -I grpc_testing --go_out=plugins=grpc:grpc_testing grpc_testing/control.proto grpc_testing/messages.proto grpc_testing/payloads.proto grpc_testing/services.proto grpc_testing/stats.proto /* Package benchmark implements the building blocks to setup end-to-end gRPC benchmarks. */ package benchmark import ( "context" "fmt" "io" "log" "net" "sync" "testing" "time" "google.golang.org/grpc" testpb "google.golang.org/grpc/benchmark/grpc_testing" "google.golang.org/grpc/benchmark/latency" "google.golang.org/grpc/benchmark/stats" "google.golang.org/grpc/codes" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/status" ) // AddOne add 1 to the features slice func AddOne(features []int, featuresMaxPosition []int) { for i := len(features) - 1; i >= 0; i-- { features[i] = (features[i] + 1) if features[i]/featuresMaxPosition[i] == 0 { break } features[i] = features[i] % featuresMaxPosition[i] } } // Allows reuse of the same testpb.Payload object. func setPayload(p *testpb.Payload, t testpb.PayloadType, size int) { if size < 0 { grpclog.Fatalf("Requested a response with invalid length %d", size) } body := make([]byte, size) switch t { case testpb.PayloadType_COMPRESSABLE: case testpb.PayloadType_UNCOMPRESSABLE: grpclog.Fatalf("PayloadType UNCOMPRESSABLE is not supported") default: grpclog.Fatalf("Unsupported payload type: %d", t) } p.Type = t p.Body = body } // NewPayload creates a payload with the given type and size. func NewPayload(t testpb.PayloadType, size int) *testpb.Payload { p := new(testpb.Payload) setPayload(p, t, size) return p } type testServer struct { } func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{ Payload: NewPayload(in.ResponseType, int(in.ResponseSize)), }, nil } func (s *testServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error { response := &testpb.SimpleResponse{ Payload: new(testpb.Payload), } in := new(testpb.SimpleRequest) for { // use ServerStream directly to reuse the same testpb.SimpleRequest object err := stream.(grpc.ServerStream).RecvMsg(in) if err == io.EOF { // read done. return nil } if err != nil { return err } setPayload(response.Payload, in.ResponseType, int(in.ResponseSize)) if err := stream.Send(response); err != nil { return err } } } func (s *testServer) UnconstrainedStreamingCall(stream testpb.BenchmarkService_UnconstrainedStreamingCallServer) error { in := new(testpb.SimpleRequest) // Receive a message to learn response type and size. err := stream.RecvMsg(in) if err == io.EOF { // read done. return nil } if err != nil { return err } response := &testpb.SimpleResponse{ Payload: new(testpb.Payload), } setPayload(response.Payload, in.ResponseType, int(in.ResponseSize)) go func() { for { // Using RecvMsg rather than Recv to prevent reallocation of SimpleRequest. err := stream.RecvMsg(in) switch status.Code(err) { case codes.Canceled: case codes.OK: default: log.Fatalf("server recv error: %v", err) } } }() go func() { for { err := stream.Send(response) switch status.Code(err) { case codes.Unavailable: case codes.OK: default: log.Fatalf("server send error: %v", err) } } }() <-stream.Context().Done() return stream.Context().Err() } // byteBufServer is a gRPC server that sends and receives byte buffer. // The purpose is to benchmark the gRPC performance without protobuf serialization/deserialization overhead. type byteBufServer struct { respSize int32 } // UnaryCall is an empty function and is not used for benchmark. // If bytebuf UnaryCall benchmark is needed later, the function body needs to be updated. func (s *byteBufServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil } func (s *byteBufServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error { for { var in []byte err := stream.(grpc.ServerStream).RecvMsg(&in) if err == io.EOF { return nil } if err != nil { return err } out := make([]byte, s.respSize) if err := stream.(grpc.ServerStream).SendMsg(&out); err != nil { return err } } } func (s *byteBufServer) UnconstrainedStreamingCall(stream testpb.BenchmarkService_UnconstrainedStreamingCallServer) error { for { var in []byte err := stream.(grpc.ServerStream).RecvMsg(&in) if err == io.EOF { return nil } if err != nil { return err } out := make([]byte, s.respSize) if err := stream.(grpc.ServerStream).SendMsg(&out); err != nil { return err } } } // ServerInfo contains the information to create a gRPC benchmark server. type ServerInfo struct { // Type is the type of the server. // It should be "protobuf" or "bytebuf". Type string // Metadata is an optional configuration. // For "protobuf", it's ignored. // For "bytebuf", it should be an int representing response size. Metadata interface{} // Listener is the network listener for the server to use Listener net.Listener } // StartServer starts a gRPC server serving a benchmark service according to info. // It returns a function to stop the server. func StartServer(info ServerInfo, opts ...grpc.ServerOption) func() { opts = append(opts, grpc.WriteBufferSize(128*1024)) opts = append(opts, grpc.ReadBufferSize(128*1024)) s := grpc.NewServer(opts...) switch info.Type { case "protobuf": testpb.RegisterBenchmarkServiceServer(s, &testServer{}) case "bytebuf": respSize, ok := info.Metadata.(int32) if !ok { grpclog.Fatalf("failed to StartServer, invalid metadata: %v, for Type: %v", info.Metadata, info.Type) } testpb.RegisterBenchmarkServiceServer(s, &byteBufServer{respSize: respSize}) default: grpclog.Fatalf("failed to StartServer, unknown Type: %v", info.Type) } go s.Serve(info.Listener) return func() { s.Stop() } } // DoUnaryCall performs an unary RPC with given stub and request and response sizes. func DoUnaryCall(tc testpb.BenchmarkServiceClient, reqSize, respSize int) error { pl := NewPayload(testpb.PayloadType_COMPRESSABLE, reqSize) req := &testpb.SimpleRequest{ ResponseType: pl.Type, ResponseSize: int32(respSize), Payload: pl, } if _, err := tc.UnaryCall(context.Background(), req); err != nil { return fmt.Errorf("/BenchmarkService/UnaryCall(_, _) = _, %v, want _, ", err) } return nil } // DoStreamingRoundTrip performs a round trip for a single streaming rpc. func DoStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) error { pl := NewPayload(testpb.PayloadType_COMPRESSABLE, reqSize) req := &testpb.SimpleRequest{ ResponseType: pl.Type, ResponseSize: int32(respSize), Payload: pl, } if err := stream.Send(req); err != nil { return fmt.Errorf("/BenchmarkService/StreamingCall.Send(_) = %v, want ", err) } if _, err := stream.Recv(); err != nil { // EOF is a valid error here. if err == io.EOF { return nil } return fmt.Errorf("/BenchmarkService/StreamingCall.Recv(_) = %v, want ", err) } return nil } // DoByteBufStreamingRoundTrip performs a round trip for a single streaming rpc, using a custom codec for byte buffer. func DoByteBufStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) error { out := make([]byte, reqSize) if err := stream.(grpc.ClientStream).SendMsg(&out); err != nil { return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).SendMsg(_) = %v, want ", err) } var in []byte if err := stream.(grpc.ClientStream).RecvMsg(&in); err != nil { // EOF is a valid error here. if err == io.EOF { return nil } return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).RecvMsg(_) = %v, want ", err) } return nil } // NewClientConn creates a gRPC client connection to addr. func NewClientConn(addr string, opts ...grpc.DialOption) *grpc.ClientConn { return NewClientConnWithContext(context.Background(), addr, opts...) } // NewClientConnWithContext creates a gRPC client connection to addr using ctx. func NewClientConnWithContext(ctx context.Context, addr string, opts ...grpc.DialOption) *grpc.ClientConn { opts = append(opts, grpc.WithWriteBufferSize(128*1024)) opts = append(opts, grpc.WithReadBufferSize(128*1024)) conn, err := grpc.DialContext(ctx, addr, opts...) if err != nil { grpclog.Fatalf("NewClientConn(%q) failed to create a ClientConn %v", addr, err) } return conn } func runUnary(b *testing.B, benchFeatures stats.Features) { s := stats.AddStats(b, 38) nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu} lis, err := net.Listen("tcp", "localhost:0") if err != nil { grpclog.Fatalf("Failed to listen: %v", err) } target := lis.Addr().String() lis = nw.Listener(lis) stopper := StartServer(ServerInfo{Type: "protobuf", Listener: lis}, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1))) defer stopper() conn := NewClientConn( target, grpc.WithInsecure(), grpc.WithContextDialer(func(ctx context.Context, address string) (net.Conn, error) { return nw.ContextDialer((&net.Dialer{}).DialContext)(ctx, "tcp", address) }), ) tc := testpb.NewBenchmarkServiceClient(conn) // Warm up connection. for i := 0; i < 10; i++ { unaryCaller(tc, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes) } ch := make(chan int, benchFeatures.MaxConcurrentCalls*4) var ( mu sync.Mutex wg sync.WaitGroup ) wg.Add(benchFeatures.MaxConcurrentCalls) // Distribute the b.N calls over maxConcurrentCalls workers. for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ { go func() { for range ch { start := time.Now() unaryCaller(tc, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes) elapse := time.Since(start) mu.Lock() s.Add(elapse) mu.Unlock() } wg.Done() }() } b.ResetTimer() for i := 0; i < b.N; i++ { ch <- i } close(ch) wg.Wait() b.StopTimer() conn.Close() } func runStream(b *testing.B, benchFeatures stats.Features) { s := stats.AddStats(b, 38) nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu} lis, err := net.Listen("tcp", "localhost:0") if err != nil { grpclog.Fatalf("Failed to listen: %v", err) } target := lis.Addr().String() lis = nw.Listener(lis) stopper := StartServer(ServerInfo{Type: "protobuf", Listener: lis}, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1))) defer stopper() conn := NewClientConn( target, grpc.WithInsecure(), grpc.WithContextDialer(func(ctx context.Context, address string) (net.Conn, error) { return nw.ContextDialer((&net.Dialer{}).DialContext)(ctx, "tcp", address) }), ) tc := testpb.NewBenchmarkServiceClient(conn) // Warm up connection. stream, err := tc.StreamingCall(context.Background()) if err != nil { b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err) } for i := 0; i < 10; i++ { streamCaller(stream, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes) } ch := make(chan struct{}, benchFeatures.MaxConcurrentCalls*4) var ( mu sync.Mutex wg sync.WaitGroup ) wg.Add(benchFeatures.MaxConcurrentCalls) // Distribute the b.N calls over maxConcurrentCalls workers. for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ { stream, err := tc.StreamingCall(context.Background()) if err != nil { b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err) } go func() { for range ch { start := time.Now() streamCaller(stream, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes) elapse := time.Since(start) mu.Lock() s.Add(elapse) mu.Unlock() } wg.Done() }() } b.ResetTimer() for i := 0; i < b.N; i++ { ch <- struct{}{} } close(ch) wg.Wait() b.StopTimer() conn.Close() } func unaryCaller(client testpb.BenchmarkServiceClient, reqSize, respSize int) { if err := DoUnaryCall(client, reqSize, respSize); err != nil { grpclog.Fatalf("DoUnaryCall failed: %v", err) } } func streamCaller(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) { if err := DoStreamingRoundTrip(stream, reqSize, respSize); err != nil { grpclog.Fatalf("DoStreamingRoundTrip failed: %v", err) } }