25'ten fazla konu seçemezsiniz Konular bir harf veya rakamla başlamalı, kısa çizgiler ('-') içerebilir ve en fazla 35 karakter uzunluğunda olabilir.
 
 
 

1266 satır
33 KiB

  1. /*
  2. *
  3. * Copyright 2016 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package stats_test
  19. import (
  20. "context"
  21. "fmt"
  22. "io"
  23. "net"
  24. "reflect"
  25. "sync"
  26. "testing"
  27. "time"
  28. "github.com/golang/protobuf/proto"
  29. "google.golang.org/grpc"
  30. "google.golang.org/grpc/metadata"
  31. "google.golang.org/grpc/stats"
  32. testpb "google.golang.org/grpc/stats/grpc_testing"
  33. "google.golang.org/grpc/status"
  34. )
  35. func init() {
  36. grpc.EnableTracing = false
  37. }
  38. type connCtxKey struct{}
  39. type rpcCtxKey struct{}
  40. var (
  41. // For headers:
  42. testMetadata = metadata.MD{
  43. "key1": []string{"value1"},
  44. "key2": []string{"value2"},
  45. }
  46. // For trailers:
  47. testTrailerMetadata = metadata.MD{
  48. "tkey1": []string{"trailerValue1"},
  49. "tkey2": []string{"trailerValue2"},
  50. }
  51. // The id for which the service handler should return error.
  52. errorID int32 = 32202
  53. )
  54. type testServer struct{}
  55. func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
  56. md, ok := metadata.FromIncomingContext(ctx)
  57. if ok {
  58. if err := grpc.SendHeader(ctx, md); err != nil {
  59. return nil, status.Errorf(status.Code(err), "grpc.SendHeader(_, %v) = %v, want <nil>", md, err)
  60. }
  61. if err := grpc.SetTrailer(ctx, testTrailerMetadata); err != nil {
  62. return nil, status.Errorf(status.Code(err), "grpc.SetTrailer(_, %v) = %v, want <nil>", testTrailerMetadata, err)
  63. }
  64. }
  65. if in.Id == errorID {
  66. return nil, fmt.Errorf("got error id: %v", in.Id)
  67. }
  68. return &testpb.SimpleResponse{Id: in.Id}, nil
  69. }
  70. func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
  71. md, ok := metadata.FromIncomingContext(stream.Context())
  72. if ok {
  73. if err := stream.SendHeader(md); err != nil {
  74. return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil)
  75. }
  76. stream.SetTrailer(testTrailerMetadata)
  77. }
  78. for {
  79. in, err := stream.Recv()
  80. if err == io.EOF {
  81. // read done.
  82. return nil
  83. }
  84. if err != nil {
  85. return err
  86. }
  87. if in.Id == errorID {
  88. return fmt.Errorf("got error id: %v", in.Id)
  89. }
  90. if err := stream.Send(&testpb.SimpleResponse{Id: in.Id}); err != nil {
  91. return err
  92. }
  93. }
  94. }
  95. func (s *testServer) ClientStreamCall(stream testpb.TestService_ClientStreamCallServer) error {
  96. md, ok := metadata.FromIncomingContext(stream.Context())
  97. if ok {
  98. if err := stream.SendHeader(md); err != nil {
  99. return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil)
  100. }
  101. stream.SetTrailer(testTrailerMetadata)
  102. }
  103. for {
  104. in, err := stream.Recv()
  105. if err == io.EOF {
  106. // read done.
  107. return stream.SendAndClose(&testpb.SimpleResponse{Id: int32(0)})
  108. }
  109. if err != nil {
  110. return err
  111. }
  112. if in.Id == errorID {
  113. return fmt.Errorf("got error id: %v", in.Id)
  114. }
  115. }
  116. }
  117. func (s *testServer) ServerStreamCall(in *testpb.SimpleRequest, stream testpb.TestService_ServerStreamCallServer) error {
  118. md, ok := metadata.FromIncomingContext(stream.Context())
  119. if ok {
  120. if err := stream.SendHeader(md); err != nil {
  121. return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil)
  122. }
  123. stream.SetTrailer(testTrailerMetadata)
  124. }
  125. if in.Id == errorID {
  126. return fmt.Errorf("got error id: %v", in.Id)
  127. }
  128. for i := 0; i < 5; i++ {
  129. if err := stream.Send(&testpb.SimpleResponse{Id: in.Id}); err != nil {
  130. return err
  131. }
  132. }
  133. return nil
  134. }
  135. // test is an end-to-end test. It should be created with the newTest
  136. // func, modified as needed, and then started with its startServer method.
  137. // It should be cleaned up with the tearDown method.
  138. type test struct {
  139. t *testing.T
  140. compress string
  141. clientStatsHandler stats.Handler
  142. serverStatsHandler stats.Handler
  143. testServer testpb.TestServiceServer // nil means none
  144. // srv and srvAddr are set once startServer is called.
  145. srv *grpc.Server
  146. srvAddr string
  147. cc *grpc.ClientConn // nil until requested via clientConn
  148. }
  149. func (te *test) tearDown() {
  150. if te.cc != nil {
  151. te.cc.Close()
  152. te.cc = nil
  153. }
  154. te.srv.Stop()
  155. }
  156. type testConfig struct {
  157. compress string
  158. }
  159. // newTest returns a new test using the provided testing.T and
  160. // environment. It is returned with default values. Tests should
  161. // modify it before calling its startServer and clientConn methods.
  162. func newTest(t *testing.T, tc *testConfig, ch stats.Handler, sh stats.Handler) *test {
  163. te := &test{
  164. t: t,
  165. compress: tc.compress,
  166. clientStatsHandler: ch,
  167. serverStatsHandler: sh,
  168. }
  169. return te
  170. }
  171. // startServer starts a gRPC server listening. Callers should defer a
  172. // call to te.tearDown to clean up.
  173. func (te *test) startServer(ts testpb.TestServiceServer) {
  174. te.testServer = ts
  175. lis, err := net.Listen("tcp", "localhost:0")
  176. if err != nil {
  177. te.t.Fatalf("Failed to listen: %v", err)
  178. }
  179. var opts []grpc.ServerOption
  180. if te.compress == "gzip" {
  181. opts = append(opts,
  182. grpc.RPCCompressor(grpc.NewGZIPCompressor()),
  183. grpc.RPCDecompressor(grpc.NewGZIPDecompressor()),
  184. )
  185. }
  186. if te.serverStatsHandler != nil {
  187. opts = append(opts, grpc.StatsHandler(te.serverStatsHandler))
  188. }
  189. s := grpc.NewServer(opts...)
  190. te.srv = s
  191. if te.testServer != nil {
  192. testpb.RegisterTestServiceServer(s, te.testServer)
  193. }
  194. go s.Serve(lis)
  195. te.srvAddr = lis.Addr().String()
  196. }
  197. func (te *test) clientConn() *grpc.ClientConn {
  198. if te.cc != nil {
  199. return te.cc
  200. }
  201. opts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithBlock()}
  202. if te.compress == "gzip" {
  203. opts = append(opts,
  204. grpc.WithCompressor(grpc.NewGZIPCompressor()),
  205. grpc.WithDecompressor(grpc.NewGZIPDecompressor()),
  206. )
  207. }
  208. if te.clientStatsHandler != nil {
  209. opts = append(opts, grpc.WithStatsHandler(te.clientStatsHandler))
  210. }
  211. var err error
  212. te.cc, err = grpc.Dial(te.srvAddr, opts...)
  213. if err != nil {
  214. te.t.Fatalf("Dial(%q) = %v", te.srvAddr, err)
  215. }
  216. return te.cc
  217. }
  218. type rpcType int
  219. const (
  220. unaryRPC rpcType = iota
  221. clientStreamRPC
  222. serverStreamRPC
  223. fullDuplexStreamRPC
  224. )
  225. type rpcConfig struct {
  226. count int // Number of requests and responses for streaming RPCs.
  227. success bool // Whether the RPC should succeed or return error.
  228. failfast bool
  229. callType rpcType // Type of RPC.
  230. }
  231. func (te *test) doUnaryCall(c *rpcConfig) (*testpb.SimpleRequest, *testpb.SimpleResponse, error) {
  232. var (
  233. resp *testpb.SimpleResponse
  234. req *testpb.SimpleRequest
  235. err error
  236. )
  237. tc := testpb.NewTestServiceClient(te.clientConn())
  238. if c.success {
  239. req = &testpb.SimpleRequest{Id: errorID + 1}
  240. } else {
  241. req = &testpb.SimpleRequest{Id: errorID}
  242. }
  243. ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
  244. resp, err = tc.UnaryCall(ctx, req, grpc.WaitForReady(!c.failfast))
  245. return req, resp, err
  246. }
  247. func (te *test) doFullDuplexCallRoundtrip(c *rpcConfig) ([]*testpb.SimpleRequest, []*testpb.SimpleResponse, error) {
  248. var (
  249. reqs []*testpb.SimpleRequest
  250. resps []*testpb.SimpleResponse
  251. err error
  252. )
  253. tc := testpb.NewTestServiceClient(te.clientConn())
  254. stream, err := tc.FullDuplexCall(metadata.NewOutgoingContext(context.Background(), testMetadata), grpc.WaitForReady(!c.failfast))
  255. if err != nil {
  256. return reqs, resps, err
  257. }
  258. var startID int32
  259. if !c.success {
  260. startID = errorID
  261. }
  262. for i := 0; i < c.count; i++ {
  263. req := &testpb.SimpleRequest{
  264. Id: int32(i) + startID,
  265. }
  266. reqs = append(reqs, req)
  267. if err = stream.Send(req); err != nil {
  268. return reqs, resps, err
  269. }
  270. var resp *testpb.SimpleResponse
  271. if resp, err = stream.Recv(); err != nil {
  272. return reqs, resps, err
  273. }
  274. resps = append(resps, resp)
  275. }
  276. if err = stream.CloseSend(); err != nil && err != io.EOF {
  277. return reqs, resps, err
  278. }
  279. if _, err = stream.Recv(); err != io.EOF {
  280. return reqs, resps, err
  281. }
  282. return reqs, resps, nil
  283. }
  284. func (te *test) doClientStreamCall(c *rpcConfig) ([]*testpb.SimpleRequest, *testpb.SimpleResponse, error) {
  285. var (
  286. reqs []*testpb.SimpleRequest
  287. resp *testpb.SimpleResponse
  288. err error
  289. )
  290. tc := testpb.NewTestServiceClient(te.clientConn())
  291. stream, err := tc.ClientStreamCall(metadata.NewOutgoingContext(context.Background(), testMetadata), grpc.WaitForReady(!c.failfast))
  292. if err != nil {
  293. return reqs, resp, err
  294. }
  295. var startID int32
  296. if !c.success {
  297. startID = errorID
  298. }
  299. for i := 0; i < c.count; i++ {
  300. req := &testpb.SimpleRequest{
  301. Id: int32(i) + startID,
  302. }
  303. reqs = append(reqs, req)
  304. if err = stream.Send(req); err != nil {
  305. return reqs, resp, err
  306. }
  307. }
  308. resp, err = stream.CloseAndRecv()
  309. return reqs, resp, err
  310. }
  311. func (te *test) doServerStreamCall(c *rpcConfig) (*testpb.SimpleRequest, []*testpb.SimpleResponse, error) {
  312. var (
  313. req *testpb.SimpleRequest
  314. resps []*testpb.SimpleResponse
  315. err error
  316. )
  317. tc := testpb.NewTestServiceClient(te.clientConn())
  318. var startID int32
  319. if !c.success {
  320. startID = errorID
  321. }
  322. req = &testpb.SimpleRequest{Id: startID}
  323. stream, err := tc.ServerStreamCall(metadata.NewOutgoingContext(context.Background(), testMetadata), req, grpc.WaitForReady(!c.failfast))
  324. if err != nil {
  325. return req, resps, err
  326. }
  327. for {
  328. var resp *testpb.SimpleResponse
  329. resp, err := stream.Recv()
  330. if err == io.EOF {
  331. return req, resps, nil
  332. } else if err != nil {
  333. return req, resps, err
  334. }
  335. resps = append(resps, resp)
  336. }
  337. }
  338. type expectedData struct {
  339. method string
  340. serverAddr string
  341. compression string
  342. reqIdx int
  343. requests []*testpb.SimpleRequest
  344. respIdx int
  345. responses []*testpb.SimpleResponse
  346. err error
  347. failfast bool
  348. }
  349. type gotData struct {
  350. ctx context.Context
  351. client bool
  352. s interface{} // This could be RPCStats or ConnStats.
  353. }
  354. const (
  355. begin int = iota
  356. end
  357. inPayload
  358. inHeader
  359. inTrailer
  360. outPayload
  361. outHeader
  362. // TODO: test outTrailer ?
  363. connbegin
  364. connend
  365. )
  366. func checkBegin(t *testing.T, d *gotData, e *expectedData) {
  367. var (
  368. ok bool
  369. st *stats.Begin
  370. )
  371. if st, ok = d.s.(*stats.Begin); !ok {
  372. t.Fatalf("got %T, want Begin", d.s)
  373. }
  374. if d.ctx == nil {
  375. t.Fatalf("d.ctx = nil, want <non-nil>")
  376. }
  377. if st.BeginTime.IsZero() {
  378. t.Fatalf("st.BeginTime = %v, want <non-zero>", st.BeginTime)
  379. }
  380. if d.client {
  381. if st.FailFast != e.failfast {
  382. t.Fatalf("st.FailFast = %v, want %v", st.FailFast, e.failfast)
  383. }
  384. }
  385. }
  386. func checkInHeader(t *testing.T, d *gotData, e *expectedData) {
  387. var (
  388. ok bool
  389. st *stats.InHeader
  390. )
  391. if st, ok = d.s.(*stats.InHeader); !ok {
  392. t.Fatalf("got %T, want InHeader", d.s)
  393. }
  394. if d.ctx == nil {
  395. t.Fatalf("d.ctx = nil, want <non-nil>")
  396. }
  397. if !d.client {
  398. if st.FullMethod != e.method {
  399. t.Fatalf("st.FullMethod = %s, want %v", st.FullMethod, e.method)
  400. }
  401. if st.LocalAddr.String() != e.serverAddr {
  402. t.Fatalf("st.LocalAddr = %v, want %v", st.LocalAddr, e.serverAddr)
  403. }
  404. if st.Compression != e.compression {
  405. t.Fatalf("st.Compression = %v, want %v", st.Compression, e.compression)
  406. }
  407. if connInfo, ok := d.ctx.Value(connCtxKey{}).(*stats.ConnTagInfo); ok {
  408. if connInfo.RemoteAddr != st.RemoteAddr {
  409. t.Fatalf("connInfo.RemoteAddr = %v, want %v", connInfo.RemoteAddr, st.RemoteAddr)
  410. }
  411. if connInfo.LocalAddr != st.LocalAddr {
  412. t.Fatalf("connInfo.LocalAddr = %v, want %v", connInfo.LocalAddr, st.LocalAddr)
  413. }
  414. } else {
  415. t.Fatalf("got context %v, want one with connCtxKey", d.ctx)
  416. }
  417. if rpcInfo, ok := d.ctx.Value(rpcCtxKey{}).(*stats.RPCTagInfo); ok {
  418. if rpcInfo.FullMethodName != st.FullMethod {
  419. t.Fatalf("rpcInfo.FullMethod = %s, want %v", rpcInfo.FullMethodName, st.FullMethod)
  420. }
  421. } else {
  422. t.Fatalf("got context %v, want one with rpcCtxKey", d.ctx)
  423. }
  424. }
  425. }
  426. func checkInPayload(t *testing.T, d *gotData, e *expectedData) {
  427. var (
  428. ok bool
  429. st *stats.InPayload
  430. )
  431. if st, ok = d.s.(*stats.InPayload); !ok {
  432. t.Fatalf("got %T, want InPayload", d.s)
  433. }
  434. if d.ctx == nil {
  435. t.Fatalf("d.ctx = nil, want <non-nil>")
  436. }
  437. if d.client {
  438. b, err := proto.Marshal(e.responses[e.respIdx])
  439. if err != nil {
  440. t.Fatalf("failed to marshal message: %v", err)
  441. }
  442. if reflect.TypeOf(st.Payload) != reflect.TypeOf(e.responses[e.respIdx]) {
  443. t.Fatalf("st.Payload = %T, want %T", st.Payload, e.responses[e.respIdx])
  444. }
  445. e.respIdx++
  446. if string(st.Data) != string(b) {
  447. t.Fatalf("st.Data = %v, want %v", st.Data, b)
  448. }
  449. if st.Length != len(b) {
  450. t.Fatalf("st.Lenght = %v, want %v", st.Length, len(b))
  451. }
  452. } else {
  453. b, err := proto.Marshal(e.requests[e.reqIdx])
  454. if err != nil {
  455. t.Fatalf("failed to marshal message: %v", err)
  456. }
  457. if reflect.TypeOf(st.Payload) != reflect.TypeOf(e.requests[e.reqIdx]) {
  458. t.Fatalf("st.Payload = %T, want %T", st.Payload, e.requests[e.reqIdx])
  459. }
  460. e.reqIdx++
  461. if string(st.Data) != string(b) {
  462. t.Fatalf("st.Data = %v, want %v", st.Data, b)
  463. }
  464. if st.Length != len(b) {
  465. t.Fatalf("st.Lenght = %v, want %v", st.Length, len(b))
  466. }
  467. }
  468. // TODO check WireLength and ReceivedTime.
  469. if st.RecvTime.IsZero() {
  470. t.Fatalf("st.ReceivedTime = %v, want <non-zero>", st.RecvTime)
  471. }
  472. }
  473. func checkInTrailer(t *testing.T, d *gotData, e *expectedData) {
  474. var (
  475. ok bool
  476. )
  477. if _, ok = d.s.(*stats.InTrailer); !ok {
  478. t.Fatalf("got %T, want InTrailer", d.s)
  479. }
  480. if d.ctx == nil {
  481. t.Fatalf("d.ctx = nil, want <non-nil>")
  482. }
  483. }
  484. func checkOutHeader(t *testing.T, d *gotData, e *expectedData) {
  485. var (
  486. ok bool
  487. st *stats.OutHeader
  488. )
  489. if st, ok = d.s.(*stats.OutHeader); !ok {
  490. t.Fatalf("got %T, want OutHeader", d.s)
  491. }
  492. if d.ctx == nil {
  493. t.Fatalf("d.ctx = nil, want <non-nil>")
  494. }
  495. if d.client {
  496. if st.FullMethod != e.method {
  497. t.Fatalf("st.FullMethod = %s, want %v", st.FullMethod, e.method)
  498. }
  499. if st.RemoteAddr.String() != e.serverAddr {
  500. t.Fatalf("st.RemoteAddr = %v, want %v", st.RemoteAddr, e.serverAddr)
  501. }
  502. if st.Compression != e.compression {
  503. t.Fatalf("st.Compression = %v, want %v", st.Compression, e.compression)
  504. }
  505. if rpcInfo, ok := d.ctx.Value(rpcCtxKey{}).(*stats.RPCTagInfo); ok {
  506. if rpcInfo.FullMethodName != st.FullMethod {
  507. t.Fatalf("rpcInfo.FullMethod = %s, want %v", rpcInfo.FullMethodName, st.FullMethod)
  508. }
  509. } else {
  510. t.Fatalf("got context %v, want one with rpcCtxKey", d.ctx)
  511. }
  512. }
  513. }
  514. func checkOutPayload(t *testing.T, d *gotData, e *expectedData) {
  515. var (
  516. ok bool
  517. st *stats.OutPayload
  518. )
  519. if st, ok = d.s.(*stats.OutPayload); !ok {
  520. t.Fatalf("got %T, want OutPayload", d.s)
  521. }
  522. if d.ctx == nil {
  523. t.Fatalf("d.ctx = nil, want <non-nil>")
  524. }
  525. if d.client {
  526. b, err := proto.Marshal(e.requests[e.reqIdx])
  527. if err != nil {
  528. t.Fatalf("failed to marshal message: %v", err)
  529. }
  530. if reflect.TypeOf(st.Payload) != reflect.TypeOf(e.requests[e.reqIdx]) {
  531. t.Fatalf("st.Payload = %T, want %T", st.Payload, e.requests[e.reqIdx])
  532. }
  533. e.reqIdx++
  534. if string(st.Data) != string(b) {
  535. t.Fatalf("st.Data = %v, want %v", st.Data, b)
  536. }
  537. if st.Length != len(b) {
  538. t.Fatalf("st.Lenght = %v, want %v", st.Length, len(b))
  539. }
  540. } else {
  541. b, err := proto.Marshal(e.responses[e.respIdx])
  542. if err != nil {
  543. t.Fatalf("failed to marshal message: %v", err)
  544. }
  545. if reflect.TypeOf(st.Payload) != reflect.TypeOf(e.responses[e.respIdx]) {
  546. t.Fatalf("st.Payload = %T, want %T", st.Payload, e.responses[e.respIdx])
  547. }
  548. e.respIdx++
  549. if string(st.Data) != string(b) {
  550. t.Fatalf("st.Data = %v, want %v", st.Data, b)
  551. }
  552. if st.Length != len(b) {
  553. t.Fatalf("st.Lenght = %v, want %v", st.Length, len(b))
  554. }
  555. }
  556. // TODO check WireLength and ReceivedTime.
  557. if st.SentTime.IsZero() {
  558. t.Fatalf("st.SentTime = %v, want <non-zero>", st.SentTime)
  559. }
  560. }
  561. func checkOutTrailer(t *testing.T, d *gotData, e *expectedData) {
  562. var (
  563. ok bool
  564. st *stats.OutTrailer
  565. )
  566. if st, ok = d.s.(*stats.OutTrailer); !ok {
  567. t.Fatalf("got %T, want OutTrailer", d.s)
  568. }
  569. if d.ctx == nil {
  570. t.Fatalf("d.ctx = nil, want <non-nil>")
  571. }
  572. if st.Client {
  573. t.Fatalf("st IsClient = true, want false")
  574. }
  575. }
  576. func checkEnd(t *testing.T, d *gotData, e *expectedData) {
  577. var (
  578. ok bool
  579. st *stats.End
  580. )
  581. if st, ok = d.s.(*stats.End); !ok {
  582. t.Fatalf("got %T, want End", d.s)
  583. }
  584. if d.ctx == nil {
  585. t.Fatalf("d.ctx = nil, want <non-nil>")
  586. }
  587. if st.BeginTime.IsZero() {
  588. t.Fatalf("st.BeginTime = %v, want <non-zero>", st.BeginTime)
  589. }
  590. if st.EndTime.IsZero() {
  591. t.Fatalf("st.EndTime = %v, want <non-zero>", st.EndTime)
  592. }
  593. actual, ok := status.FromError(st.Error)
  594. if !ok {
  595. t.Fatalf("expected st.Error to be a statusError, got %v (type %T)", st.Error, st.Error)
  596. }
  597. expectedStatus, _ := status.FromError(e.err)
  598. if actual.Code() != expectedStatus.Code() || actual.Message() != expectedStatus.Message() {
  599. t.Fatalf("st.Error = %v, want %v", st.Error, e.err)
  600. }
  601. if st.Client {
  602. if !reflect.DeepEqual(st.Trailer, testTrailerMetadata) {
  603. t.Fatalf("st.Trailer = %v, want %v", st.Trailer, testTrailerMetadata)
  604. }
  605. } else {
  606. if st.Trailer != nil {
  607. t.Fatalf("st.Trailer = %v, want nil", st.Trailer)
  608. }
  609. }
  610. }
  611. func checkConnBegin(t *testing.T, d *gotData, e *expectedData) {
  612. var (
  613. ok bool
  614. st *stats.ConnBegin
  615. )
  616. if st, ok = d.s.(*stats.ConnBegin); !ok {
  617. t.Fatalf("got %T, want ConnBegin", d.s)
  618. }
  619. if d.ctx == nil {
  620. t.Fatalf("d.ctx = nil, want <non-nil>")
  621. }
  622. st.IsClient() // TODO remove this.
  623. }
  624. func checkConnEnd(t *testing.T, d *gotData, e *expectedData) {
  625. var (
  626. ok bool
  627. st *stats.ConnEnd
  628. )
  629. if st, ok = d.s.(*stats.ConnEnd); !ok {
  630. t.Fatalf("got %T, want ConnEnd", d.s)
  631. }
  632. if d.ctx == nil {
  633. t.Fatalf("d.ctx = nil, want <non-nil>")
  634. }
  635. st.IsClient() // TODO remove this.
  636. }
  637. type statshandler struct {
  638. mu sync.Mutex
  639. gotRPC []*gotData
  640. gotConn []*gotData
  641. }
  642. func (h *statshandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
  643. return context.WithValue(ctx, connCtxKey{}, info)
  644. }
  645. func (h *statshandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
  646. return context.WithValue(ctx, rpcCtxKey{}, info)
  647. }
  648. func (h *statshandler) HandleConn(ctx context.Context, s stats.ConnStats) {
  649. h.mu.Lock()
  650. defer h.mu.Unlock()
  651. h.gotConn = append(h.gotConn, &gotData{ctx, s.IsClient(), s})
  652. }
  653. func (h *statshandler) HandleRPC(ctx context.Context, s stats.RPCStats) {
  654. h.mu.Lock()
  655. defer h.mu.Unlock()
  656. h.gotRPC = append(h.gotRPC, &gotData{ctx, s.IsClient(), s})
  657. }
  658. func checkConnStats(t *testing.T, got []*gotData) {
  659. if len(got) <= 0 || len(got)%2 != 0 {
  660. for i, g := range got {
  661. t.Errorf(" - %v, %T = %+v, ctx: %v", i, g.s, g.s, g.ctx)
  662. }
  663. t.Fatalf("got %v stats, want even positive number", len(got))
  664. }
  665. // The first conn stats must be a ConnBegin.
  666. checkConnBegin(t, got[0], nil)
  667. // The last conn stats must be a ConnEnd.
  668. checkConnEnd(t, got[len(got)-1], nil)
  669. }
  670. func checkServerStats(t *testing.T, got []*gotData, expect *expectedData, checkFuncs []func(t *testing.T, d *gotData, e *expectedData)) {
  671. if len(got) != len(checkFuncs) {
  672. for i, g := range got {
  673. t.Errorf(" - %v, %T", i, g.s)
  674. }
  675. t.Fatalf("got %v stats, want %v stats", len(got), len(checkFuncs))
  676. }
  677. var rpcctx context.Context
  678. for i := 0; i < len(got); i++ {
  679. if _, ok := got[i].s.(stats.RPCStats); ok {
  680. if rpcctx != nil && got[i].ctx != rpcctx {
  681. t.Fatalf("got different contexts with stats %T", got[i].s)
  682. }
  683. rpcctx = got[i].ctx
  684. }
  685. }
  686. for i, f := range checkFuncs {
  687. f(t, got[i], expect)
  688. }
  689. }
  690. func testServerStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs []func(t *testing.T, d *gotData, e *expectedData)) {
  691. h := &statshandler{}
  692. te := newTest(t, tc, nil, h)
  693. te.startServer(&testServer{})
  694. defer te.tearDown()
  695. var (
  696. reqs []*testpb.SimpleRequest
  697. resps []*testpb.SimpleResponse
  698. err error
  699. method string
  700. req *testpb.SimpleRequest
  701. resp *testpb.SimpleResponse
  702. e error
  703. )
  704. switch cc.callType {
  705. case unaryRPC:
  706. method = "/grpc.testing.TestService/UnaryCall"
  707. req, resp, e = te.doUnaryCall(cc)
  708. reqs = []*testpb.SimpleRequest{req}
  709. resps = []*testpb.SimpleResponse{resp}
  710. err = e
  711. case clientStreamRPC:
  712. method = "/grpc.testing.TestService/ClientStreamCall"
  713. reqs, resp, e = te.doClientStreamCall(cc)
  714. resps = []*testpb.SimpleResponse{resp}
  715. err = e
  716. case serverStreamRPC:
  717. method = "/grpc.testing.TestService/ServerStreamCall"
  718. req, resps, e = te.doServerStreamCall(cc)
  719. reqs = []*testpb.SimpleRequest{req}
  720. err = e
  721. case fullDuplexStreamRPC:
  722. method = "/grpc.testing.TestService/FullDuplexCall"
  723. reqs, resps, err = te.doFullDuplexCallRoundtrip(cc)
  724. }
  725. if cc.success != (err == nil) {
  726. t.Fatalf("cc.success: %v, got error: %v", cc.success, err)
  727. }
  728. te.cc.Close()
  729. te.srv.GracefulStop() // Wait for the server to stop.
  730. for {
  731. h.mu.Lock()
  732. if len(h.gotRPC) >= len(checkFuncs) {
  733. h.mu.Unlock()
  734. break
  735. }
  736. h.mu.Unlock()
  737. time.Sleep(10 * time.Millisecond)
  738. }
  739. for {
  740. h.mu.Lock()
  741. if _, ok := h.gotConn[len(h.gotConn)-1].s.(*stats.ConnEnd); ok {
  742. h.mu.Unlock()
  743. break
  744. }
  745. h.mu.Unlock()
  746. time.Sleep(10 * time.Millisecond)
  747. }
  748. expect := &expectedData{
  749. serverAddr: te.srvAddr,
  750. compression: tc.compress,
  751. method: method,
  752. requests: reqs,
  753. responses: resps,
  754. err: err,
  755. }
  756. h.mu.Lock()
  757. checkConnStats(t, h.gotConn)
  758. h.mu.Unlock()
  759. checkServerStats(t, h.gotRPC, expect, checkFuncs)
  760. }
  761. func TestServerStatsUnaryRPC(t *testing.T) {
  762. testServerStats(t, &testConfig{compress: ""}, &rpcConfig{success: true, callType: unaryRPC}, []func(t *testing.T, d *gotData, e *expectedData){
  763. checkInHeader,
  764. checkBegin,
  765. checkInPayload,
  766. checkOutHeader,
  767. checkOutPayload,
  768. checkOutTrailer,
  769. checkEnd,
  770. })
  771. }
  772. func TestServerStatsUnaryRPCError(t *testing.T) {
  773. testServerStats(t, &testConfig{compress: ""}, &rpcConfig{success: false, callType: unaryRPC}, []func(t *testing.T, d *gotData, e *expectedData){
  774. checkInHeader,
  775. checkBegin,
  776. checkInPayload,
  777. checkOutHeader,
  778. checkOutTrailer,
  779. checkEnd,
  780. })
  781. }
  782. func TestServerStatsClientStreamRPC(t *testing.T) {
  783. count := 5
  784. checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
  785. checkInHeader,
  786. checkBegin,
  787. checkOutHeader,
  788. }
  789. ioPayFuncs := []func(t *testing.T, d *gotData, e *expectedData){
  790. checkInPayload,
  791. }
  792. for i := 0; i < count; i++ {
  793. checkFuncs = append(checkFuncs, ioPayFuncs...)
  794. }
  795. checkFuncs = append(checkFuncs,
  796. checkOutPayload,
  797. checkOutTrailer,
  798. checkEnd,
  799. )
  800. testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, callType: clientStreamRPC}, checkFuncs)
  801. }
  802. func TestServerStatsClientStreamRPCError(t *testing.T) {
  803. count := 1
  804. testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, callType: clientStreamRPC}, []func(t *testing.T, d *gotData, e *expectedData){
  805. checkInHeader,
  806. checkBegin,
  807. checkOutHeader,
  808. checkInPayload,
  809. checkOutTrailer,
  810. checkEnd,
  811. })
  812. }
  813. func TestServerStatsServerStreamRPC(t *testing.T) {
  814. count := 5
  815. checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
  816. checkInHeader,
  817. checkBegin,
  818. checkInPayload,
  819. checkOutHeader,
  820. }
  821. ioPayFuncs := []func(t *testing.T, d *gotData, e *expectedData){
  822. checkOutPayload,
  823. }
  824. for i := 0; i < count; i++ {
  825. checkFuncs = append(checkFuncs, ioPayFuncs...)
  826. }
  827. checkFuncs = append(checkFuncs,
  828. checkOutTrailer,
  829. checkEnd,
  830. )
  831. testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, callType: serverStreamRPC}, checkFuncs)
  832. }
  833. func TestServerStatsServerStreamRPCError(t *testing.T) {
  834. count := 5
  835. testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, callType: serverStreamRPC}, []func(t *testing.T, d *gotData, e *expectedData){
  836. checkInHeader,
  837. checkBegin,
  838. checkInPayload,
  839. checkOutHeader,
  840. checkOutTrailer,
  841. checkEnd,
  842. })
  843. }
  844. func TestServerStatsFullDuplexRPC(t *testing.T) {
  845. count := 5
  846. checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
  847. checkInHeader,
  848. checkBegin,
  849. checkOutHeader,
  850. }
  851. ioPayFuncs := []func(t *testing.T, d *gotData, e *expectedData){
  852. checkInPayload,
  853. checkOutPayload,
  854. }
  855. for i := 0; i < count; i++ {
  856. checkFuncs = append(checkFuncs, ioPayFuncs...)
  857. }
  858. checkFuncs = append(checkFuncs,
  859. checkOutTrailer,
  860. checkEnd,
  861. )
  862. testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, callType: fullDuplexStreamRPC}, checkFuncs)
  863. }
  864. func TestServerStatsFullDuplexRPCError(t *testing.T) {
  865. count := 5
  866. testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, callType: fullDuplexStreamRPC}, []func(t *testing.T, d *gotData, e *expectedData){
  867. checkInHeader,
  868. checkBegin,
  869. checkOutHeader,
  870. checkInPayload,
  871. checkOutTrailer,
  872. checkEnd,
  873. })
  874. }
  875. type checkFuncWithCount struct {
  876. f func(t *testing.T, d *gotData, e *expectedData)
  877. c int // expected count
  878. }
  879. func checkClientStats(t *testing.T, got []*gotData, expect *expectedData, checkFuncs map[int]*checkFuncWithCount) {
  880. var expectLen int
  881. for _, v := range checkFuncs {
  882. expectLen += v.c
  883. }
  884. if len(got) != expectLen {
  885. for i, g := range got {
  886. t.Errorf(" - %v, %T", i, g.s)
  887. }
  888. t.Fatalf("got %v stats, want %v stats", len(got), expectLen)
  889. }
  890. var tagInfoInCtx *stats.RPCTagInfo
  891. for i := 0; i < len(got); i++ {
  892. if _, ok := got[i].s.(stats.RPCStats); ok {
  893. tagInfoInCtxNew, _ := got[i].ctx.Value(rpcCtxKey{}).(*stats.RPCTagInfo)
  894. if tagInfoInCtx != nil && tagInfoInCtx != tagInfoInCtxNew {
  895. t.Fatalf("got context containing different tagInfo with stats %T", got[i].s)
  896. }
  897. tagInfoInCtx = tagInfoInCtxNew
  898. }
  899. }
  900. for _, s := range got {
  901. switch s.s.(type) {
  902. case *stats.Begin:
  903. if checkFuncs[begin].c <= 0 {
  904. t.Fatalf("unexpected stats: %T", s.s)
  905. }
  906. checkFuncs[begin].f(t, s, expect)
  907. checkFuncs[begin].c--
  908. case *stats.OutHeader:
  909. if checkFuncs[outHeader].c <= 0 {
  910. t.Fatalf("unexpected stats: %T", s.s)
  911. }
  912. checkFuncs[outHeader].f(t, s, expect)
  913. checkFuncs[outHeader].c--
  914. case *stats.OutPayload:
  915. if checkFuncs[outPayload].c <= 0 {
  916. t.Fatalf("unexpected stats: %T", s.s)
  917. }
  918. checkFuncs[outPayload].f(t, s, expect)
  919. checkFuncs[outPayload].c--
  920. case *stats.InHeader:
  921. if checkFuncs[inHeader].c <= 0 {
  922. t.Fatalf("unexpected stats: %T", s.s)
  923. }
  924. checkFuncs[inHeader].f(t, s, expect)
  925. checkFuncs[inHeader].c--
  926. case *stats.InPayload:
  927. if checkFuncs[inPayload].c <= 0 {
  928. t.Fatalf("unexpected stats: %T", s.s)
  929. }
  930. checkFuncs[inPayload].f(t, s, expect)
  931. checkFuncs[inPayload].c--
  932. case *stats.InTrailer:
  933. if checkFuncs[inTrailer].c <= 0 {
  934. t.Fatalf("unexpected stats: %T", s.s)
  935. }
  936. checkFuncs[inTrailer].f(t, s, expect)
  937. checkFuncs[inTrailer].c--
  938. case *stats.End:
  939. if checkFuncs[end].c <= 0 {
  940. t.Fatalf("unexpected stats: %T", s.s)
  941. }
  942. checkFuncs[end].f(t, s, expect)
  943. checkFuncs[end].c--
  944. case *stats.ConnBegin:
  945. if checkFuncs[connbegin].c <= 0 {
  946. t.Fatalf("unexpected stats: %T", s.s)
  947. }
  948. checkFuncs[connbegin].f(t, s, expect)
  949. checkFuncs[connbegin].c--
  950. case *stats.ConnEnd:
  951. if checkFuncs[connend].c <= 0 {
  952. t.Fatalf("unexpected stats: %T", s.s)
  953. }
  954. checkFuncs[connend].f(t, s, expect)
  955. checkFuncs[connend].c--
  956. default:
  957. t.Fatalf("unexpected stats: %T", s.s)
  958. }
  959. }
  960. }
  961. func testClientStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs map[int]*checkFuncWithCount) {
  962. h := &statshandler{}
  963. te := newTest(t, tc, h, nil)
  964. te.startServer(&testServer{})
  965. defer te.tearDown()
  966. var (
  967. reqs []*testpb.SimpleRequest
  968. resps []*testpb.SimpleResponse
  969. method string
  970. err error
  971. req *testpb.SimpleRequest
  972. resp *testpb.SimpleResponse
  973. e error
  974. )
  975. switch cc.callType {
  976. case unaryRPC:
  977. method = "/grpc.testing.TestService/UnaryCall"
  978. req, resp, e = te.doUnaryCall(cc)
  979. reqs = []*testpb.SimpleRequest{req}
  980. resps = []*testpb.SimpleResponse{resp}
  981. err = e
  982. case clientStreamRPC:
  983. method = "/grpc.testing.TestService/ClientStreamCall"
  984. reqs, resp, e = te.doClientStreamCall(cc)
  985. resps = []*testpb.SimpleResponse{resp}
  986. err = e
  987. case serverStreamRPC:
  988. method = "/grpc.testing.TestService/ServerStreamCall"
  989. req, resps, e = te.doServerStreamCall(cc)
  990. reqs = []*testpb.SimpleRequest{req}
  991. err = e
  992. case fullDuplexStreamRPC:
  993. method = "/grpc.testing.TestService/FullDuplexCall"
  994. reqs, resps, err = te.doFullDuplexCallRoundtrip(cc)
  995. }
  996. if cc.success != (err == nil) {
  997. t.Fatalf("cc.success: %v, got error: %v", cc.success, err)
  998. }
  999. te.cc.Close()
  1000. te.srv.GracefulStop() // Wait for the server to stop.
  1001. lenRPCStats := 0
  1002. for _, v := range checkFuncs {
  1003. lenRPCStats += v.c
  1004. }
  1005. for {
  1006. h.mu.Lock()
  1007. if len(h.gotRPC) >= lenRPCStats {
  1008. h.mu.Unlock()
  1009. break
  1010. }
  1011. h.mu.Unlock()
  1012. time.Sleep(10 * time.Millisecond)
  1013. }
  1014. for {
  1015. h.mu.Lock()
  1016. if _, ok := h.gotConn[len(h.gotConn)-1].s.(*stats.ConnEnd); ok {
  1017. h.mu.Unlock()
  1018. break
  1019. }
  1020. h.mu.Unlock()
  1021. time.Sleep(10 * time.Millisecond)
  1022. }
  1023. expect := &expectedData{
  1024. serverAddr: te.srvAddr,
  1025. compression: tc.compress,
  1026. method: method,
  1027. requests: reqs,
  1028. responses: resps,
  1029. failfast: cc.failfast,
  1030. err: err,
  1031. }
  1032. h.mu.Lock()
  1033. checkConnStats(t, h.gotConn)
  1034. h.mu.Unlock()
  1035. checkClientStats(t, h.gotRPC, expect, checkFuncs)
  1036. }
  1037. func TestClientStatsUnaryRPC(t *testing.T) {
  1038. testClientStats(t, &testConfig{compress: ""}, &rpcConfig{success: true, failfast: false, callType: unaryRPC}, map[int]*checkFuncWithCount{
  1039. begin: {checkBegin, 1},
  1040. outHeader: {checkOutHeader, 1},
  1041. outPayload: {checkOutPayload, 1},
  1042. inHeader: {checkInHeader, 1},
  1043. inPayload: {checkInPayload, 1},
  1044. inTrailer: {checkInTrailer, 1},
  1045. end: {checkEnd, 1},
  1046. })
  1047. }
  1048. func TestClientStatsUnaryRPCError(t *testing.T) {
  1049. testClientStats(t, &testConfig{compress: ""}, &rpcConfig{success: false, failfast: false, callType: unaryRPC}, map[int]*checkFuncWithCount{
  1050. begin: {checkBegin, 1},
  1051. outHeader: {checkOutHeader, 1},
  1052. outPayload: {checkOutPayload, 1},
  1053. inHeader: {checkInHeader, 1},
  1054. inTrailer: {checkInTrailer, 1},
  1055. end: {checkEnd, 1},
  1056. })
  1057. }
  1058. func TestClientStatsClientStreamRPC(t *testing.T) {
  1059. count := 5
  1060. testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: clientStreamRPC}, map[int]*checkFuncWithCount{
  1061. begin: {checkBegin, 1},
  1062. outHeader: {checkOutHeader, 1},
  1063. inHeader: {checkInHeader, 1},
  1064. outPayload: {checkOutPayload, count},
  1065. inTrailer: {checkInTrailer, 1},
  1066. inPayload: {checkInPayload, 1},
  1067. end: {checkEnd, 1},
  1068. })
  1069. }
  1070. func TestClientStatsClientStreamRPCError(t *testing.T) {
  1071. count := 1
  1072. testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, callType: clientStreamRPC}, map[int]*checkFuncWithCount{
  1073. begin: {checkBegin, 1},
  1074. outHeader: {checkOutHeader, 1},
  1075. inHeader: {checkInHeader, 1},
  1076. outPayload: {checkOutPayload, 1},
  1077. inTrailer: {checkInTrailer, 1},
  1078. end: {checkEnd, 1},
  1079. })
  1080. }
  1081. func TestClientStatsServerStreamRPC(t *testing.T) {
  1082. count := 5
  1083. testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: serverStreamRPC}, map[int]*checkFuncWithCount{
  1084. begin: {checkBegin, 1},
  1085. outHeader: {checkOutHeader, 1},
  1086. outPayload: {checkOutPayload, 1},
  1087. inHeader: {checkInHeader, 1},
  1088. inPayload: {checkInPayload, count},
  1089. inTrailer: {checkInTrailer, 1},
  1090. end: {checkEnd, 1},
  1091. })
  1092. }
  1093. func TestClientStatsServerStreamRPCError(t *testing.T) {
  1094. count := 5
  1095. testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, callType: serverStreamRPC}, map[int]*checkFuncWithCount{
  1096. begin: {checkBegin, 1},
  1097. outHeader: {checkOutHeader, 1},
  1098. outPayload: {checkOutPayload, 1},
  1099. inHeader: {checkInHeader, 1},
  1100. inTrailer: {checkInTrailer, 1},
  1101. end: {checkEnd, 1},
  1102. })
  1103. }
  1104. func TestClientStatsFullDuplexRPC(t *testing.T) {
  1105. count := 5
  1106. testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: fullDuplexStreamRPC}, map[int]*checkFuncWithCount{
  1107. begin: {checkBegin, 1},
  1108. outHeader: {checkOutHeader, 1},
  1109. outPayload: {checkOutPayload, count},
  1110. inHeader: {checkInHeader, 1},
  1111. inPayload: {checkInPayload, count},
  1112. inTrailer: {checkInTrailer, 1},
  1113. end: {checkEnd, 1},
  1114. })
  1115. }
  1116. func TestClientStatsFullDuplexRPCError(t *testing.T) {
  1117. count := 5
  1118. testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, callType: fullDuplexStreamRPC}, map[int]*checkFuncWithCount{
  1119. begin: {checkBegin, 1},
  1120. outHeader: {checkOutHeader, 1},
  1121. outPayload: {checkOutPayload, 1},
  1122. inHeader: {checkInHeader, 1},
  1123. inTrailer: {checkInTrailer, 1},
  1124. end: {checkEnd, 1},
  1125. })
  1126. }
  1127. func TestTags(t *testing.T) {
  1128. b := []byte{5, 2, 4, 3, 1}
  1129. ctx := stats.SetTags(context.Background(), b)
  1130. if tg := stats.OutgoingTags(ctx); !reflect.DeepEqual(tg, b) {
  1131. t.Errorf("OutgoingTags(%v) = %v; want %v", ctx, tg, b)
  1132. }
  1133. if tg := stats.Tags(ctx); tg != nil {
  1134. t.Errorf("Tags(%v) = %v; want nil", ctx, tg)
  1135. }
  1136. ctx = stats.SetIncomingTags(context.Background(), b)
  1137. if tg := stats.Tags(ctx); !reflect.DeepEqual(tg, b) {
  1138. t.Errorf("Tags(%v) = %v; want %v", ctx, tg, b)
  1139. }
  1140. if tg := stats.OutgoingTags(ctx); tg != nil {
  1141. t.Errorf("OutgoingTags(%v) = %v; want nil", ctx, tg)
  1142. }
  1143. }
  1144. func TestTrace(t *testing.T) {
  1145. b := []byte{5, 2, 4, 3, 1}
  1146. ctx := stats.SetTrace(context.Background(), b)
  1147. if tr := stats.OutgoingTrace(ctx); !reflect.DeepEqual(tr, b) {
  1148. t.Errorf("OutgoingTrace(%v) = %v; want %v", ctx, tr, b)
  1149. }
  1150. if tr := stats.Trace(ctx); tr != nil {
  1151. t.Errorf("Trace(%v) = %v; want nil", ctx, tr)
  1152. }
  1153. ctx = stats.SetIncomingTrace(context.Background(), b)
  1154. if tr := stats.Trace(ctx); !reflect.DeepEqual(tr, b) {
  1155. t.Errorf("Trace(%v) = %v; want %v", ctx, tr, b)
  1156. }
  1157. if tr := stats.OutgoingTrace(ctx); tr != nil {
  1158. t.Errorf("OutgoingTrace(%v) = %v; want nil", ctx, tr)
  1159. }
  1160. }