You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

963 lines
30 KiB

  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package test
  19. import (
  20. "context"
  21. "errors"
  22. "fmt"
  23. "net"
  24. "sync"
  25. "testing"
  26. "time"
  27. "google.golang.org/grpc"
  28. "google.golang.org/grpc/codes"
  29. "google.golang.org/grpc/connectivity"
  30. _ "google.golang.org/grpc/health"
  31. healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
  32. healthpb "google.golang.org/grpc/health/grpc_health_v1"
  33. "google.golang.org/grpc/internal"
  34. "google.golang.org/grpc/internal/channelz"
  35. "google.golang.org/grpc/resolver"
  36. "google.golang.org/grpc/resolver/manual"
  37. "google.golang.org/grpc/status"
  38. testpb "google.golang.org/grpc/test/grpc_testing"
  39. )
  40. var testHealthCheckFunc = internal.HealthCheckFunc
  41. func newTestHealthServer() *testHealthServer {
  42. return newTestHealthServerWithWatchFunc(defaultWatchFunc)
  43. }
  44. func newTestHealthServerWithWatchFunc(f func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error) *testHealthServer {
  45. return &testHealthServer{
  46. watchFunc: f,
  47. update: make(chan struct{}, 1),
  48. status: make(map[string]healthpb.HealthCheckResponse_ServingStatus),
  49. }
  50. }
  51. // defaultWatchFunc will send a HealthCheckResponse to the client whenever SetServingStatus is called.
  52. func defaultWatchFunc(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
  53. if in.Service != "foo" {
  54. return status.Error(codes.FailedPrecondition,
  55. "the defaultWatchFunc only handles request with service name to be \"foo\"")
  56. }
  57. var done bool
  58. for {
  59. select {
  60. case <-stream.Context().Done():
  61. done = true
  62. case <-s.update:
  63. }
  64. if done {
  65. break
  66. }
  67. s.mu.Lock()
  68. resp := &healthpb.HealthCheckResponse{
  69. Status: s.status[in.Service],
  70. }
  71. s.mu.Unlock()
  72. stream.SendMsg(resp)
  73. }
  74. return nil
  75. }
  76. type testHealthServer struct {
  77. watchFunc func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error
  78. mu sync.Mutex
  79. status map[string]healthpb.HealthCheckResponse_ServingStatus
  80. update chan struct{}
  81. }
  82. func (s *testHealthServer) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
  83. return &healthpb.HealthCheckResponse{
  84. Status: healthpb.HealthCheckResponse_SERVING,
  85. }, nil
  86. }
  87. func (s *testHealthServer) Watch(in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
  88. return s.watchFunc(s, in, stream)
  89. }
  90. // SetServingStatus is called when need to reset the serving status of a service
  91. // or insert a new service entry into the statusMap.
  92. func (s *testHealthServer) SetServingStatus(service string, status healthpb.HealthCheckResponse_ServingStatus) {
  93. s.mu.Lock()
  94. s.status[service] = status
  95. select {
  96. case <-s.update:
  97. default:
  98. }
  99. s.update <- struct{}{}
  100. s.mu.Unlock()
  101. }
  102. func setupHealthCheckWrapper() (hcEnterChan chan struct{}, hcExitChan chan struct{}, wrapper func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error) {
  103. hcEnterChan = make(chan struct{})
  104. hcExitChan = make(chan struct{})
  105. wrapper = func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error {
  106. close(hcEnterChan)
  107. defer close(hcExitChan)
  108. return testHealthCheckFunc(ctx, newStream, update, service)
  109. }
  110. return
  111. }
  112. type svrConfig struct {
  113. specialWatchFunc func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error
  114. }
  115. func setupServer(sc *svrConfig) (s *grpc.Server, lis net.Listener, ts *testHealthServer, deferFunc func(), err error) {
  116. s = grpc.NewServer()
  117. lis, err = net.Listen("tcp", "localhost:0")
  118. if err != nil {
  119. return nil, nil, nil, func() {}, fmt.Errorf("failed to listen due to err %v", err)
  120. }
  121. if sc.specialWatchFunc != nil {
  122. ts = newTestHealthServerWithWatchFunc(sc.specialWatchFunc)
  123. } else {
  124. ts = newTestHealthServer()
  125. }
  126. healthgrpc.RegisterHealthServer(s, ts)
  127. testpb.RegisterTestServiceServer(s, &testServer{})
  128. go s.Serve(lis)
  129. return s, lis, ts, s.Stop, nil
  130. }
  131. type clientConfig struct {
  132. balancerName string
  133. testHealthCheckFuncWrapper func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error
  134. extraDialOption []grpc.DialOption
  135. }
  136. func setupClient(c *clientConfig) (cc *grpc.ClientConn, r *manual.Resolver, deferFunc func(), err error) {
  137. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  138. var opts []grpc.DialOption
  139. opts = append(opts, grpc.WithInsecure(), grpc.WithBalancerName(c.balancerName))
  140. if c.testHealthCheckFuncWrapper != nil {
  141. opts = append(opts, internal.WithHealthCheckFunc.(func(internal.HealthChecker) grpc.DialOption)(c.testHealthCheckFuncWrapper))
  142. }
  143. opts = append(opts, c.extraDialOption...)
  144. cc, err = grpc.Dial(r.Scheme()+":///test.server", opts...)
  145. if err != nil {
  146. rcleanup()
  147. return nil, nil, nil, fmt.Errorf("dial failed due to err: %v", err)
  148. }
  149. return cc, r, func() { cc.Close(); rcleanup() }, nil
  150. }
  151. func (s) TestHealthCheckWatchStateChange(t *testing.T) {
  152. _, lis, ts, deferFunc, err := setupServer(&svrConfig{})
  153. defer deferFunc()
  154. if err != nil {
  155. t.Fatal(err)
  156. }
  157. // The table below shows the expected series of addrConn connectivity transitions when server
  158. // updates its health status. As there's only one addrConn corresponds with the ClientConn in this
  159. // test, we use ClientConn's connectivity state as the addrConn connectivity state.
  160. //+------------------------------+-------------------------------------------+
  161. //| Health Check Returned Status | Expected addrConn Connectivity Transition |
  162. //+------------------------------+-------------------------------------------+
  163. //| NOT_SERVING | ->TRANSIENT FAILURE |
  164. //| SERVING | ->READY |
  165. //| SERVICE_UNKNOWN | ->TRANSIENT FAILURE |
  166. //| SERVING | ->READY |
  167. //| UNKNOWN | ->TRANSIENT FAILURE |
  168. //+------------------------------+-------------------------------------------+
  169. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_NOT_SERVING)
  170. cc, r, deferFunc, err := setupClient(&clientConfig{balancerName: "round_robin"})
  171. if err != nil {
  172. t.Fatal(err)
  173. }
  174. defer deferFunc()
  175. r.NewServiceConfig(`{
  176. "healthCheckConfig": {
  177. "serviceName": "foo"
  178. }
  179. }`)
  180. r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
  181. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  182. defer cancel()
  183. if ok := cc.WaitForStateChange(ctx, connectivity.Idle); !ok {
  184. t.Fatal("ClientConn is still in IDLE state when the context times out.")
  185. }
  186. if ok := cc.WaitForStateChange(ctx, connectivity.Connecting); !ok {
  187. t.Fatal("ClientConn is still in CONNECTING state when the context times out.")
  188. }
  189. if s := cc.GetState(); s != connectivity.TransientFailure {
  190. t.Fatalf("ClientConn is in %v state, want TRANSIENT FAILURE", s)
  191. }
  192. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
  193. if ok := cc.WaitForStateChange(ctx, connectivity.TransientFailure); !ok {
  194. t.Fatal("ClientConn is still in TRANSIENT FAILURE state when the context times out.")
  195. }
  196. if s := cc.GetState(); s != connectivity.Ready {
  197. t.Fatalf("ClientConn is in %v state, want READY", s)
  198. }
  199. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVICE_UNKNOWN)
  200. if ok := cc.WaitForStateChange(ctx, connectivity.Ready); !ok {
  201. t.Fatal("ClientConn is still in READY state when the context times out.")
  202. }
  203. if s := cc.GetState(); s != connectivity.TransientFailure {
  204. t.Fatalf("ClientConn is in %v state, want TRANSIENT FAILURE", s)
  205. }
  206. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
  207. if ok := cc.WaitForStateChange(ctx, connectivity.TransientFailure); !ok {
  208. t.Fatal("ClientConn is still in TRANSIENT FAILURE state when the context times out.")
  209. }
  210. if s := cc.GetState(); s != connectivity.Ready {
  211. t.Fatalf("ClientConn is in %v state, want READY", s)
  212. }
  213. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_UNKNOWN)
  214. if ok := cc.WaitForStateChange(ctx, connectivity.Ready); !ok {
  215. t.Fatal("ClientConn is still in READY state when the context times out.")
  216. }
  217. if s := cc.GetState(); s != connectivity.TransientFailure {
  218. t.Fatalf("ClientConn is in %v state, want TRANSIENT FAILURE", s)
  219. }
  220. }
  221. // If Watch returns Unimplemented, then the ClientConn should go into READY state.
  222. func (s) TestHealthCheckHealthServerNotRegistered(t *testing.T) {
  223. s := grpc.NewServer()
  224. lis, err := net.Listen("tcp", "localhost:0")
  225. if err != nil {
  226. t.Fatalf("failed to listen due to err: %v", err)
  227. }
  228. go s.Serve(lis)
  229. defer s.Stop()
  230. cc, r, deferFunc, err := setupClient(&clientConfig{balancerName: "round_robin"})
  231. if err != nil {
  232. t.Fatal(err)
  233. }
  234. defer deferFunc()
  235. r.NewServiceConfig(`{
  236. "healthCheckConfig": {
  237. "serviceName": "foo"
  238. }
  239. }`)
  240. r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
  241. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  242. defer cancel()
  243. if ok := cc.WaitForStateChange(ctx, connectivity.Idle); !ok {
  244. t.Fatal("ClientConn is still in IDLE state when the context times out.")
  245. }
  246. if ok := cc.WaitForStateChange(ctx, connectivity.Connecting); !ok {
  247. t.Fatal("ClientConn is still in CONNECTING state when the context times out.")
  248. }
  249. if s := cc.GetState(); s != connectivity.Ready {
  250. t.Fatalf("ClientConn is in %v state, want READY", s)
  251. }
  252. }
  253. // In the case of a goaway received, the health check stream should be terminated and health check
  254. // function should exit.
  255. func (s) TestHealthCheckWithGoAway(t *testing.T) {
  256. hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
  257. s, lis, ts, deferFunc, err := setupServer(&svrConfig{})
  258. defer deferFunc()
  259. if err != nil {
  260. t.Fatal(err)
  261. }
  262. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
  263. cc, r, deferFunc, err := setupClient(&clientConfig{
  264. balancerName: "round_robin",
  265. testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
  266. })
  267. if err != nil {
  268. t.Fatal(err)
  269. }
  270. defer deferFunc()
  271. tc := testpb.NewTestServiceClient(cc)
  272. r.NewServiceConfig(`{
  273. "healthCheckConfig": {
  274. "serviceName": "foo"
  275. }
  276. }`)
  277. r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
  278. // make some rpcs to make sure connection is working.
  279. if err := verifyResultWithDelay(func() (bool, error) {
  280. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  281. return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  282. }
  283. return true, nil
  284. }); err != nil {
  285. t.Fatal(err)
  286. }
  287. // the stream rpc will persist through goaway event.
  288. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  289. defer cancel()
  290. stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
  291. if err != nil {
  292. t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  293. }
  294. respParam := []*testpb.ResponseParameters{{Size: 1}}
  295. payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(1))
  296. if err != nil {
  297. t.Fatal(err)
  298. }
  299. req := &testpb.StreamingOutputCallRequest{
  300. ResponseParameters: respParam,
  301. Payload: payload,
  302. }
  303. if err := stream.Send(req); err != nil {
  304. t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
  305. }
  306. if _, err := stream.Recv(); err != nil {
  307. t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
  308. }
  309. select {
  310. case <-hcExitChan:
  311. t.Fatal("Health check function has exited, which is not expected.")
  312. default:
  313. }
  314. // server sends GoAway
  315. go s.GracefulStop()
  316. select {
  317. case <-hcExitChan:
  318. case <-time.After(5 * time.Second):
  319. select {
  320. case <-hcEnterChan:
  321. default:
  322. t.Fatal("Health check function has not entered after 5s.")
  323. }
  324. t.Fatal("Health check function has not exited after 5s.")
  325. }
  326. // The existing RPC should be still good to proceed.
  327. if err := stream.Send(req); err != nil {
  328. t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
  329. }
  330. if _, err := stream.Recv(); err != nil {
  331. t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
  332. }
  333. }
  334. func (s) TestHealthCheckWithConnClose(t *testing.T) {
  335. hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
  336. s, lis, ts, deferFunc, err := setupServer(&svrConfig{})
  337. defer deferFunc()
  338. if err != nil {
  339. t.Fatal(err)
  340. }
  341. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
  342. cc, r, deferFunc, err := setupClient(&clientConfig{
  343. balancerName: "round_robin",
  344. testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
  345. })
  346. if err != nil {
  347. t.Fatal(err)
  348. }
  349. defer deferFunc()
  350. tc := testpb.NewTestServiceClient(cc)
  351. r.NewServiceConfig(`{
  352. "healthCheckConfig": {
  353. "serviceName": "foo"
  354. }
  355. }`)
  356. r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
  357. // make some rpcs to make sure connection is working.
  358. if err := verifyResultWithDelay(func() (bool, error) {
  359. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  360. return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  361. }
  362. return true, nil
  363. }); err != nil {
  364. t.Fatal(err)
  365. }
  366. select {
  367. case <-hcExitChan:
  368. t.Fatal("Health check function has exited, which is not expected.")
  369. default:
  370. }
  371. // server closes the connection
  372. s.Stop()
  373. select {
  374. case <-hcExitChan:
  375. case <-time.After(5 * time.Second):
  376. select {
  377. case <-hcEnterChan:
  378. default:
  379. t.Fatal("Health check function has not entered after 5s.")
  380. }
  381. t.Fatal("Health check function has not exited after 5s.")
  382. }
  383. }
  384. // addrConn drain happens when addrConn gets torn down due to its address being no longer in the
  385. // address list returned by the resolver.
  386. func (s) TestHealthCheckWithAddrConnDrain(t *testing.T) {
  387. hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
  388. _, lis, ts, deferFunc, err := setupServer(&svrConfig{})
  389. defer deferFunc()
  390. if err != nil {
  391. t.Fatal(err)
  392. }
  393. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
  394. cc, r, deferFunc, err := setupClient(&clientConfig{
  395. balancerName: "round_robin",
  396. testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
  397. })
  398. if err != nil {
  399. t.Fatal(err)
  400. }
  401. defer deferFunc()
  402. tc := testpb.NewTestServiceClient(cc)
  403. r.NewServiceConfig(`{
  404. "healthCheckConfig": {
  405. "serviceName": "foo"
  406. }
  407. }`)
  408. r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
  409. // make some rpcs to make sure connection is working.
  410. if err := verifyResultWithDelay(func() (bool, error) {
  411. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  412. return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  413. }
  414. return true, nil
  415. }); err != nil {
  416. t.Fatal(err)
  417. }
  418. // the stream rpc will persist through goaway event.
  419. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  420. defer cancel()
  421. stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
  422. if err != nil {
  423. t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  424. }
  425. respParam := []*testpb.ResponseParameters{{Size: 1}}
  426. payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(1))
  427. if err != nil {
  428. t.Fatal(err)
  429. }
  430. req := &testpb.StreamingOutputCallRequest{
  431. ResponseParameters: respParam,
  432. Payload: payload,
  433. }
  434. if err := stream.Send(req); err != nil {
  435. t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
  436. }
  437. if _, err := stream.Recv(); err != nil {
  438. t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
  439. }
  440. select {
  441. case <-hcExitChan:
  442. t.Fatal("Health check function has exited, which is not expected.")
  443. default:
  444. }
  445. // trigger teardown of the ac
  446. r.NewAddress([]resolver.Address{})
  447. select {
  448. case <-hcExitChan:
  449. case <-time.After(5 * time.Second):
  450. select {
  451. case <-hcEnterChan:
  452. default:
  453. t.Fatal("Health check function has not entered after 5s.")
  454. }
  455. t.Fatal("Health check function has not exited after 5s.")
  456. }
  457. // The existing RPC should be still good to proceed.
  458. if err := stream.Send(req); err != nil {
  459. t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
  460. }
  461. if _, err := stream.Recv(); err != nil {
  462. t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
  463. }
  464. }
  465. // ClientConn close will lead to its addrConns being torn down.
  466. func (s) TestHealthCheckWithClientConnClose(t *testing.T) {
  467. hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
  468. _, lis, ts, deferFunc, err := setupServer(&svrConfig{})
  469. defer deferFunc()
  470. if err != nil {
  471. t.Fatal(err)
  472. }
  473. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
  474. cc, r, deferFunc, err := setupClient(&clientConfig{
  475. balancerName: "round_robin",
  476. testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
  477. })
  478. if err != nil {
  479. t.Fatal(err)
  480. }
  481. defer deferFunc()
  482. tc := testpb.NewTestServiceClient(cc)
  483. r.NewServiceConfig(`{
  484. "healthCheckConfig": {
  485. "serviceName": "foo"
  486. }
  487. }`)
  488. r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
  489. // make some rpcs to make sure connection is working.
  490. if err := verifyResultWithDelay(func() (bool, error) {
  491. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  492. return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  493. }
  494. return true, nil
  495. }); err != nil {
  496. t.Fatal(err)
  497. }
  498. select {
  499. case <-hcExitChan:
  500. t.Fatal("Health check function has exited, which is not expected.")
  501. default:
  502. }
  503. // trigger addrConn teardown
  504. cc.Close()
  505. select {
  506. case <-hcExitChan:
  507. case <-time.After(5 * time.Second):
  508. select {
  509. case <-hcEnterChan:
  510. default:
  511. t.Fatal("Health check function has not entered after 5s.")
  512. }
  513. t.Fatal("Health check function has not exited after 5s.")
  514. }
  515. }
  516. // This test is to test the logic in the createTransport after the health check function returns which
  517. // closes the skipReset channel(since it has not been closed inside health check func) to unblock
  518. // onGoAway/onClose goroutine.
  519. func (s) TestHealthCheckWithoutReportHealthCalledAddrConnShutDown(t *testing.T) {
  520. hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
  521. _, lis, ts, deferFunc, err := setupServer(&svrConfig{
  522. specialWatchFunc: func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
  523. if in.Service != "delay" {
  524. return status.Error(codes.FailedPrecondition,
  525. "this special Watch function only handles request with service name to be \"delay\"")
  526. }
  527. // Do nothing to mock a delay of health check response from server side.
  528. // This case is to help with the test that covers the condition that reportHealth is not
  529. // called inside HealthCheckFunc before the func returns.
  530. select {
  531. case <-stream.Context().Done():
  532. case <-time.After(5 * time.Second):
  533. }
  534. return nil
  535. },
  536. })
  537. defer deferFunc()
  538. if err != nil {
  539. t.Fatal(err)
  540. }
  541. ts.SetServingStatus("delay", healthpb.HealthCheckResponse_SERVING)
  542. _, r, deferFunc, err := setupClient(&clientConfig{
  543. balancerName: "round_robin",
  544. testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
  545. })
  546. if err != nil {
  547. t.Fatal(err)
  548. }
  549. defer deferFunc()
  550. // The serviceName "delay" is specially handled at server side, where response will not be sent
  551. // back to client immediately upon receiving the request (client should receive no response until
  552. // test ends).
  553. r.NewServiceConfig(`{
  554. "healthCheckConfig": {
  555. "serviceName": "delay"
  556. }
  557. }`)
  558. r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
  559. select {
  560. case <-hcExitChan:
  561. t.Fatal("Health check function has exited, which is not expected.")
  562. default:
  563. }
  564. select {
  565. case <-hcEnterChan:
  566. case <-time.After(5 * time.Second):
  567. t.Fatal("Health check function has not been invoked after 5s.")
  568. }
  569. // trigger teardown of the ac, ac in SHUTDOWN state
  570. r.NewAddress([]resolver.Address{})
  571. // The health check func should exit without calling the reportHealth func, as server hasn't sent
  572. // any response.
  573. select {
  574. case <-hcExitChan:
  575. case <-time.After(5 * time.Second):
  576. t.Fatal("Health check function has not exited after 5s.")
  577. }
  578. // The deferred leakcheck will check whether there's leaked goroutine, which is an indication
  579. // whether we closes the skipReset channel to unblock onGoAway/onClose goroutine.
  580. }
  581. // This test is to test the logic in the createTransport after the health check function returns which
  582. // closes the allowedToReset channel(since it has not been closed inside health check func) to unblock
  583. // onGoAway/onClose goroutine.
  584. func (s) TestHealthCheckWithoutReportHealthCalled(t *testing.T) {
  585. hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
  586. s, lis, ts, deferFunc, err := setupServer(&svrConfig{
  587. specialWatchFunc: func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
  588. if in.Service != "delay" {
  589. return status.Error(codes.FailedPrecondition,
  590. "this special Watch function only handles request with service name to be \"delay\"")
  591. }
  592. // Do nothing to mock a delay of health check response from server side.
  593. // This case is to help with the test that covers the condition that reportHealth is not
  594. // called inside HealthCheckFunc before the func returns.
  595. select {
  596. case <-stream.Context().Done():
  597. case <-time.After(5 * time.Second):
  598. }
  599. return nil
  600. },
  601. })
  602. defer deferFunc()
  603. if err != nil {
  604. t.Fatal(err)
  605. }
  606. ts.SetServingStatus("delay", healthpb.HealthCheckResponse_SERVING)
  607. _, r, deferFunc, err := setupClient(&clientConfig{
  608. balancerName: "round_robin",
  609. testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
  610. })
  611. if err != nil {
  612. t.Fatal(err)
  613. }
  614. defer deferFunc()
  615. // The serviceName "delay" is specially handled at server side, where response will not be sent
  616. // back to client immediately upon receiving the request (client should receive no response until
  617. // test ends).
  618. r.NewServiceConfig(`{
  619. "healthCheckConfig": {
  620. "serviceName": "delay"
  621. }
  622. }`)
  623. r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
  624. select {
  625. case <-hcExitChan:
  626. t.Fatal("Health check function has exited, which is not expected.")
  627. default:
  628. }
  629. select {
  630. case <-hcEnterChan:
  631. case <-time.After(5 * time.Second):
  632. t.Fatal("Health check function has not been invoked after 5s.")
  633. }
  634. // trigger transport being closed
  635. s.Stop()
  636. // The health check func should exit without calling the reportHealth func, as server hasn't sent
  637. // any response.
  638. select {
  639. case <-hcExitChan:
  640. case <-time.After(5 * time.Second):
  641. t.Fatal("Health check function has not exited after 5s.")
  642. }
  643. // The deferred leakcheck will check whether there's leaked goroutine, which is an indication
  644. // whether we closes the allowedToReset channel to unblock onGoAway/onClose goroutine.
  645. }
  646. func testHealthCheckDisableWithDialOption(t *testing.T, addr string) {
  647. hcEnterChan, _, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
  648. cc, r, deferFunc, err := setupClient(&clientConfig{
  649. balancerName: "round_robin",
  650. testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
  651. extraDialOption: []grpc.DialOption{grpc.WithDisableHealthCheck()},
  652. })
  653. if err != nil {
  654. t.Fatal(err)
  655. }
  656. defer deferFunc()
  657. tc := testpb.NewTestServiceClient(cc)
  658. r.NewServiceConfig(`{
  659. "healthCheckConfig": {
  660. "serviceName": "foo"
  661. }
  662. }`)
  663. r.NewAddress([]resolver.Address{{Addr: addr}})
  664. // send some rpcs to make sure transport has been created and is ready for use.
  665. if err := verifyResultWithDelay(func() (bool, error) {
  666. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  667. return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  668. }
  669. return true, nil
  670. }); err != nil {
  671. t.Fatal(err)
  672. }
  673. select {
  674. case <-hcEnterChan:
  675. t.Fatal("Health check function has exited, which is not expected.")
  676. default:
  677. }
  678. }
  679. func testHealthCheckDisableWithBalancer(t *testing.T, addr string) {
  680. hcEnterChan, _, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
  681. cc, r, deferFunc, err := setupClient(&clientConfig{
  682. balancerName: "pick_first",
  683. testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
  684. })
  685. if err != nil {
  686. t.Fatal(err)
  687. }
  688. defer deferFunc()
  689. tc := testpb.NewTestServiceClient(cc)
  690. r.NewServiceConfig(`{
  691. "healthCheckConfig": {
  692. "serviceName": "foo"
  693. }
  694. }`)
  695. r.NewAddress([]resolver.Address{{Addr: addr}})
  696. // send some rpcs to make sure transport has been created and is ready for use.
  697. if err := verifyResultWithDelay(func() (bool, error) {
  698. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  699. return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  700. }
  701. return true, nil
  702. }); err != nil {
  703. t.Fatal(err)
  704. }
  705. select {
  706. case <-hcEnterChan:
  707. t.Fatal("Health check function has started, which is not expected.")
  708. default:
  709. }
  710. }
  711. func testHealthCheckDisableWithServiceConfig(t *testing.T, addr string) {
  712. hcEnterChan, _, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
  713. cc, r, deferFunc, err := setupClient(&clientConfig{
  714. balancerName: "round_robin",
  715. testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
  716. })
  717. if err != nil {
  718. t.Fatal(err)
  719. }
  720. defer deferFunc()
  721. tc := testpb.NewTestServiceClient(cc)
  722. r.NewAddress([]resolver.Address{{Addr: addr}})
  723. // send some rpcs to make sure transport has been created and is ready for use.
  724. if err := verifyResultWithDelay(func() (bool, error) {
  725. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  726. return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  727. }
  728. return true, nil
  729. }); err != nil {
  730. t.Fatal(err)
  731. }
  732. select {
  733. case <-hcEnterChan:
  734. t.Fatal("Health check function has started, which is not expected.")
  735. default:
  736. }
  737. }
  738. func (s) TestHealthCheckDisable(t *testing.T) {
  739. _, lis, ts, deferFunc, err := setupServer(&svrConfig{})
  740. defer deferFunc()
  741. if err != nil {
  742. t.Fatal(err)
  743. }
  744. ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
  745. // test client side disabling configuration.
  746. testHealthCheckDisableWithDialOption(t, lis.Addr().String())
  747. testHealthCheckDisableWithBalancer(t, lis.Addr().String())
  748. testHealthCheckDisableWithServiceConfig(t, lis.Addr().String())
  749. }
  750. func (s) TestHealthCheckChannelzCountingCallSuccess(t *testing.T) {
  751. _, lis, _, deferFunc, err := setupServer(&svrConfig{
  752. specialWatchFunc: func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
  753. if in.Service != "channelzSuccess" {
  754. return status.Error(codes.FailedPrecondition,
  755. "this special Watch function only handles request with service name to be \"channelzSuccess\"")
  756. }
  757. return status.Error(codes.OK, "fake success")
  758. },
  759. })
  760. defer deferFunc()
  761. if err != nil {
  762. t.Fatal(err)
  763. }
  764. _, r, deferFunc, err := setupClient(&clientConfig{balancerName: "round_robin"})
  765. if err != nil {
  766. t.Fatal(err)
  767. }
  768. defer deferFunc()
  769. r.NewServiceConfig(`{
  770. "healthCheckConfig": {
  771. "serviceName": "channelzSuccess"
  772. }
  773. }`)
  774. r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
  775. if err := verifyResultWithDelay(func() (bool, error) {
  776. cm, _ := channelz.GetTopChannels(0, 0)
  777. if len(cm) == 0 {
  778. return false, errors.New("channelz.GetTopChannels return 0 top channel")
  779. }
  780. if len(cm[0].SubChans) == 0 {
  781. return false, errors.New("there is 0 subchannel")
  782. }
  783. var id int64
  784. for k := range cm[0].SubChans {
  785. id = k
  786. break
  787. }
  788. scm := channelz.GetSubChannel(id)
  789. if scm == nil || scm.ChannelData == nil {
  790. return false, errors.New("nil subchannel metric or nil subchannel metric ChannelData returned")
  791. }
  792. // exponential backoff retry may result in more than one health check call.
  793. if scm.ChannelData.CallsStarted > 0 && scm.ChannelData.CallsSucceeded > 0 && scm.ChannelData.CallsFailed == 0 {
  794. return true, nil
  795. }
  796. return false, fmt.Errorf("got %d CallsStarted, %d CallsSucceeded, want >0 >0", scm.ChannelData.CallsStarted, scm.ChannelData.CallsSucceeded)
  797. }); err != nil {
  798. t.Fatal(err)
  799. }
  800. }
  801. func (s) TestHealthCheckChannelzCountingCallFailure(t *testing.T) {
  802. _, lis, _, deferFunc, err := setupServer(&svrConfig{
  803. specialWatchFunc: func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
  804. if in.Service != "channelzFailure" {
  805. return status.Error(codes.FailedPrecondition,
  806. "this special Watch function only handles request with service name to be \"channelzFailure\"")
  807. }
  808. return status.Error(codes.Internal, "fake failure")
  809. },
  810. })
  811. if err != nil {
  812. t.Fatal(err)
  813. }
  814. defer deferFunc()
  815. _, r, deferFunc, err := setupClient(&clientConfig{balancerName: "round_robin"})
  816. if err != nil {
  817. t.Fatal(err)
  818. }
  819. defer deferFunc()
  820. r.NewServiceConfig(`{
  821. "healthCheckConfig": {
  822. "serviceName": "channelzFailure"
  823. }
  824. }`)
  825. r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
  826. if err := verifyResultWithDelay(func() (bool, error) {
  827. cm, _ := channelz.GetTopChannels(0, 0)
  828. if len(cm) == 0 {
  829. return false, errors.New("channelz.GetTopChannels return 0 top channel")
  830. }
  831. if len(cm[0].SubChans) == 0 {
  832. return false, errors.New("there is 0 subchannel")
  833. }
  834. var id int64
  835. for k := range cm[0].SubChans {
  836. id = k
  837. break
  838. }
  839. scm := channelz.GetSubChannel(id)
  840. if scm == nil || scm.ChannelData == nil {
  841. return false, errors.New("nil subchannel metric or nil subchannel metric ChannelData returned")
  842. }
  843. // exponential backoff retry may result in more than one health check call.
  844. if scm.ChannelData.CallsStarted > 0 && scm.ChannelData.CallsFailed > 0 && scm.ChannelData.CallsSucceeded == 0 {
  845. return true, nil
  846. }
  847. return false, fmt.Errorf("got %d CallsStarted, %d CallsFailed, want >0, >0", scm.ChannelData.CallsStarted, scm.ChannelData.CallsFailed)
  848. }); err != nil {
  849. t.Fatal(err)
  850. }
  851. }