You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

437 regels
13 KiB

  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. //go:generate protoc -I grpc_testing --go_out=plugins=grpc:grpc_testing grpc_testing/control.proto grpc_testing/messages.proto grpc_testing/payloads.proto grpc_testing/services.proto grpc_testing/stats.proto
  19. /*
  20. Package benchmark implements the building blocks to setup end-to-end gRPC benchmarks.
  21. */
  22. package benchmark
  23. import (
  24. "context"
  25. "fmt"
  26. "io"
  27. "log"
  28. "net"
  29. "sync"
  30. "testing"
  31. "time"
  32. "google.golang.org/grpc"
  33. testpb "google.golang.org/grpc/benchmark/grpc_testing"
  34. "google.golang.org/grpc/benchmark/latency"
  35. "google.golang.org/grpc/benchmark/stats"
  36. "google.golang.org/grpc/codes"
  37. "google.golang.org/grpc/grpclog"
  38. "google.golang.org/grpc/status"
  39. )
  40. // AddOne add 1 to the features slice
  41. func AddOne(features []int, featuresMaxPosition []int) {
  42. for i := len(features) - 1; i >= 0; i-- {
  43. features[i] = (features[i] + 1)
  44. if features[i]/featuresMaxPosition[i] == 0 {
  45. break
  46. }
  47. features[i] = features[i] % featuresMaxPosition[i]
  48. }
  49. }
  50. // Allows reuse of the same testpb.Payload object.
  51. func setPayload(p *testpb.Payload, t testpb.PayloadType, size int) {
  52. if size < 0 {
  53. grpclog.Fatalf("Requested a response with invalid length %d", size)
  54. }
  55. body := make([]byte, size)
  56. switch t {
  57. case testpb.PayloadType_COMPRESSABLE:
  58. case testpb.PayloadType_UNCOMPRESSABLE:
  59. grpclog.Fatalf("PayloadType UNCOMPRESSABLE is not supported")
  60. default:
  61. grpclog.Fatalf("Unsupported payload type: %d", t)
  62. }
  63. p.Type = t
  64. p.Body = body
  65. }
  66. // NewPayload creates a payload with the given type and size.
  67. func NewPayload(t testpb.PayloadType, size int) *testpb.Payload {
  68. p := new(testpb.Payload)
  69. setPayload(p, t, size)
  70. return p
  71. }
  72. type testServer struct {
  73. }
  74. func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
  75. return &testpb.SimpleResponse{
  76. Payload: NewPayload(in.ResponseType, int(in.ResponseSize)),
  77. }, nil
  78. }
  79. func (s *testServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error {
  80. response := &testpb.SimpleResponse{
  81. Payload: new(testpb.Payload),
  82. }
  83. in := new(testpb.SimpleRequest)
  84. for {
  85. // use ServerStream directly to reuse the same testpb.SimpleRequest object
  86. err := stream.(grpc.ServerStream).RecvMsg(in)
  87. if err == io.EOF {
  88. // read done.
  89. return nil
  90. }
  91. if err != nil {
  92. return err
  93. }
  94. setPayload(response.Payload, in.ResponseType, int(in.ResponseSize))
  95. if err := stream.Send(response); err != nil {
  96. return err
  97. }
  98. }
  99. }
  100. func (s *testServer) UnconstrainedStreamingCall(stream testpb.BenchmarkService_UnconstrainedStreamingCallServer) error {
  101. in := new(testpb.SimpleRequest)
  102. // Receive a message to learn response type and size.
  103. err := stream.RecvMsg(in)
  104. if err == io.EOF {
  105. // read done.
  106. return nil
  107. }
  108. if err != nil {
  109. return err
  110. }
  111. response := &testpb.SimpleResponse{
  112. Payload: new(testpb.Payload),
  113. }
  114. setPayload(response.Payload, in.ResponseType, int(in.ResponseSize))
  115. go func() {
  116. for {
  117. // Using RecvMsg rather than Recv to prevent reallocation of SimpleRequest.
  118. err := stream.RecvMsg(in)
  119. switch status.Code(err) {
  120. case codes.Canceled:
  121. case codes.OK:
  122. default:
  123. log.Fatalf("server recv error: %v", err)
  124. }
  125. }
  126. }()
  127. go func() {
  128. for {
  129. err := stream.Send(response)
  130. switch status.Code(err) {
  131. case codes.Unavailable:
  132. case codes.OK:
  133. default:
  134. log.Fatalf("server send error: %v", err)
  135. }
  136. }
  137. }()
  138. <-stream.Context().Done()
  139. return stream.Context().Err()
  140. }
  141. // byteBufServer is a gRPC server that sends and receives byte buffer.
  142. // The purpose is to benchmark the gRPC performance without protobuf serialization/deserialization overhead.
  143. type byteBufServer struct {
  144. respSize int32
  145. }
  146. // UnaryCall is an empty function and is not used for benchmark.
  147. // If bytebuf UnaryCall benchmark is needed later, the function body needs to be updated.
  148. func (s *byteBufServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
  149. return &testpb.SimpleResponse{}, nil
  150. }
  151. func (s *byteBufServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error {
  152. for {
  153. var in []byte
  154. err := stream.(grpc.ServerStream).RecvMsg(&in)
  155. if err == io.EOF {
  156. return nil
  157. }
  158. if err != nil {
  159. return err
  160. }
  161. out := make([]byte, s.respSize)
  162. if err := stream.(grpc.ServerStream).SendMsg(&out); err != nil {
  163. return err
  164. }
  165. }
  166. }
  167. func (s *byteBufServer) UnconstrainedStreamingCall(stream testpb.BenchmarkService_UnconstrainedStreamingCallServer) error {
  168. for {
  169. var in []byte
  170. err := stream.(grpc.ServerStream).RecvMsg(&in)
  171. if err == io.EOF {
  172. return nil
  173. }
  174. if err != nil {
  175. return err
  176. }
  177. out := make([]byte, s.respSize)
  178. if err := stream.(grpc.ServerStream).SendMsg(&out); err != nil {
  179. return err
  180. }
  181. }
  182. }
  183. // ServerInfo contains the information to create a gRPC benchmark server.
  184. type ServerInfo struct {
  185. // Type is the type of the server.
  186. // It should be "protobuf" or "bytebuf".
  187. Type string
  188. // Metadata is an optional configuration.
  189. // For "protobuf", it's ignored.
  190. // For "bytebuf", it should be an int representing response size.
  191. Metadata interface{}
  192. // Listener is the network listener for the server to use
  193. Listener net.Listener
  194. }
  195. // StartServer starts a gRPC server serving a benchmark service according to info.
  196. // It returns a function to stop the server.
  197. func StartServer(info ServerInfo, opts ...grpc.ServerOption) func() {
  198. opts = append(opts, grpc.WriteBufferSize(128*1024))
  199. opts = append(opts, grpc.ReadBufferSize(128*1024))
  200. s := grpc.NewServer(opts...)
  201. switch info.Type {
  202. case "protobuf":
  203. testpb.RegisterBenchmarkServiceServer(s, &testServer{})
  204. case "bytebuf":
  205. respSize, ok := info.Metadata.(int32)
  206. if !ok {
  207. grpclog.Fatalf("failed to StartServer, invalid metadata: %v, for Type: %v", info.Metadata, info.Type)
  208. }
  209. testpb.RegisterBenchmarkServiceServer(s, &byteBufServer{respSize: respSize})
  210. default:
  211. grpclog.Fatalf("failed to StartServer, unknown Type: %v", info.Type)
  212. }
  213. go s.Serve(info.Listener)
  214. return func() {
  215. s.Stop()
  216. }
  217. }
  218. // DoUnaryCall performs an unary RPC with given stub and request and response sizes.
  219. func DoUnaryCall(tc testpb.BenchmarkServiceClient, reqSize, respSize int) error {
  220. pl := NewPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
  221. req := &testpb.SimpleRequest{
  222. ResponseType: pl.Type,
  223. ResponseSize: int32(respSize),
  224. Payload: pl,
  225. }
  226. if _, err := tc.UnaryCall(context.Background(), req); err != nil {
  227. return fmt.Errorf("/BenchmarkService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
  228. }
  229. return nil
  230. }
  231. // DoStreamingRoundTrip performs a round trip for a single streaming rpc.
  232. func DoStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
  233. pl := NewPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
  234. req := &testpb.SimpleRequest{
  235. ResponseType: pl.Type,
  236. ResponseSize: int32(respSize),
  237. Payload: pl,
  238. }
  239. if err := stream.Send(req); err != nil {
  240. return fmt.Errorf("/BenchmarkService/StreamingCall.Send(_) = %v, want <nil>", err)
  241. }
  242. if _, err := stream.Recv(); err != nil {
  243. // EOF is a valid error here.
  244. if err == io.EOF {
  245. return nil
  246. }
  247. return fmt.Errorf("/BenchmarkService/StreamingCall.Recv(_) = %v, want <nil>", err)
  248. }
  249. return nil
  250. }
  251. // DoByteBufStreamingRoundTrip performs a round trip for a single streaming rpc, using a custom codec for byte buffer.
  252. func DoByteBufStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
  253. out := make([]byte, reqSize)
  254. if err := stream.(grpc.ClientStream).SendMsg(&out); err != nil {
  255. return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).SendMsg(_) = %v, want <nil>", err)
  256. }
  257. var in []byte
  258. if err := stream.(grpc.ClientStream).RecvMsg(&in); err != nil {
  259. // EOF is a valid error here.
  260. if err == io.EOF {
  261. return nil
  262. }
  263. return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).RecvMsg(_) = %v, want <nil>", err)
  264. }
  265. return nil
  266. }
  267. // NewClientConn creates a gRPC client connection to addr.
  268. func NewClientConn(addr string, opts ...grpc.DialOption) *grpc.ClientConn {
  269. return NewClientConnWithContext(context.Background(), addr, opts...)
  270. }
  271. // NewClientConnWithContext creates a gRPC client connection to addr using ctx.
  272. func NewClientConnWithContext(ctx context.Context, addr string, opts ...grpc.DialOption) *grpc.ClientConn {
  273. opts = append(opts, grpc.WithWriteBufferSize(128*1024))
  274. opts = append(opts, grpc.WithReadBufferSize(128*1024))
  275. conn, err := grpc.DialContext(ctx, addr, opts...)
  276. if err != nil {
  277. grpclog.Fatalf("NewClientConn(%q) failed to create a ClientConn %v", addr, err)
  278. }
  279. return conn
  280. }
  281. func runUnary(b *testing.B, benchFeatures stats.Features) {
  282. s := stats.AddStats(b, 38)
  283. nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu}
  284. lis, err := net.Listen("tcp", "localhost:0")
  285. if err != nil {
  286. grpclog.Fatalf("Failed to listen: %v", err)
  287. }
  288. target := lis.Addr().String()
  289. lis = nw.Listener(lis)
  290. stopper := StartServer(ServerInfo{Type: "protobuf", Listener: lis}, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1)))
  291. defer stopper()
  292. conn := NewClientConn(
  293. target, grpc.WithInsecure(),
  294. grpc.WithContextDialer(func(ctx context.Context, address string) (net.Conn, error) {
  295. return nw.ContextDialer((&net.Dialer{}).DialContext)(ctx, "tcp", address)
  296. }),
  297. )
  298. tc := testpb.NewBenchmarkServiceClient(conn)
  299. // Warm up connection.
  300. for i := 0; i < 10; i++ {
  301. unaryCaller(tc, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
  302. }
  303. ch := make(chan int, benchFeatures.MaxConcurrentCalls*4)
  304. var (
  305. mu sync.Mutex
  306. wg sync.WaitGroup
  307. )
  308. wg.Add(benchFeatures.MaxConcurrentCalls)
  309. // Distribute the b.N calls over maxConcurrentCalls workers.
  310. for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ {
  311. go func() {
  312. for range ch {
  313. start := time.Now()
  314. unaryCaller(tc, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
  315. elapse := time.Since(start)
  316. mu.Lock()
  317. s.Add(elapse)
  318. mu.Unlock()
  319. }
  320. wg.Done()
  321. }()
  322. }
  323. b.ResetTimer()
  324. for i := 0; i < b.N; i++ {
  325. ch <- i
  326. }
  327. close(ch)
  328. wg.Wait()
  329. b.StopTimer()
  330. conn.Close()
  331. }
  332. func runStream(b *testing.B, benchFeatures stats.Features) {
  333. s := stats.AddStats(b, 38)
  334. nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu}
  335. lis, err := net.Listen("tcp", "localhost:0")
  336. if err != nil {
  337. grpclog.Fatalf("Failed to listen: %v", err)
  338. }
  339. target := lis.Addr().String()
  340. lis = nw.Listener(lis)
  341. stopper := StartServer(ServerInfo{Type: "protobuf", Listener: lis}, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1)))
  342. defer stopper()
  343. conn := NewClientConn(
  344. target, grpc.WithInsecure(),
  345. grpc.WithContextDialer(func(ctx context.Context, address string) (net.Conn, error) {
  346. return nw.ContextDialer((&net.Dialer{}).DialContext)(ctx, "tcp", address)
  347. }),
  348. )
  349. tc := testpb.NewBenchmarkServiceClient(conn)
  350. // Warm up connection.
  351. stream, err := tc.StreamingCall(context.Background())
  352. if err != nil {
  353. b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
  354. }
  355. for i := 0; i < 10; i++ {
  356. streamCaller(stream, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
  357. }
  358. ch := make(chan struct{}, benchFeatures.MaxConcurrentCalls*4)
  359. var (
  360. mu sync.Mutex
  361. wg sync.WaitGroup
  362. )
  363. wg.Add(benchFeatures.MaxConcurrentCalls)
  364. // Distribute the b.N calls over maxConcurrentCalls workers.
  365. for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ {
  366. stream, err := tc.StreamingCall(context.Background())
  367. if err != nil {
  368. b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
  369. }
  370. go func() {
  371. for range ch {
  372. start := time.Now()
  373. streamCaller(stream, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
  374. elapse := time.Since(start)
  375. mu.Lock()
  376. s.Add(elapse)
  377. mu.Unlock()
  378. }
  379. wg.Done()
  380. }()
  381. }
  382. b.ResetTimer()
  383. for i := 0; i < b.N; i++ {
  384. ch <- struct{}{}
  385. }
  386. close(ch)
  387. wg.Wait()
  388. b.StopTimer()
  389. conn.Close()
  390. }
  391. func unaryCaller(client testpb.BenchmarkServiceClient, reqSize, respSize int) {
  392. if err := DoUnaryCall(client, reqSize, respSize); err != nil {
  393. grpclog.Fatalf("DoUnaryCall failed: %v", err)
  394. }
  395. }
  396. func streamCaller(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) {
  397. if err := DoStreamingRoundTrip(stream, reqSize, respSize); err != nil {
  398. grpclog.Fatalf("DoStreamingRoundTrip failed: %v", err)
  399. }
  400. }