You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

552 lines
16 KiB

  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package test
  19. import (
  20. "context"
  21. "fmt"
  22. "io"
  23. "os"
  24. "reflect"
  25. "strconv"
  26. "strings"
  27. "testing"
  28. "time"
  29. "github.com/golang/protobuf/proto"
  30. "google.golang.org/grpc"
  31. "google.golang.org/grpc/codes"
  32. "google.golang.org/grpc/internal/envconfig"
  33. "google.golang.org/grpc/metadata"
  34. "google.golang.org/grpc/status"
  35. testpb "google.golang.org/grpc/test/grpc_testing"
  36. )
  37. func enableRetry() func() {
  38. old := envconfig.Retry
  39. envconfig.Retry = true
  40. return func() { envconfig.Retry = old }
  41. }
  42. func (s) TestRetryUnary(t *testing.T) {
  43. defer enableRetry()()
  44. i := -1
  45. ss := &stubServer{
  46. emptyCall: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
  47. i++
  48. switch i {
  49. case 0, 2, 5:
  50. return &testpb.Empty{}, nil
  51. case 6, 8, 11:
  52. return nil, status.New(codes.Internal, "non-retryable error").Err()
  53. }
  54. return nil, status.New(codes.AlreadyExists, "retryable error").Err()
  55. },
  56. }
  57. if err := ss.Start([]grpc.ServerOption{}); err != nil {
  58. t.Fatalf("Error starting endpoint server: %v", err)
  59. }
  60. defer ss.Stop()
  61. ss.r.NewServiceConfig(`{
  62. "methodConfig": [{
  63. "name": [{"service": "grpc.testing.TestService"}],
  64. "waitForReady": true,
  65. "retryPolicy": {
  66. "MaxAttempts": 4,
  67. "InitialBackoff": ".01s",
  68. "MaxBackoff": ".01s",
  69. "BackoffMultiplier": 1.0,
  70. "RetryableStatusCodes": [ "ALREADY_EXISTS" ]
  71. }
  72. }]}`)
  73. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
  74. for {
  75. if ctx.Err() != nil {
  76. t.Fatalf("Timed out waiting for service config update")
  77. }
  78. if ss.cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall").WaitForReady != nil {
  79. break
  80. }
  81. time.Sleep(time.Millisecond)
  82. }
  83. cancel()
  84. testCases := []struct {
  85. code codes.Code
  86. count int
  87. }{
  88. {codes.OK, 0},
  89. {codes.OK, 2},
  90. {codes.OK, 5},
  91. {codes.Internal, 6},
  92. {codes.Internal, 8},
  93. {codes.Internal, 11},
  94. {codes.AlreadyExists, 15},
  95. }
  96. for _, tc := range testCases {
  97. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  98. _, err := ss.client.EmptyCall(ctx, &testpb.Empty{})
  99. cancel()
  100. if status.Code(err) != tc.code {
  101. t.Fatalf("EmptyCall(_, _) = _, %v; want _, <Code() = %v>", err, tc.code)
  102. }
  103. if i != tc.count {
  104. t.Fatalf("i = %v; want %v", i, tc.count)
  105. }
  106. }
  107. }
  108. func (s) TestRetryDisabledByDefault(t *testing.T) {
  109. if strings.EqualFold(os.Getenv("GRPC_GO_RETRY"), "on") {
  110. return
  111. }
  112. i := -1
  113. ss := &stubServer{
  114. emptyCall: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
  115. i++
  116. switch i {
  117. case 0:
  118. return nil, status.New(codes.AlreadyExists, "retryable error").Err()
  119. }
  120. return &testpb.Empty{}, nil
  121. },
  122. }
  123. if err := ss.Start([]grpc.ServerOption{}); err != nil {
  124. t.Fatalf("Error starting endpoint server: %v", err)
  125. }
  126. defer ss.Stop()
  127. ss.r.NewServiceConfig(`{
  128. "methodConfig": [{
  129. "name": [{"service": "grpc.testing.TestService"}],
  130. "waitForReady": true,
  131. "retryPolicy": {
  132. "MaxAttempts": 4,
  133. "InitialBackoff": ".01s",
  134. "MaxBackoff": ".01s",
  135. "BackoffMultiplier": 1.0,
  136. "RetryableStatusCodes": [ "ALREADY_EXISTS" ]
  137. }
  138. }]}`)
  139. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
  140. for {
  141. if ctx.Err() != nil {
  142. t.Fatalf("Timed out waiting for service config update")
  143. }
  144. if ss.cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall").WaitForReady != nil {
  145. break
  146. }
  147. time.Sleep(time.Millisecond)
  148. }
  149. cancel()
  150. testCases := []struct {
  151. code codes.Code
  152. count int
  153. }{
  154. {codes.AlreadyExists, 0},
  155. }
  156. for _, tc := range testCases {
  157. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  158. _, err := ss.client.EmptyCall(ctx, &testpb.Empty{})
  159. cancel()
  160. if status.Code(err) != tc.code {
  161. t.Fatalf("EmptyCall(_, _) = _, %v; want _, <Code() = %v>", err, tc.code)
  162. }
  163. if i != tc.count {
  164. t.Fatalf("i = %v; want %v", i, tc.count)
  165. }
  166. }
  167. }
  168. func (s) TestRetryThrottling(t *testing.T) {
  169. defer enableRetry()()
  170. i := -1
  171. ss := &stubServer{
  172. emptyCall: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
  173. i++
  174. switch i {
  175. case 0, 3, 6, 10, 11, 12, 13, 14, 16, 18:
  176. return &testpb.Empty{}, nil
  177. }
  178. return nil, status.New(codes.Unavailable, "retryable error").Err()
  179. },
  180. }
  181. if err := ss.Start([]grpc.ServerOption{}); err != nil {
  182. t.Fatalf("Error starting endpoint server: %v", err)
  183. }
  184. defer ss.Stop()
  185. ss.r.NewServiceConfig(`{
  186. "methodConfig": [{
  187. "name": [{"service": "grpc.testing.TestService"}],
  188. "waitForReady": true,
  189. "retryPolicy": {
  190. "MaxAttempts": 4,
  191. "InitialBackoff": ".01s",
  192. "MaxBackoff": ".01s",
  193. "BackoffMultiplier": 1.0,
  194. "RetryableStatusCodes": [ "UNAVAILABLE" ]
  195. }
  196. }],
  197. "retryThrottling": {
  198. "maxTokens": 10,
  199. "tokenRatio": 0.5
  200. }
  201. }`)
  202. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
  203. for {
  204. if ctx.Err() != nil {
  205. t.Fatalf("Timed out waiting for service config update")
  206. }
  207. if ss.cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall").WaitForReady != nil {
  208. break
  209. }
  210. time.Sleep(time.Millisecond)
  211. }
  212. cancel()
  213. testCases := []struct {
  214. code codes.Code
  215. count int
  216. }{
  217. {codes.OK, 0}, // tokens = 10
  218. {codes.OK, 3}, // tokens = 8.5 (10 - 2 failures + 0.5 success)
  219. {codes.OK, 6}, // tokens = 6
  220. {codes.Unavailable, 8}, // tokens = 5 -- first attempt is retried; second aborted.
  221. {codes.Unavailable, 9}, // tokens = 4
  222. {codes.OK, 10}, // tokens = 4.5
  223. {codes.OK, 11}, // tokens = 5
  224. {codes.OK, 12}, // tokens = 5.5
  225. {codes.OK, 13}, // tokens = 6
  226. {codes.OK, 14}, // tokens = 6.5
  227. {codes.OK, 16}, // tokens = 5.5
  228. {codes.Unavailable, 17}, // tokens = 4.5
  229. }
  230. for _, tc := range testCases {
  231. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  232. _, err := ss.client.EmptyCall(ctx, &testpb.Empty{})
  233. cancel()
  234. if status.Code(err) != tc.code {
  235. t.Errorf("EmptyCall(_, _) = _, %v; want _, <Code() = %v>", err, tc.code)
  236. }
  237. if i != tc.count {
  238. t.Errorf("i = %v; want %v", i, tc.count)
  239. }
  240. }
  241. }
  242. func (s) TestRetryStreaming(t *testing.T) {
  243. defer enableRetry()()
  244. req := func(b byte) *testpb.StreamingOutputCallRequest {
  245. return &testpb.StreamingOutputCallRequest{Payload: &testpb.Payload{Body: []byte{b}}}
  246. }
  247. res := func(b byte) *testpb.StreamingOutputCallResponse {
  248. return &testpb.StreamingOutputCallResponse{Payload: &testpb.Payload{Body: []byte{b}}}
  249. }
  250. largePayload, _ := newPayload(testpb.PayloadType_COMPRESSABLE, 500)
  251. type serverOp func(stream testpb.TestService_FullDuplexCallServer) error
  252. type clientOp func(stream testpb.TestService_FullDuplexCallClient) error
  253. // Server Operations
  254. sAttempts := func(n int) serverOp {
  255. return func(stream testpb.TestService_FullDuplexCallServer) error {
  256. const key = "grpc-previous-rpc-attempts"
  257. md, ok := metadata.FromIncomingContext(stream.Context())
  258. if !ok {
  259. return status.Errorf(codes.Internal, "server: no header metadata received")
  260. }
  261. if got := md[key]; len(got) != 1 || got[0] != strconv.Itoa(n) {
  262. return status.Errorf(codes.Internal, "server: metadata = %v; want <contains %q: %q>", md, key, n)
  263. }
  264. return nil
  265. }
  266. }
  267. sReq := func(b byte) serverOp {
  268. return func(stream testpb.TestService_FullDuplexCallServer) error {
  269. want := req(b)
  270. if got, err := stream.Recv(); err != nil || !proto.Equal(got, want) {
  271. return status.Errorf(codes.Internal, "server: Recv() = %v, %v; want %v, <nil>", got, err, want)
  272. }
  273. return nil
  274. }
  275. }
  276. sReqPayload := func(p *testpb.Payload) serverOp {
  277. return func(stream testpb.TestService_FullDuplexCallServer) error {
  278. want := &testpb.StreamingOutputCallRequest{Payload: p}
  279. if got, err := stream.Recv(); err != nil || !proto.Equal(got, want) {
  280. return status.Errorf(codes.Internal, "server: Recv() = %v, %v; want %v, <nil>", got, err, want)
  281. }
  282. return nil
  283. }
  284. }
  285. sRes := func(b byte) serverOp {
  286. return func(stream testpb.TestService_FullDuplexCallServer) error {
  287. msg := res(b)
  288. if err := stream.Send(msg); err != nil {
  289. return status.Errorf(codes.Internal, "server: Send(%v) = %v; want <nil>", msg, err)
  290. }
  291. return nil
  292. }
  293. }
  294. sErr := func(c codes.Code) serverOp {
  295. return func(stream testpb.TestService_FullDuplexCallServer) error {
  296. return status.New(c, "").Err()
  297. }
  298. }
  299. sCloseSend := func() serverOp {
  300. return func(stream testpb.TestService_FullDuplexCallServer) error {
  301. if msg, err := stream.Recv(); msg != nil || err != io.EOF {
  302. return status.Errorf(codes.Internal, "server: Recv() = %v, %v; want <nil>, io.EOF", msg, err)
  303. }
  304. return nil
  305. }
  306. }
  307. sPushback := func(s string) serverOp {
  308. return func(stream testpb.TestService_FullDuplexCallServer) error {
  309. stream.SetTrailer(metadata.MD{"grpc-retry-pushback-ms": []string{s}})
  310. return nil
  311. }
  312. }
  313. // Client Operations
  314. cReq := func(b byte) clientOp {
  315. return func(stream testpb.TestService_FullDuplexCallClient) error {
  316. msg := req(b)
  317. if err := stream.Send(msg); err != nil {
  318. return fmt.Errorf("client: Send(%v) = %v; want <nil>", msg, err)
  319. }
  320. return nil
  321. }
  322. }
  323. cReqPayload := func(p *testpb.Payload) clientOp {
  324. return func(stream testpb.TestService_FullDuplexCallClient) error {
  325. msg := &testpb.StreamingOutputCallRequest{Payload: p}
  326. if err := stream.Send(msg); err != nil {
  327. return fmt.Errorf("client: Send(%v) = %v; want <nil>", msg, err)
  328. }
  329. return nil
  330. }
  331. }
  332. cRes := func(b byte) clientOp {
  333. return func(stream testpb.TestService_FullDuplexCallClient) error {
  334. want := res(b)
  335. if got, err := stream.Recv(); err != nil || !proto.Equal(got, want) {
  336. return fmt.Errorf("client: Recv() = %v, %v; want %v, <nil>", got, err, want)
  337. }
  338. return nil
  339. }
  340. }
  341. cErr := func(c codes.Code) clientOp {
  342. return func(stream testpb.TestService_FullDuplexCallClient) error {
  343. want := status.New(c, "").Err()
  344. if c == codes.OK {
  345. want = io.EOF
  346. }
  347. res, err := stream.Recv()
  348. if res != nil ||
  349. ((err == nil) != (want == nil)) ||
  350. (want != nil && !reflect.DeepEqual(err, want)) {
  351. return fmt.Errorf("client: Recv() = %v, %v; want <nil>, %v", res, err, want)
  352. }
  353. return nil
  354. }
  355. }
  356. cCloseSend := func() clientOp {
  357. return func(stream testpb.TestService_FullDuplexCallClient) error {
  358. if err := stream.CloseSend(); err != nil {
  359. return fmt.Errorf("client: CloseSend() = %v; want <nil>", err)
  360. }
  361. return nil
  362. }
  363. }
  364. var curTime time.Time
  365. cGetTime := func() clientOp {
  366. return func(_ testpb.TestService_FullDuplexCallClient) error {
  367. curTime = time.Now()
  368. return nil
  369. }
  370. }
  371. cCheckElapsed := func(d time.Duration) clientOp {
  372. return func(_ testpb.TestService_FullDuplexCallClient) error {
  373. if elapsed := time.Since(curTime); elapsed < d {
  374. return fmt.Errorf("elapsed time: %v; want >= %v", elapsed, d)
  375. }
  376. return nil
  377. }
  378. }
  379. cHdr := func() clientOp {
  380. return func(stream testpb.TestService_FullDuplexCallClient) error {
  381. _, err := stream.Header()
  382. return err
  383. }
  384. }
  385. cCtx := func() clientOp {
  386. return func(stream testpb.TestService_FullDuplexCallClient) error {
  387. stream.Context()
  388. return nil
  389. }
  390. }
  391. testCases := []struct {
  392. desc string
  393. serverOps []serverOp
  394. clientOps []clientOp
  395. }{{
  396. desc: "Non-retryable error code",
  397. serverOps: []serverOp{sReq(1), sErr(codes.Internal)},
  398. clientOps: []clientOp{cReq(1), cErr(codes.Internal)},
  399. }, {
  400. desc: "One retry necessary",
  401. serverOps: []serverOp{sReq(1), sErr(codes.Unavailable), sReq(1), sAttempts(1), sRes(1)},
  402. clientOps: []clientOp{cReq(1), cRes(1), cErr(codes.OK)},
  403. }, {
  404. desc: "Exceed max attempts (4); check attempts header on server",
  405. serverOps: []serverOp{
  406. sReq(1), sErr(codes.Unavailable),
  407. sReq(1), sAttempts(1), sErr(codes.Unavailable),
  408. sAttempts(2), sReq(1), sErr(codes.Unavailable),
  409. sAttempts(3), sReq(1), sErr(codes.Unavailable),
  410. },
  411. clientOps: []clientOp{cReq(1), cErr(codes.Unavailable)},
  412. }, {
  413. desc: "Multiple requests",
  414. serverOps: []serverOp{
  415. sReq(1), sReq(2), sErr(codes.Unavailable),
  416. sReq(1), sReq(2), sRes(5),
  417. },
  418. clientOps: []clientOp{cReq(1), cReq(2), cRes(5), cErr(codes.OK)},
  419. }, {
  420. desc: "Multiple successive requests",
  421. serverOps: []serverOp{
  422. sReq(1), sErr(codes.Unavailable),
  423. sReq(1), sReq(2), sErr(codes.Unavailable),
  424. sReq(1), sReq(2), sReq(3), sRes(5),
  425. },
  426. clientOps: []clientOp{cReq(1), cReq(2), cReq(3), cRes(5), cErr(codes.OK)},
  427. }, {
  428. desc: "No retry after receiving",
  429. serverOps: []serverOp{
  430. sReq(1), sErr(codes.Unavailable),
  431. sReq(1), sRes(3), sErr(codes.Unavailable),
  432. },
  433. clientOps: []clientOp{cReq(1), cRes(3), cErr(codes.Unavailable)},
  434. }, {
  435. desc: "No retry after header",
  436. serverOps: []serverOp{sReq(1), sErr(codes.Unavailable)},
  437. clientOps: []clientOp{cReq(1), cHdr(), cErr(codes.Unavailable)},
  438. }, {
  439. desc: "No retry after context",
  440. serverOps: []serverOp{sReq(1), sErr(codes.Unavailable)},
  441. clientOps: []clientOp{cReq(1), cCtx(), cErr(codes.Unavailable)},
  442. }, {
  443. desc: "Replaying close send",
  444. serverOps: []serverOp{
  445. sReq(1), sReq(2), sCloseSend(), sErr(codes.Unavailable),
  446. sReq(1), sReq(2), sCloseSend(), sRes(1), sRes(3), sRes(5),
  447. },
  448. clientOps: []clientOp{cReq(1), cReq(2), cCloseSend(), cRes(1), cRes(3), cRes(5), cErr(codes.OK)},
  449. }, {
  450. desc: "Negative server pushback - no retry",
  451. serverOps: []serverOp{sReq(1), sPushback("-1"), sErr(codes.Unavailable)},
  452. clientOps: []clientOp{cReq(1), cErr(codes.Unavailable)},
  453. }, {
  454. desc: "Non-numeric server pushback - no retry",
  455. serverOps: []serverOp{sReq(1), sPushback("xxx"), sErr(codes.Unavailable)},
  456. clientOps: []clientOp{cReq(1), cErr(codes.Unavailable)},
  457. }, {
  458. desc: "Multiple server pushback values - no retry",
  459. serverOps: []serverOp{sReq(1), sPushback("100"), sPushback("10"), sErr(codes.Unavailable)},
  460. clientOps: []clientOp{cReq(1), cErr(codes.Unavailable)},
  461. }, {
  462. desc: "1s server pushback - delayed retry",
  463. serverOps: []serverOp{sReq(1), sPushback("1000"), sErr(codes.Unavailable), sReq(1), sRes(2)},
  464. clientOps: []clientOp{cGetTime(), cReq(1), cRes(2), cCheckElapsed(time.Second), cErr(codes.OK)},
  465. }, {
  466. desc: "Overflowing buffer - no retry",
  467. serverOps: []serverOp{sReqPayload(largePayload), sErr(codes.Unavailable)},
  468. clientOps: []clientOp{cReqPayload(largePayload), cErr(codes.Unavailable)},
  469. }}
  470. var serverOpIter int
  471. var serverOps []serverOp
  472. ss := &stubServer{
  473. fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
  474. for serverOpIter < len(serverOps) {
  475. op := serverOps[serverOpIter]
  476. serverOpIter++
  477. if err := op(stream); err != nil {
  478. return err
  479. }
  480. }
  481. return nil
  482. },
  483. }
  484. if err := ss.Start([]grpc.ServerOption{}, grpc.WithDefaultCallOptions(grpc.MaxRetryRPCBufferSize(200))); err != nil {
  485. t.Fatalf("Error starting endpoint server: %v", err)
  486. }
  487. defer ss.Stop()
  488. ss.r.NewServiceConfig(`{
  489. "methodConfig": [{
  490. "name": [{"service": "grpc.testing.TestService"}],
  491. "waitForReady": true,
  492. "retryPolicy": {
  493. "MaxAttempts": 4,
  494. "InitialBackoff": ".01s",
  495. "MaxBackoff": ".01s",
  496. "BackoffMultiplier": 1.0,
  497. "RetryableStatusCodes": [ "UNAVAILABLE" ]
  498. }
  499. }]}`)
  500. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
  501. for {
  502. if ctx.Err() != nil {
  503. t.Fatalf("Timed out waiting for service config update")
  504. }
  505. if ss.cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").WaitForReady != nil {
  506. break
  507. }
  508. time.Sleep(time.Millisecond)
  509. }
  510. cancel()
  511. for _, tc := range testCases {
  512. func() {
  513. serverOpIter = 0
  514. serverOps = tc.serverOps
  515. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  516. defer cancel()
  517. stream, err := ss.client.FullDuplexCall(ctx)
  518. if err != nil {
  519. t.Fatalf("%v: Error while creating stream: %v", tc.desc, err)
  520. }
  521. for _, op := range tc.clientOps {
  522. if err := op(stream); err != nil {
  523. t.Errorf("%v: %v", tc.desc, err)
  524. break
  525. }
  526. }
  527. if serverOpIter != len(serverOps) {
  528. t.Errorf("%v: serverOpIter = %v; want %v", tc.desc, serverOpIter, len(serverOps))
  529. }
  530. }()
  531. }
  532. }