You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

786 lines
26 KiB

  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. //go:generate protoc --go_out=plugins=grpc:. grpc_testing/test.proto
  19. package interop
  20. import (
  21. "context"
  22. "fmt"
  23. "io"
  24. "io/ioutil"
  25. "strings"
  26. "time"
  27. "github.com/golang/protobuf/proto"
  28. "golang.org/x/oauth2"
  29. "golang.org/x/oauth2/google"
  30. "google.golang.org/grpc"
  31. "google.golang.org/grpc/codes"
  32. "google.golang.org/grpc/grpclog"
  33. testpb "google.golang.org/grpc/interop/grpc_testing"
  34. "google.golang.org/grpc/metadata"
  35. "google.golang.org/grpc/status"
  36. )
  37. var (
  38. reqSizes = []int{27182, 8, 1828, 45904}
  39. respSizes = []int{31415, 9, 2653, 58979}
  40. largeReqSize = 271828
  41. largeRespSize = 314159
  42. initialMetadataKey = "x-grpc-test-echo-initial"
  43. trailingMetadataKey = "x-grpc-test-echo-trailing-bin"
  44. )
  45. // ClientNewPayload returns a payload of the given type and size.
  46. func ClientNewPayload(t testpb.PayloadType, size int) *testpb.Payload {
  47. if size < 0 {
  48. grpclog.Fatalf("Requested a response with invalid length %d", size)
  49. }
  50. body := make([]byte, size)
  51. switch t {
  52. case testpb.PayloadType_COMPRESSABLE:
  53. case testpb.PayloadType_UNCOMPRESSABLE:
  54. grpclog.Fatalf("PayloadType UNCOMPRESSABLE is not supported")
  55. default:
  56. grpclog.Fatalf("Unsupported payload type: %d", t)
  57. }
  58. return &testpb.Payload{
  59. Type: t,
  60. Body: body,
  61. }
  62. }
  63. // DoEmptyUnaryCall performs a unary RPC with empty request and response messages.
  64. func DoEmptyUnaryCall(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  65. reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, args...)
  66. if err != nil {
  67. grpclog.Fatal("/TestService/EmptyCall RPC failed: ", err)
  68. }
  69. if !proto.Equal(&testpb.Empty{}, reply) {
  70. grpclog.Fatalf("/TestService/EmptyCall receives %v, want %v", reply, testpb.Empty{})
  71. }
  72. }
  73. // DoLargeUnaryCall performs a unary RPC with large payload in the request and response.
  74. func DoLargeUnaryCall(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  75. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
  76. req := &testpb.SimpleRequest{
  77. ResponseType: testpb.PayloadType_COMPRESSABLE,
  78. ResponseSize: int32(largeRespSize),
  79. Payload: pl,
  80. }
  81. reply, err := tc.UnaryCall(context.Background(), req, args...)
  82. if err != nil {
  83. grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
  84. }
  85. t := reply.GetPayload().GetType()
  86. s := len(reply.GetPayload().GetBody())
  87. if t != testpb.PayloadType_COMPRESSABLE || s != largeRespSize {
  88. grpclog.Fatalf("Got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, largeRespSize)
  89. }
  90. }
  91. // DoClientStreaming performs a client streaming RPC.
  92. func DoClientStreaming(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  93. stream, err := tc.StreamingInputCall(context.Background(), args...)
  94. if err != nil {
  95. grpclog.Fatalf("%v.StreamingInputCall(_) = _, %v", tc, err)
  96. }
  97. var sum int
  98. for _, s := range reqSizes {
  99. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, s)
  100. req := &testpb.StreamingInputCallRequest{
  101. Payload: pl,
  102. }
  103. if err := stream.Send(req); err != nil {
  104. grpclog.Fatalf("%v has error %v while sending %v", stream, err, req)
  105. }
  106. sum += s
  107. }
  108. reply, err := stream.CloseAndRecv()
  109. if err != nil {
  110. grpclog.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
  111. }
  112. if reply.GetAggregatedPayloadSize() != int32(sum) {
  113. grpclog.Fatalf("%v.CloseAndRecv().GetAggregatePayloadSize() = %v; want %v", stream, reply.GetAggregatedPayloadSize(), sum)
  114. }
  115. }
  116. // DoServerStreaming performs a server streaming RPC.
  117. func DoServerStreaming(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  118. respParam := make([]*testpb.ResponseParameters, len(respSizes))
  119. for i, s := range respSizes {
  120. respParam[i] = &testpb.ResponseParameters{
  121. Size: int32(s),
  122. }
  123. }
  124. req := &testpb.StreamingOutputCallRequest{
  125. ResponseType: testpb.PayloadType_COMPRESSABLE,
  126. ResponseParameters: respParam,
  127. }
  128. stream, err := tc.StreamingOutputCall(context.Background(), req, args...)
  129. if err != nil {
  130. grpclog.Fatalf("%v.StreamingOutputCall(_) = _, %v", tc, err)
  131. }
  132. var rpcStatus error
  133. var respCnt int
  134. var index int
  135. for {
  136. reply, err := stream.Recv()
  137. if err != nil {
  138. rpcStatus = err
  139. break
  140. }
  141. t := reply.GetPayload().GetType()
  142. if t != testpb.PayloadType_COMPRESSABLE {
  143. grpclog.Fatalf("Got the reply of type %d, want %d", t, testpb.PayloadType_COMPRESSABLE)
  144. }
  145. size := len(reply.GetPayload().GetBody())
  146. if size != respSizes[index] {
  147. grpclog.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
  148. }
  149. index++
  150. respCnt++
  151. }
  152. if rpcStatus != io.EOF {
  153. grpclog.Fatalf("Failed to finish the server streaming rpc: %v", rpcStatus)
  154. }
  155. if respCnt != len(respSizes) {
  156. grpclog.Fatalf("Got %d reply, want %d", len(respSizes), respCnt)
  157. }
  158. }
  159. // DoPingPong performs ping-pong style bi-directional streaming RPC.
  160. func DoPingPong(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  161. stream, err := tc.FullDuplexCall(context.Background(), args...)
  162. if err != nil {
  163. grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
  164. }
  165. var index int
  166. for index < len(reqSizes) {
  167. respParam := []*testpb.ResponseParameters{
  168. {
  169. Size: int32(respSizes[index]),
  170. },
  171. }
  172. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, reqSizes[index])
  173. req := &testpb.StreamingOutputCallRequest{
  174. ResponseType: testpb.PayloadType_COMPRESSABLE,
  175. ResponseParameters: respParam,
  176. Payload: pl,
  177. }
  178. if err := stream.Send(req); err != nil {
  179. grpclog.Fatalf("%v has error %v while sending %v", stream, err, req)
  180. }
  181. reply, err := stream.Recv()
  182. if err != nil {
  183. grpclog.Fatalf("%v.Recv() = %v", stream, err)
  184. }
  185. t := reply.GetPayload().GetType()
  186. if t != testpb.PayloadType_COMPRESSABLE {
  187. grpclog.Fatalf("Got the reply of type %d, want %d", t, testpb.PayloadType_COMPRESSABLE)
  188. }
  189. size := len(reply.GetPayload().GetBody())
  190. if size != respSizes[index] {
  191. grpclog.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
  192. }
  193. index++
  194. }
  195. if err := stream.CloseSend(); err != nil {
  196. grpclog.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
  197. }
  198. if _, err := stream.Recv(); err != io.EOF {
  199. grpclog.Fatalf("%v failed to complele the ping pong test: %v", stream, err)
  200. }
  201. }
  202. // DoEmptyStream sets up a bi-directional streaming with zero message.
  203. func DoEmptyStream(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  204. stream, err := tc.FullDuplexCall(context.Background(), args...)
  205. if err != nil {
  206. grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
  207. }
  208. if err := stream.CloseSend(); err != nil {
  209. grpclog.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
  210. }
  211. if _, err := stream.Recv(); err != io.EOF {
  212. grpclog.Fatalf("%v failed to complete the empty stream test: %v", stream, err)
  213. }
  214. }
  215. // DoTimeoutOnSleepingServer performs an RPC on a sleep server which causes RPC timeout.
  216. func DoTimeoutOnSleepingServer(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  217. ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
  218. defer cancel()
  219. stream, err := tc.FullDuplexCall(ctx, args...)
  220. if err != nil {
  221. if status.Code(err) == codes.DeadlineExceeded {
  222. return
  223. }
  224. grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
  225. }
  226. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, 27182)
  227. req := &testpb.StreamingOutputCallRequest{
  228. ResponseType: testpb.PayloadType_COMPRESSABLE,
  229. Payload: pl,
  230. }
  231. if err := stream.Send(req); err != nil && err != io.EOF {
  232. grpclog.Fatalf("%v.Send(_) = %v", stream, err)
  233. }
  234. if _, err := stream.Recv(); status.Code(err) != codes.DeadlineExceeded {
  235. grpclog.Fatalf("%v.Recv() = _, %v, want error code %d", stream, err, codes.DeadlineExceeded)
  236. }
  237. }
  238. // DoComputeEngineCreds performs a unary RPC with compute engine auth.
  239. func DoComputeEngineCreds(tc testpb.TestServiceClient, serviceAccount, oauthScope string) {
  240. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
  241. req := &testpb.SimpleRequest{
  242. ResponseType: testpb.PayloadType_COMPRESSABLE,
  243. ResponseSize: int32(largeRespSize),
  244. Payload: pl,
  245. FillUsername: true,
  246. FillOauthScope: true,
  247. }
  248. reply, err := tc.UnaryCall(context.Background(), req)
  249. if err != nil {
  250. grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
  251. }
  252. user := reply.GetUsername()
  253. scope := reply.GetOauthScope()
  254. if user != serviceAccount {
  255. grpclog.Fatalf("Got user name %q, want %q.", user, serviceAccount)
  256. }
  257. if !strings.Contains(oauthScope, scope) {
  258. grpclog.Fatalf("Got OAuth scope %q which is NOT a substring of %q.", scope, oauthScope)
  259. }
  260. }
  261. func getServiceAccountJSONKey(keyFile string) []byte {
  262. jsonKey, err := ioutil.ReadFile(keyFile)
  263. if err != nil {
  264. grpclog.Fatalf("Failed to read the service account key file: %v", err)
  265. }
  266. return jsonKey
  267. }
  268. // DoServiceAccountCreds performs a unary RPC with service account auth.
  269. func DoServiceAccountCreds(tc testpb.TestServiceClient, serviceAccountKeyFile, oauthScope string) {
  270. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
  271. req := &testpb.SimpleRequest{
  272. ResponseType: testpb.PayloadType_COMPRESSABLE,
  273. ResponseSize: int32(largeRespSize),
  274. Payload: pl,
  275. FillUsername: true,
  276. FillOauthScope: true,
  277. }
  278. reply, err := tc.UnaryCall(context.Background(), req)
  279. if err != nil {
  280. grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
  281. }
  282. jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
  283. user := reply.GetUsername()
  284. scope := reply.GetOauthScope()
  285. if !strings.Contains(string(jsonKey), user) {
  286. grpclog.Fatalf("Got user name %q which is NOT a substring of %q.", user, jsonKey)
  287. }
  288. if !strings.Contains(oauthScope, scope) {
  289. grpclog.Fatalf("Got OAuth scope %q which is NOT a substring of %q.", scope, oauthScope)
  290. }
  291. }
  292. // DoJWTTokenCreds performs a unary RPC with JWT token auth.
  293. func DoJWTTokenCreds(tc testpb.TestServiceClient, serviceAccountKeyFile string) {
  294. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
  295. req := &testpb.SimpleRequest{
  296. ResponseType: testpb.PayloadType_COMPRESSABLE,
  297. ResponseSize: int32(largeRespSize),
  298. Payload: pl,
  299. FillUsername: true,
  300. }
  301. reply, err := tc.UnaryCall(context.Background(), req)
  302. if err != nil {
  303. grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
  304. }
  305. jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
  306. user := reply.GetUsername()
  307. if !strings.Contains(string(jsonKey), user) {
  308. grpclog.Fatalf("Got user name %q which is NOT a substring of %q.", user, jsonKey)
  309. }
  310. }
  311. // GetToken obtains an OAUTH token from the input.
  312. func GetToken(serviceAccountKeyFile string, oauthScope string) *oauth2.Token {
  313. jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
  314. config, err := google.JWTConfigFromJSON(jsonKey, oauthScope)
  315. if err != nil {
  316. grpclog.Fatalf("Failed to get the config: %v", err)
  317. }
  318. token, err := config.TokenSource(context.Background()).Token()
  319. if err != nil {
  320. grpclog.Fatalf("Failed to get the token: %v", err)
  321. }
  322. return token
  323. }
  324. // DoOauth2TokenCreds performs a unary RPC with OAUTH2 token auth.
  325. func DoOauth2TokenCreds(tc testpb.TestServiceClient, serviceAccountKeyFile, oauthScope string) {
  326. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
  327. req := &testpb.SimpleRequest{
  328. ResponseType: testpb.PayloadType_COMPRESSABLE,
  329. ResponseSize: int32(largeRespSize),
  330. Payload: pl,
  331. FillUsername: true,
  332. FillOauthScope: true,
  333. }
  334. reply, err := tc.UnaryCall(context.Background(), req)
  335. if err != nil {
  336. grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
  337. }
  338. jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
  339. user := reply.GetUsername()
  340. scope := reply.GetOauthScope()
  341. if !strings.Contains(string(jsonKey), user) {
  342. grpclog.Fatalf("Got user name %q which is NOT a substring of %q.", user, jsonKey)
  343. }
  344. if !strings.Contains(oauthScope, scope) {
  345. grpclog.Fatalf("Got OAuth scope %q which is NOT a substring of %q.", scope, oauthScope)
  346. }
  347. }
  348. // DoPerRPCCreds performs a unary RPC with per RPC OAUTH2 token.
  349. func DoPerRPCCreds(tc testpb.TestServiceClient, serviceAccountKeyFile, oauthScope string) {
  350. jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
  351. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
  352. req := &testpb.SimpleRequest{
  353. ResponseType: testpb.PayloadType_COMPRESSABLE,
  354. ResponseSize: int32(largeRespSize),
  355. Payload: pl,
  356. FillUsername: true,
  357. FillOauthScope: true,
  358. }
  359. token := GetToken(serviceAccountKeyFile, oauthScope)
  360. kv := map[string]string{"authorization": token.Type() + " " + token.AccessToken}
  361. ctx := metadata.NewOutgoingContext(context.Background(), metadata.MD{"authorization": []string{kv["authorization"]}})
  362. reply, err := tc.UnaryCall(ctx, req)
  363. if err != nil {
  364. grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
  365. }
  366. user := reply.GetUsername()
  367. scope := reply.GetOauthScope()
  368. if !strings.Contains(string(jsonKey), user) {
  369. grpclog.Fatalf("Got user name %q which is NOT a substring of %q.", user, jsonKey)
  370. }
  371. if !strings.Contains(oauthScope, scope) {
  372. grpclog.Fatalf("Got OAuth scope %q which is NOT a substring of %q.", scope, oauthScope)
  373. }
  374. }
  375. // DoGoogleDefaultCredentials performs a unary RPC with google default credentials
  376. func DoGoogleDefaultCredentials(tc testpb.TestServiceClient, defaultServiceAccount string) {
  377. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
  378. req := &testpb.SimpleRequest{
  379. ResponseType: testpb.PayloadType_COMPRESSABLE,
  380. ResponseSize: int32(largeRespSize),
  381. Payload: pl,
  382. FillUsername: true,
  383. FillOauthScope: true,
  384. }
  385. reply, err := tc.UnaryCall(context.Background(), req)
  386. if err != nil {
  387. grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
  388. }
  389. if reply.GetUsername() != defaultServiceAccount {
  390. grpclog.Fatalf("Got user name %q; wanted %q. ", reply.GetUsername(), defaultServiceAccount)
  391. }
  392. }
  393. var testMetadata = metadata.MD{
  394. "key1": []string{"value1"},
  395. "key2": []string{"value2"},
  396. }
  397. // DoCancelAfterBegin cancels the RPC after metadata has been sent but before payloads are sent.
  398. func DoCancelAfterBegin(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  399. ctx, cancel := context.WithCancel(metadata.NewOutgoingContext(context.Background(), testMetadata))
  400. stream, err := tc.StreamingInputCall(ctx, args...)
  401. if err != nil {
  402. grpclog.Fatalf("%v.StreamingInputCall(_) = _, %v", tc, err)
  403. }
  404. cancel()
  405. _, err = stream.CloseAndRecv()
  406. if status.Code(err) != codes.Canceled {
  407. grpclog.Fatalf("%v.CloseAndRecv() got error code %d, want %d", stream, status.Code(err), codes.Canceled)
  408. }
  409. }
  410. // DoCancelAfterFirstResponse cancels the RPC after receiving the first message from the server.
  411. func DoCancelAfterFirstResponse(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  412. ctx, cancel := context.WithCancel(context.Background())
  413. stream, err := tc.FullDuplexCall(ctx, args...)
  414. if err != nil {
  415. grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
  416. }
  417. respParam := []*testpb.ResponseParameters{
  418. {
  419. Size: 31415,
  420. },
  421. }
  422. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, 27182)
  423. req := &testpb.StreamingOutputCallRequest{
  424. ResponseType: testpb.PayloadType_COMPRESSABLE,
  425. ResponseParameters: respParam,
  426. Payload: pl,
  427. }
  428. if err := stream.Send(req); err != nil {
  429. grpclog.Fatalf("%v has error %v while sending %v", stream, err, req)
  430. }
  431. if _, err := stream.Recv(); err != nil {
  432. grpclog.Fatalf("%v.Recv() = %v", stream, err)
  433. }
  434. cancel()
  435. if _, err := stream.Recv(); status.Code(err) != codes.Canceled {
  436. grpclog.Fatalf("%v compleled with error code %d, want %d", stream, status.Code(err), codes.Canceled)
  437. }
  438. }
  439. var (
  440. initialMetadataValue = "test_initial_metadata_value"
  441. trailingMetadataValue = "\x0a\x0b\x0a\x0b\x0a\x0b"
  442. customMetadata = metadata.Pairs(
  443. initialMetadataKey, initialMetadataValue,
  444. trailingMetadataKey, trailingMetadataValue,
  445. )
  446. )
  447. func validateMetadata(header, trailer metadata.MD) {
  448. if len(header[initialMetadataKey]) != 1 {
  449. grpclog.Fatalf("Expected exactly one header from server. Received %d", len(header[initialMetadataKey]))
  450. }
  451. if header[initialMetadataKey][0] != initialMetadataValue {
  452. grpclog.Fatalf("Got header %s; want %s", header[initialMetadataKey][0], initialMetadataValue)
  453. }
  454. if len(trailer[trailingMetadataKey]) != 1 {
  455. grpclog.Fatalf("Expected exactly one trailer from server. Received %d", len(trailer[trailingMetadataKey]))
  456. }
  457. if trailer[trailingMetadataKey][0] != trailingMetadataValue {
  458. grpclog.Fatalf("Got trailer %s; want %s", trailer[trailingMetadataKey][0], trailingMetadataValue)
  459. }
  460. }
  461. // DoCustomMetadata checks that metadata is echoed back to the client.
  462. func DoCustomMetadata(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  463. // Testing with UnaryCall.
  464. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, 1)
  465. req := &testpb.SimpleRequest{
  466. ResponseType: testpb.PayloadType_COMPRESSABLE,
  467. ResponseSize: int32(1),
  468. Payload: pl,
  469. }
  470. ctx := metadata.NewOutgoingContext(context.Background(), customMetadata)
  471. var header, trailer metadata.MD
  472. args = append(args, grpc.Header(&header), grpc.Trailer(&trailer))
  473. reply, err := tc.UnaryCall(
  474. ctx,
  475. req,
  476. args...,
  477. )
  478. if err != nil {
  479. grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
  480. }
  481. t := reply.GetPayload().GetType()
  482. s := len(reply.GetPayload().GetBody())
  483. if t != testpb.PayloadType_COMPRESSABLE || s != 1 {
  484. grpclog.Fatalf("Got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, 1)
  485. }
  486. validateMetadata(header, trailer)
  487. // Testing with FullDuplex.
  488. stream, err := tc.FullDuplexCall(ctx, args...)
  489. if err != nil {
  490. grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  491. }
  492. respParam := []*testpb.ResponseParameters{
  493. {
  494. Size: 1,
  495. },
  496. }
  497. streamReq := &testpb.StreamingOutputCallRequest{
  498. ResponseType: testpb.PayloadType_COMPRESSABLE,
  499. ResponseParameters: respParam,
  500. Payload: pl,
  501. }
  502. if err := stream.Send(streamReq); err != nil {
  503. grpclog.Fatalf("%v has error %v while sending %v", stream, err, streamReq)
  504. }
  505. streamHeader, err := stream.Header()
  506. if err != nil {
  507. grpclog.Fatalf("%v.Header() = %v", stream, err)
  508. }
  509. if _, err := stream.Recv(); err != nil {
  510. grpclog.Fatalf("%v.Recv() = %v", stream, err)
  511. }
  512. if err := stream.CloseSend(); err != nil {
  513. grpclog.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
  514. }
  515. if _, err := stream.Recv(); err != io.EOF {
  516. grpclog.Fatalf("%v failed to complete the custom metadata test: %v", stream, err)
  517. }
  518. streamTrailer := stream.Trailer()
  519. validateMetadata(streamHeader, streamTrailer)
  520. }
  521. // DoStatusCodeAndMessage checks that the status code is propagated back to the client.
  522. func DoStatusCodeAndMessage(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  523. var code int32 = 2
  524. msg := "test status message"
  525. expectedErr := status.Error(codes.Code(code), msg)
  526. respStatus := &testpb.EchoStatus{
  527. Code: code,
  528. Message: msg,
  529. }
  530. // Test UnaryCall.
  531. req := &testpb.SimpleRequest{
  532. ResponseStatus: respStatus,
  533. }
  534. if _, err := tc.UnaryCall(context.Background(), req, args...); err == nil || err.Error() != expectedErr.Error() {
  535. grpclog.Fatalf("%v.UnaryCall(_, %v) = _, %v, want _, %v", tc, req, err, expectedErr)
  536. }
  537. // Test FullDuplexCall.
  538. stream, err := tc.FullDuplexCall(context.Background(), args...)
  539. if err != nil {
  540. grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  541. }
  542. streamReq := &testpb.StreamingOutputCallRequest{
  543. ResponseStatus: respStatus,
  544. }
  545. if err := stream.Send(streamReq); err != nil {
  546. grpclog.Fatalf("%v has error %v while sending %v, want <nil>", stream, err, streamReq)
  547. }
  548. if err := stream.CloseSend(); err != nil {
  549. grpclog.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
  550. }
  551. if _, err = stream.Recv(); err.Error() != expectedErr.Error() {
  552. grpclog.Fatalf("%v.Recv() returned error %v, want %v", stream, err, expectedErr)
  553. }
  554. }
  555. // DoSpecialStatusMessage verifies Unicode and whitespace is correctly processed
  556. // in status message.
  557. func DoSpecialStatusMessage(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  558. const (
  559. code int32 = 2
  560. msg string = "\t\ntest with whitespace\r\nand Unicode BMP ☺ and non-BMP 😈\t\n"
  561. )
  562. expectedErr := status.Error(codes.Code(code), msg)
  563. req := &testpb.SimpleRequest{
  564. ResponseStatus: &testpb.EchoStatus{
  565. Code: code,
  566. Message: msg,
  567. },
  568. }
  569. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  570. defer cancel()
  571. if _, err := tc.UnaryCall(ctx, req, args...); err == nil || err.Error() != expectedErr.Error() {
  572. grpclog.Fatalf("%v.UnaryCall(_, %v) = _, %v, want _, %v", tc, req, err, expectedErr)
  573. }
  574. }
  575. // DoUnimplementedService attempts to call a method from an unimplemented service.
  576. func DoUnimplementedService(tc testpb.UnimplementedServiceClient) {
  577. _, err := tc.UnimplementedCall(context.Background(), &testpb.Empty{})
  578. if status.Code(err) != codes.Unimplemented {
  579. grpclog.Fatalf("%v.UnimplementedCall() = _, %v, want _, %v", tc, status.Code(err), codes.Unimplemented)
  580. }
  581. }
  582. // DoUnimplementedMethod attempts to call an unimplemented method.
  583. func DoUnimplementedMethod(cc *grpc.ClientConn) {
  584. var req, reply proto.Message
  585. if err := cc.Invoke(context.Background(), "/grpc.testing.TestService/UnimplementedCall", req, reply); err == nil || status.Code(err) != codes.Unimplemented {
  586. grpclog.Fatalf("ClientConn.Invoke(_, _, _, _, _) = %v, want error code %s", err, codes.Unimplemented)
  587. }
  588. }
  589. type testServer struct {
  590. }
  591. // NewTestServer creates a test server for test service.
  592. func NewTestServer() testpb.TestServiceServer {
  593. return &testServer{}
  594. }
  595. func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
  596. return new(testpb.Empty), nil
  597. }
  598. func serverNewPayload(t testpb.PayloadType, size int32) (*testpb.Payload, error) {
  599. if size < 0 {
  600. return nil, fmt.Errorf("requested a response with invalid length %d", size)
  601. }
  602. body := make([]byte, size)
  603. switch t {
  604. case testpb.PayloadType_COMPRESSABLE:
  605. case testpb.PayloadType_UNCOMPRESSABLE:
  606. return nil, fmt.Errorf("payloadType UNCOMPRESSABLE is not supported")
  607. default:
  608. return nil, fmt.Errorf("unsupported payload type: %d", t)
  609. }
  610. return &testpb.Payload{
  611. Type: t,
  612. Body: body,
  613. }, nil
  614. }
  615. func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
  616. st := in.GetResponseStatus()
  617. if md, ok := metadata.FromIncomingContext(ctx); ok {
  618. if initialMetadata, ok := md[initialMetadataKey]; ok {
  619. header := metadata.Pairs(initialMetadataKey, initialMetadata[0])
  620. grpc.SendHeader(ctx, header)
  621. }
  622. if trailingMetadata, ok := md[trailingMetadataKey]; ok {
  623. trailer := metadata.Pairs(trailingMetadataKey, trailingMetadata[0])
  624. grpc.SetTrailer(ctx, trailer)
  625. }
  626. }
  627. if st != nil && st.Code != 0 {
  628. return nil, status.Error(codes.Code(st.Code), st.Message)
  629. }
  630. pl, err := serverNewPayload(in.GetResponseType(), in.GetResponseSize())
  631. if err != nil {
  632. return nil, err
  633. }
  634. return &testpb.SimpleResponse{
  635. Payload: pl,
  636. }, nil
  637. }
  638. func (s *testServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error {
  639. cs := args.GetResponseParameters()
  640. for _, c := range cs {
  641. if us := c.GetIntervalUs(); us > 0 {
  642. time.Sleep(time.Duration(us) * time.Microsecond)
  643. }
  644. pl, err := serverNewPayload(args.GetResponseType(), c.GetSize())
  645. if err != nil {
  646. return err
  647. }
  648. if err := stream.Send(&testpb.StreamingOutputCallResponse{
  649. Payload: pl,
  650. }); err != nil {
  651. return err
  652. }
  653. }
  654. return nil
  655. }
  656. func (s *testServer) StreamingInputCall(stream testpb.TestService_StreamingInputCallServer) error {
  657. var sum int
  658. for {
  659. in, err := stream.Recv()
  660. if err == io.EOF {
  661. return stream.SendAndClose(&testpb.StreamingInputCallResponse{
  662. AggregatedPayloadSize: int32(sum),
  663. })
  664. }
  665. if err != nil {
  666. return err
  667. }
  668. p := in.GetPayload().GetBody()
  669. sum += len(p)
  670. }
  671. }
  672. func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
  673. if md, ok := metadata.FromIncomingContext(stream.Context()); ok {
  674. if initialMetadata, ok := md[initialMetadataKey]; ok {
  675. header := metadata.Pairs(initialMetadataKey, initialMetadata[0])
  676. stream.SendHeader(header)
  677. }
  678. if trailingMetadata, ok := md[trailingMetadataKey]; ok {
  679. trailer := metadata.Pairs(trailingMetadataKey, trailingMetadata[0])
  680. stream.SetTrailer(trailer)
  681. }
  682. }
  683. for {
  684. in, err := stream.Recv()
  685. if err == io.EOF {
  686. // read done.
  687. return nil
  688. }
  689. if err != nil {
  690. return err
  691. }
  692. st := in.GetResponseStatus()
  693. if st != nil && st.Code != 0 {
  694. return status.Error(codes.Code(st.Code), st.Message)
  695. }
  696. cs := in.GetResponseParameters()
  697. for _, c := range cs {
  698. if us := c.GetIntervalUs(); us > 0 {
  699. time.Sleep(time.Duration(us) * time.Microsecond)
  700. }
  701. pl, err := serverNewPayload(in.GetResponseType(), c.GetSize())
  702. if err != nil {
  703. return err
  704. }
  705. if err := stream.Send(&testpb.StreamingOutputCallResponse{
  706. Payload: pl,
  707. }); err != nil {
  708. return err
  709. }
  710. }
  711. }
  712. }
  713. func (s *testServer) HalfDuplexCall(stream testpb.TestService_HalfDuplexCallServer) error {
  714. var msgBuf []*testpb.StreamingOutputCallRequest
  715. for {
  716. in, err := stream.Recv()
  717. if err == io.EOF {
  718. // read done.
  719. break
  720. }
  721. if err != nil {
  722. return err
  723. }
  724. msgBuf = append(msgBuf, in)
  725. }
  726. for _, m := range msgBuf {
  727. cs := m.GetResponseParameters()
  728. for _, c := range cs {
  729. if us := c.GetIntervalUs(); us > 0 {
  730. time.Sleep(time.Duration(us) * time.Microsecond)
  731. }
  732. pl, err := serverNewPayload(m.GetResponseType(), c.GetSize())
  733. if err != nil {
  734. return err
  735. }
  736. if err := stream.Send(&testpb.StreamingOutputCallResponse{
  737. Payload: pl,
  738. }); err != nil {
  739. return err
  740. }
  741. }
  742. }
  743. return nil
  744. }