You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

1247 line
35 KiB

  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "context"
  21. "errors"
  22. "fmt"
  23. "math"
  24. "net"
  25. "sync/atomic"
  26. "testing"
  27. "time"
  28. "golang.org/x/net/http2"
  29. "google.golang.org/grpc/connectivity"
  30. "google.golang.org/grpc/credentials"
  31. "google.golang.org/grpc/internal/backoff"
  32. "google.golang.org/grpc/internal/envconfig"
  33. "google.golang.org/grpc/internal/transport"
  34. "google.golang.org/grpc/keepalive"
  35. "google.golang.org/grpc/naming"
  36. "google.golang.org/grpc/resolver"
  37. "google.golang.org/grpc/resolver/manual"
  38. _ "google.golang.org/grpc/resolver/passthrough"
  39. "google.golang.org/grpc/testdata"
  40. )
  41. var (
  42. mutableMinConnectTimeout = time.Second * 20
  43. )
  44. func init() {
  45. getMinConnectTimeout = func() time.Duration {
  46. return time.Duration(atomic.LoadInt64((*int64)(&mutableMinConnectTimeout)))
  47. }
  48. }
  49. func assertState(wantState connectivity.State, cc *ClientConn) (connectivity.State, bool) {
  50. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  51. defer cancel()
  52. var state connectivity.State
  53. for state = cc.GetState(); state != wantState && cc.WaitForStateChange(ctx, state); state = cc.GetState() {
  54. }
  55. return state, state == wantState
  56. }
  57. func (s) TestDialWithMultipleBackendsNotSendingServerPreface(t *testing.T) {
  58. lis1, err := net.Listen("tcp", "localhost:0")
  59. if err != nil {
  60. t.Fatalf("Error while listening. Err: %v", err)
  61. }
  62. defer lis1.Close()
  63. lis1Addr := resolver.Address{Addr: lis1.Addr().String()}
  64. lis1Done := make(chan struct{})
  65. // 1st listener accepts the connection and immediately closes it.
  66. go func() {
  67. defer close(lis1Done)
  68. conn, err := lis1.Accept()
  69. if err != nil {
  70. t.Errorf("Error while accepting. Err: %v", err)
  71. return
  72. }
  73. conn.Close()
  74. }()
  75. lis2, err := net.Listen("tcp", "localhost:0")
  76. if err != nil {
  77. t.Fatalf("Error while listening. Err: %v", err)
  78. }
  79. defer lis2.Close()
  80. lis2Done := make(chan struct{})
  81. lis2Addr := resolver.Address{Addr: lis2.Addr().String()}
  82. // 2nd listener should get a connection attempt since the first one failed.
  83. go func() {
  84. defer close(lis2Done)
  85. _, err := lis2.Accept() // Closing the client will clean up this conn.
  86. if err != nil {
  87. t.Errorf("Error while accepting. Err: %v", err)
  88. return
  89. }
  90. }()
  91. r, cleanup := manual.GenerateAndRegisterManualResolver()
  92. defer cleanup()
  93. r.InitialAddrs([]resolver.Address{lis1Addr, lis2Addr})
  94. client, err := Dial(r.Scheme()+":///test.server", WithInsecure())
  95. if err != nil {
  96. t.Fatalf("Dial failed. Err: %v", err)
  97. }
  98. defer client.Close()
  99. timeout := time.After(5 * time.Second)
  100. select {
  101. case <-timeout:
  102. t.Fatal("timed out waiting for server 1 to finish")
  103. case <-lis1Done:
  104. }
  105. select {
  106. case <-timeout:
  107. t.Fatal("timed out waiting for server 2 to finish")
  108. case <-lis2Done:
  109. }
  110. }
  111. var allReqHSSettings = []envconfig.RequireHandshakeSetting{
  112. envconfig.RequireHandshakeOff,
  113. envconfig.RequireHandshakeOn,
  114. }
  115. func (s) TestDialWaitsForServerSettings(t *testing.T) {
  116. // Restore current setting after test.
  117. old := envconfig.RequireHandshake
  118. defer func() { envconfig.RequireHandshake = old }()
  119. // Test with all environment variable settings, which should not impact the
  120. // test case since WithWaitForHandshake has higher priority.
  121. for _, setting := range allReqHSSettings {
  122. envconfig.RequireHandshake = setting
  123. lis, err := net.Listen("tcp", "localhost:0")
  124. if err != nil {
  125. t.Fatalf("Error while listening. Err: %v", err)
  126. }
  127. defer lis.Close()
  128. done := make(chan struct{})
  129. sent := make(chan struct{})
  130. dialDone := make(chan struct{})
  131. go func() { // Launch the server.
  132. defer func() {
  133. close(done)
  134. }()
  135. conn, err := lis.Accept()
  136. if err != nil {
  137. t.Errorf("Error while accepting. Err: %v", err)
  138. return
  139. }
  140. defer conn.Close()
  141. // Sleep for a little bit to make sure that Dial on client
  142. // side blocks until settings are received.
  143. time.Sleep(100 * time.Millisecond)
  144. framer := http2.NewFramer(conn, conn)
  145. close(sent)
  146. if err := framer.WriteSettings(http2.Setting{}); err != nil {
  147. t.Errorf("Error while writing settings. Err: %v", err)
  148. return
  149. }
  150. <-dialDone // Close conn only after dial returns.
  151. }()
  152. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  153. defer cancel()
  154. client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithWaitForHandshake(), WithBlock())
  155. close(dialDone)
  156. if err != nil {
  157. t.Fatalf("Error while dialing. Err: %v", err)
  158. }
  159. defer client.Close()
  160. select {
  161. case <-sent:
  162. default:
  163. t.Fatalf("Dial returned before server settings were sent")
  164. }
  165. <-done
  166. }
  167. }
  168. func (s) TestDialWaitsForServerSettingsViaEnv(t *testing.T) {
  169. // Set default behavior and restore current setting after test.
  170. old := envconfig.RequireHandshake
  171. envconfig.RequireHandshake = envconfig.RequireHandshakeOn
  172. defer func() { envconfig.RequireHandshake = old }()
  173. lis, err := net.Listen("tcp", "localhost:0")
  174. if err != nil {
  175. t.Fatalf("Error while listening. Err: %v", err)
  176. }
  177. defer lis.Close()
  178. done := make(chan struct{})
  179. sent := make(chan struct{})
  180. dialDone := make(chan struct{})
  181. go func() { // Launch the server.
  182. defer func() {
  183. close(done)
  184. }()
  185. conn, err := lis.Accept()
  186. if err != nil {
  187. t.Errorf("Error while accepting. Err: %v", err)
  188. return
  189. }
  190. defer conn.Close()
  191. // Sleep for a little bit to make sure that Dial on client
  192. // side blocks until settings are received.
  193. time.Sleep(100 * time.Millisecond)
  194. framer := http2.NewFramer(conn, conn)
  195. close(sent)
  196. if err := framer.WriteSettings(http2.Setting{}); err != nil {
  197. t.Errorf("Error while writing settings. Err: %v", err)
  198. return
  199. }
  200. <-dialDone // Close conn only after dial returns.
  201. }()
  202. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  203. defer cancel()
  204. client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithBlock())
  205. close(dialDone)
  206. if err != nil {
  207. t.Fatalf("Error while dialing. Err: %v", err)
  208. }
  209. defer client.Close()
  210. select {
  211. case <-sent:
  212. default:
  213. t.Fatalf("Dial returned before server settings were sent")
  214. }
  215. <-done
  216. }
  217. func (s) TestDialWaitsForServerSettingsAndFails(t *testing.T) {
  218. // Restore current setting after test.
  219. old := envconfig.RequireHandshake
  220. defer func() { envconfig.RequireHandshake = old }()
  221. for _, setting := range allReqHSSettings {
  222. envconfig.RequireHandshake = setting
  223. lis, err := net.Listen("tcp", "localhost:0")
  224. if err != nil {
  225. t.Fatalf("Error while listening. Err: %v", err)
  226. }
  227. done := make(chan struct{})
  228. numConns := 0
  229. go func() { // Launch the server.
  230. defer func() {
  231. close(done)
  232. }()
  233. for {
  234. conn, err := lis.Accept()
  235. if err != nil {
  236. break
  237. }
  238. numConns++
  239. defer conn.Close()
  240. }
  241. }()
  242. cleanup := setMinConnectTimeout(time.Second / 4)
  243. defer cleanup()
  244. ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
  245. defer cancel()
  246. client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithWaitForHandshake(), WithBlock(), withBackoff(noBackoff{}))
  247. lis.Close()
  248. if err == nil {
  249. client.Close()
  250. t.Fatalf("Unexpected success (err=nil) while dialing")
  251. }
  252. if err != context.DeadlineExceeded {
  253. t.Fatalf("DialContext(_) = %v; want context.DeadlineExceeded", err)
  254. }
  255. if numConns < 2 {
  256. t.Fatalf("dial attempts: %v; want > 1", numConns)
  257. }
  258. <-done
  259. }
  260. }
  261. func (s) TestDialWaitsForServerSettingsViaEnvAndFails(t *testing.T) {
  262. // Set default behavior and restore current setting after test.
  263. old := envconfig.RequireHandshake
  264. envconfig.RequireHandshake = envconfig.RequireHandshakeOn
  265. defer func() { envconfig.RequireHandshake = old }()
  266. lis, err := net.Listen("tcp", "localhost:0")
  267. if err != nil {
  268. t.Fatalf("Error while listening. Err: %v", err)
  269. }
  270. done := make(chan struct{})
  271. numConns := 0
  272. go func() { // Launch the server.
  273. defer func() {
  274. close(done)
  275. }()
  276. for {
  277. conn, err := lis.Accept()
  278. if err != nil {
  279. break
  280. }
  281. numConns++
  282. defer conn.Close()
  283. }
  284. }()
  285. cleanup := setMinConnectTimeout(time.Second / 4)
  286. defer cleanup()
  287. ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
  288. defer cancel()
  289. client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithBlock(), withBackoff(noBackoff{}))
  290. lis.Close()
  291. if err == nil {
  292. client.Close()
  293. t.Fatalf("Unexpected success (err=nil) while dialing")
  294. }
  295. if err != context.DeadlineExceeded {
  296. t.Fatalf("DialContext(_) = %v; want context.DeadlineExceeded", err)
  297. }
  298. if numConns < 2 {
  299. t.Fatalf("dial attempts: %v; want > 1", numConns)
  300. }
  301. <-done
  302. }
  303. func (s) TestDialDoesNotWaitForServerSettings(t *testing.T) {
  304. // Restore current setting after test.
  305. old := envconfig.RequireHandshake
  306. defer func() { envconfig.RequireHandshake = old }()
  307. envconfig.RequireHandshake = envconfig.RequireHandshakeOff
  308. lis, err := net.Listen("tcp", "localhost:0")
  309. if err != nil {
  310. t.Fatalf("Error while listening. Err: %v", err)
  311. }
  312. defer lis.Close()
  313. done := make(chan struct{})
  314. dialDone := make(chan struct{})
  315. go func() { // Launch the server.
  316. defer func() {
  317. close(done)
  318. }()
  319. conn, err := lis.Accept()
  320. if err != nil {
  321. t.Errorf("Error while accepting. Err: %v", err)
  322. return
  323. }
  324. defer conn.Close()
  325. <-dialDone // Close conn only after dial returns.
  326. }()
  327. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  328. defer cancel()
  329. client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithBlock())
  330. if err != nil {
  331. t.Fatalf("DialContext returned err =%v; want nil", err)
  332. }
  333. defer client.Close()
  334. if state := client.GetState(); state != connectivity.Ready {
  335. t.Fatalf("client.GetState() = %v; want connectivity.Ready", state)
  336. }
  337. close(dialDone)
  338. }
  339. func (s) TestCloseConnectionWhenServerPrefaceNotReceived(t *testing.T) {
  340. // Restore current setting after test.
  341. old := envconfig.RequireHandshake
  342. defer func() { envconfig.RequireHandshake = old }()
  343. envconfig.RequireHandshake = envconfig.RequireHandshakeOn
  344. // 1. Client connects to a server that doesn't send preface.
  345. // 2. After minConnectTimeout(500 ms here), client disconnects and retries.
  346. // 3. The new server sends its preface.
  347. // 4. Client doesn't kill the connection this time.
  348. cleanup := setMinConnectTimeout(time.Millisecond * 500)
  349. defer cleanup()
  350. lis, err := net.Listen("tcp", "localhost:0")
  351. if err != nil {
  352. t.Fatalf("Error while listening. Err: %v", err)
  353. }
  354. var (
  355. conn2 net.Conn
  356. over uint32
  357. )
  358. defer func() {
  359. lis.Close()
  360. // conn2 shouldn't be closed until the client has
  361. // observed a successful test.
  362. if conn2 != nil {
  363. conn2.Close()
  364. }
  365. }()
  366. done := make(chan struct{})
  367. accepted := make(chan struct{})
  368. go func() { // Launch the server.
  369. defer close(done)
  370. conn1, err := lis.Accept()
  371. if err != nil {
  372. t.Errorf("Error while accepting. Err: %v", err)
  373. return
  374. }
  375. defer conn1.Close()
  376. // Don't send server settings and the client should close the connection and try again.
  377. conn2, err = lis.Accept() // Accept a reconnection request from client.
  378. if err != nil {
  379. t.Errorf("Error while accepting. Err: %v", err)
  380. return
  381. }
  382. close(accepted)
  383. framer := http2.NewFramer(conn2, conn2)
  384. if err = framer.WriteSettings(http2.Setting{}); err != nil {
  385. t.Errorf("Error while writing settings. Err: %v", err)
  386. return
  387. }
  388. b := make([]byte, 8)
  389. for {
  390. _, err = conn2.Read(b)
  391. if err == nil {
  392. continue
  393. }
  394. if atomic.LoadUint32(&over) == 1 {
  395. // The connection stayed alive for the timer.
  396. // Success.
  397. return
  398. }
  399. t.Errorf("Unexpected error while reading. Err: %v, want timeout error", err)
  400. break
  401. }
  402. }()
  403. client, err := Dial(lis.Addr().String(), WithInsecure())
  404. if err != nil {
  405. t.Fatalf("Error while dialing. Err: %v", err)
  406. }
  407. // wait for connection to be accepted on the server.
  408. timer := time.NewTimer(time.Second * 10)
  409. select {
  410. case <-accepted:
  411. case <-timer.C:
  412. t.Fatalf("Client didn't make another connection request in time.")
  413. }
  414. // Make sure the connection stays alive for sometime.
  415. time.Sleep(time.Second)
  416. atomic.StoreUint32(&over, 1)
  417. client.Close()
  418. <-done
  419. }
  420. func (s) TestBackoffWhenNoServerPrefaceReceived(t *testing.T) {
  421. lis, err := net.Listen("tcp", "localhost:0")
  422. if err != nil {
  423. t.Fatalf("Error while listening. Err: %v", err)
  424. }
  425. defer lis.Close()
  426. done := make(chan struct{})
  427. go func() { // Launch the server.
  428. defer func() {
  429. close(done)
  430. }()
  431. conn, err := lis.Accept() // Accept the connection only to close it immediately.
  432. if err != nil {
  433. t.Errorf("Error while accepting. Err: %v", err)
  434. return
  435. }
  436. prevAt := time.Now()
  437. conn.Close()
  438. var prevDuration time.Duration
  439. // Make sure the retry attempts are backed off properly.
  440. for i := 0; i < 3; i++ {
  441. conn, err := lis.Accept()
  442. if err != nil {
  443. t.Errorf("Error while accepting. Err: %v", err)
  444. return
  445. }
  446. meow := time.Now()
  447. conn.Close()
  448. dr := meow.Sub(prevAt)
  449. if dr <= prevDuration {
  450. t.Errorf("Client backoff did not increase with retries. Previous duration: %v, current duration: %v", prevDuration, dr)
  451. return
  452. }
  453. prevDuration = dr
  454. prevAt = meow
  455. }
  456. }()
  457. client, err := Dial(lis.Addr().String(), WithInsecure())
  458. if err != nil {
  459. t.Fatalf("Error while dialing. Err: %v", err)
  460. }
  461. defer client.Close()
  462. <-done
  463. }
  464. func (s) TestConnectivityStates(t *testing.T) {
  465. servers, resolver, cleanup := startServers(t, 2, math.MaxUint32)
  466. defer cleanup()
  467. cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(resolver)), WithInsecure())
  468. if err != nil {
  469. t.Fatalf("Dial(\"foo.bar.com\", WithBalancer(_)) = _, %v, want _ <nil>", err)
  470. }
  471. defer cc.Close()
  472. wantState := connectivity.Ready
  473. if state, ok := assertState(wantState, cc); !ok {
  474. t.Fatalf("asserState(%s) = %s, false, want %s, true", wantState, state, wantState)
  475. }
  476. // Send an update to delete the server connection (tearDown addrConn).
  477. update := []*naming.Update{
  478. {
  479. Op: naming.Delete,
  480. Addr: "localhost:" + servers[0].port,
  481. },
  482. }
  483. resolver.w.inject(update)
  484. wantState = connectivity.TransientFailure
  485. if state, ok := assertState(wantState, cc); !ok {
  486. t.Fatalf("asserState(%s) = %s, false, want %s, true", wantState, state, wantState)
  487. }
  488. update[0] = &naming.Update{
  489. Op: naming.Add,
  490. Addr: "localhost:" + servers[1].port,
  491. }
  492. resolver.w.inject(update)
  493. wantState = connectivity.Ready
  494. if state, ok := assertState(wantState, cc); !ok {
  495. t.Fatalf("asserState(%s) = %s, false, want %s, true", wantState, state, wantState)
  496. }
  497. }
  498. func (s) TestWithTimeout(t *testing.T) {
  499. conn, err := Dial("passthrough:///Non-Existent.Server:80", WithTimeout(time.Millisecond), WithBlock(), WithInsecure())
  500. if err == nil {
  501. conn.Close()
  502. }
  503. if err != context.DeadlineExceeded {
  504. t.Fatalf("Dial(_, _) = %v, %v, want %v", conn, err, context.DeadlineExceeded)
  505. }
  506. }
  507. func (s) TestWithTransportCredentialsTLS(t *testing.T) {
  508. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
  509. defer cancel()
  510. creds, err := credentials.NewClientTLSFromFile(testdata.Path("ca.pem"), "x.test.youtube.com")
  511. if err != nil {
  512. t.Fatalf("Failed to create credentials %v", err)
  513. }
  514. conn, err := DialContext(ctx, "passthrough:///Non-Existent.Server:80", WithTransportCredentials(creds), WithBlock())
  515. if err == nil {
  516. conn.Close()
  517. }
  518. if err != context.DeadlineExceeded {
  519. t.Fatalf("Dial(_, _) = %v, %v, want %v", conn, err, context.DeadlineExceeded)
  520. }
  521. }
  522. func (s) TestDefaultAuthority(t *testing.T) {
  523. target := "Non-Existent.Server:8080"
  524. conn, err := Dial(target, WithInsecure())
  525. if err != nil {
  526. t.Fatalf("Dial(_, _) = _, %v, want _, <nil>", err)
  527. }
  528. defer conn.Close()
  529. if conn.authority != target {
  530. t.Fatalf("%v.authority = %v, want %v", conn, conn.authority, target)
  531. }
  532. }
  533. func (s) TestTLSServerNameOverwrite(t *testing.T) {
  534. overwriteServerName := "over.write.server.name"
  535. creds, err := credentials.NewClientTLSFromFile(testdata.Path("ca.pem"), overwriteServerName)
  536. if err != nil {
  537. t.Fatalf("Failed to create credentials %v", err)
  538. }
  539. conn, err := Dial("passthrough:///Non-Existent.Server:80", WithTransportCredentials(creds))
  540. if err != nil {
  541. t.Fatalf("Dial(_, _) = _, %v, want _, <nil>", err)
  542. }
  543. defer conn.Close()
  544. if conn.authority != overwriteServerName {
  545. t.Fatalf("%v.authority = %v, want %v", conn, conn.authority, overwriteServerName)
  546. }
  547. }
  548. func (s) TestWithAuthority(t *testing.T) {
  549. overwriteServerName := "over.write.server.name"
  550. conn, err := Dial("passthrough:///Non-Existent.Server:80", WithInsecure(), WithAuthority(overwriteServerName))
  551. if err != nil {
  552. t.Fatalf("Dial(_, _) = _, %v, want _, <nil>", err)
  553. }
  554. defer conn.Close()
  555. if conn.authority != overwriteServerName {
  556. t.Fatalf("%v.authority = %v, want %v", conn, conn.authority, overwriteServerName)
  557. }
  558. }
  559. func (s) TestWithAuthorityAndTLS(t *testing.T) {
  560. overwriteServerName := "over.write.server.name"
  561. creds, err := credentials.NewClientTLSFromFile(testdata.Path("ca.pem"), overwriteServerName)
  562. if err != nil {
  563. t.Fatalf("Failed to create credentials %v", err)
  564. }
  565. conn, err := Dial("passthrough:///Non-Existent.Server:80", WithTransportCredentials(creds), WithAuthority("no.effect.authority"))
  566. if err != nil {
  567. t.Fatalf("Dial(_, _) = _, %v, want _, <nil>", err)
  568. }
  569. defer conn.Close()
  570. if conn.authority != overwriteServerName {
  571. t.Fatalf("%v.authority = %v, want %v", conn, conn.authority, overwriteServerName)
  572. }
  573. }
  574. // When creating a transport configured with n addresses, only calculate the
  575. // backoff once per "round" of attempts instead of once per address (n times
  576. // per "round" of attempts).
  577. func (s) TestDial_OneBackoffPerRetryGroup(t *testing.T) {
  578. getMinConnectTimeoutBackup := getMinConnectTimeout
  579. defer func() {
  580. getMinConnectTimeout = getMinConnectTimeoutBackup
  581. }()
  582. var attempts uint32
  583. getMinConnectTimeout = func() time.Duration {
  584. if atomic.AddUint32(&attempts, 1) == 1 {
  585. // Once all addresses are exhausted, hang around and wait for the
  586. // client.Close to happen rather than re-starting a new round of
  587. // attempts.
  588. return time.Hour
  589. }
  590. t.Error("only one attempt backoff calculation, but got more")
  591. return 0
  592. }
  593. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
  594. defer cancel()
  595. lis1, err := net.Listen("tcp", "localhost:0")
  596. if err != nil {
  597. t.Fatalf("Error while listening. Err: %v", err)
  598. }
  599. defer lis1.Close()
  600. lis2, err := net.Listen("tcp", "localhost:0")
  601. if err != nil {
  602. t.Fatalf("Error while listening. Err: %v", err)
  603. }
  604. defer lis2.Close()
  605. server1Done := make(chan struct{})
  606. server2Done := make(chan struct{})
  607. // Launch server 1.
  608. go func() {
  609. conn, err := lis1.Accept()
  610. if err != nil {
  611. t.Error(err)
  612. return
  613. }
  614. conn.Close()
  615. close(server1Done)
  616. }()
  617. // Launch server 2.
  618. go func() {
  619. conn, err := lis2.Accept()
  620. if err != nil {
  621. t.Error(err)
  622. return
  623. }
  624. conn.Close()
  625. close(server2Done)
  626. }()
  627. rb := manual.NewBuilderWithScheme("whatever")
  628. rb.InitialAddrs([]resolver.Address{
  629. {Addr: lis1.Addr().String()},
  630. {Addr: lis2.Addr().String()},
  631. })
  632. client, err := DialContext(ctx, "this-gets-overwritten", WithInsecure(), WithBalancerName(stateRecordingBalancerName), withResolverBuilder(rb))
  633. if err != nil {
  634. t.Fatal(err)
  635. }
  636. defer client.Close()
  637. timeout := time.After(15 * time.Second)
  638. select {
  639. case <-timeout:
  640. t.Fatal("timed out waiting for test to finish")
  641. case <-server1Done:
  642. }
  643. select {
  644. case <-timeout:
  645. t.Fatal("timed out waiting for test to finish")
  646. case <-server2Done:
  647. }
  648. }
  649. func (s) TestDialContextCancel(t *testing.T) {
  650. ctx, cancel := context.WithCancel(context.Background())
  651. cancel()
  652. if _, err := DialContext(ctx, "Non-Existent.Server:80", WithBlock(), WithInsecure()); err != context.Canceled {
  653. t.Fatalf("DialContext(%v, _) = _, %v, want _, %v", ctx, err, context.Canceled)
  654. }
  655. }
  656. type failFastError struct{}
  657. func (failFastError) Error() string { return "failfast" }
  658. func (failFastError) Temporary() bool { return false }
  659. func (s) TestDialContextFailFast(t *testing.T) {
  660. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  661. defer cancel()
  662. failErr := failFastError{}
  663. dialer := func(string, time.Duration) (net.Conn, error) {
  664. return nil, failErr
  665. }
  666. _, err := DialContext(ctx, "Non-Existent.Server:80", WithBlock(), WithInsecure(), WithDialer(dialer), FailOnNonTempDialError(true))
  667. if terr, ok := err.(transport.ConnectionError); !ok || terr.Origin() != failErr {
  668. t.Fatalf("DialContext() = _, %v, want _, %v", err, failErr)
  669. }
  670. }
  671. // blockingBalancer mimics the behavior of balancers whose initialization takes a long time.
  672. // In this test, reading from blockingBalancer.Notify() blocks forever.
  673. type blockingBalancer struct {
  674. ch chan []Address
  675. }
  676. func newBlockingBalancer() Balancer {
  677. return &blockingBalancer{ch: make(chan []Address)}
  678. }
  679. func (b *blockingBalancer) Start(target string, config BalancerConfig) error {
  680. return nil
  681. }
  682. func (b *blockingBalancer) Up(addr Address) func(error) {
  683. return nil
  684. }
  685. func (b *blockingBalancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {
  686. return Address{}, nil, nil
  687. }
  688. func (b *blockingBalancer) Notify() <-chan []Address {
  689. return b.ch
  690. }
  691. func (b *blockingBalancer) Close() error {
  692. close(b.ch)
  693. return nil
  694. }
  695. func (s) TestDialWithBlockingBalancer(t *testing.T) {
  696. ctx, cancel := context.WithCancel(context.Background())
  697. dialDone := make(chan struct{})
  698. go func() {
  699. DialContext(ctx, "Non-Existent.Server:80", WithBlock(), WithInsecure(), WithBalancer(newBlockingBalancer()))
  700. close(dialDone)
  701. }()
  702. cancel()
  703. <-dialDone
  704. }
  705. // securePerRPCCredentials always requires transport security.
  706. type securePerRPCCredentials struct{}
  707. func (c securePerRPCCredentials) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
  708. return nil, nil
  709. }
  710. func (c securePerRPCCredentials) RequireTransportSecurity() bool {
  711. return true
  712. }
  713. func (s) TestCredentialsMisuse(t *testing.T) {
  714. tlsCreds, err := credentials.NewClientTLSFromFile(testdata.Path("ca.pem"), "x.test.youtube.com")
  715. if err != nil {
  716. t.Fatalf("Failed to create authenticator %v", err)
  717. }
  718. // Two conflicting credential configurations
  719. if _, err := Dial("passthrough:///Non-Existent.Server:80", WithTransportCredentials(tlsCreds), WithBlock(), WithInsecure()); err != errCredentialsConflict {
  720. t.Fatalf("Dial(_, _) = _, %v, want _, %v", err, errCredentialsConflict)
  721. }
  722. // security info on insecure connection
  723. if _, err := Dial("passthrough:///Non-Existent.Server:80", WithPerRPCCredentials(securePerRPCCredentials{}), WithBlock(), WithInsecure()); err != errTransportCredentialsMissing {
  724. t.Fatalf("Dial(_, _) = _, %v, want _, %v", err, errTransportCredentialsMissing)
  725. }
  726. }
  727. func (s) TestWithBackoffConfigDefault(t *testing.T) {
  728. testBackoffConfigSet(t, &DefaultBackoffConfig)
  729. }
  730. func (s) TestWithBackoffConfig(t *testing.T) {
  731. b := BackoffConfig{MaxDelay: DefaultBackoffConfig.MaxDelay / 2}
  732. expected := b
  733. testBackoffConfigSet(t, &expected, WithBackoffConfig(b))
  734. }
  735. func (s) TestWithBackoffMaxDelay(t *testing.T) {
  736. md := DefaultBackoffConfig.MaxDelay / 2
  737. expected := BackoffConfig{MaxDelay: md}
  738. testBackoffConfigSet(t, &expected, WithBackoffMaxDelay(md))
  739. }
  740. func testBackoffConfigSet(t *testing.T, expected *BackoffConfig, opts ...DialOption) {
  741. opts = append(opts, WithInsecure())
  742. conn, err := Dial("passthrough:///foo:80", opts...)
  743. if err != nil {
  744. t.Fatalf("unexpected error dialing connection: %v", err)
  745. }
  746. defer conn.Close()
  747. if conn.dopts.bs == nil {
  748. t.Fatalf("backoff config not set")
  749. }
  750. actual, ok := conn.dopts.bs.(backoff.Exponential)
  751. if !ok {
  752. t.Fatalf("unexpected type of backoff config: %#v", conn.dopts.bs)
  753. }
  754. expectedValue := backoff.Exponential{
  755. MaxDelay: expected.MaxDelay,
  756. }
  757. if actual != expectedValue {
  758. t.Fatalf("unexpected backoff config on connection: %v, want %v", actual, expected)
  759. }
  760. }
  761. // emptyBalancer returns an empty set of servers.
  762. type emptyBalancer struct {
  763. ch chan []Address
  764. }
  765. func newEmptyBalancer() Balancer {
  766. return &emptyBalancer{ch: make(chan []Address, 1)}
  767. }
  768. func (b *emptyBalancer) Start(_ string, _ BalancerConfig) error {
  769. b.ch <- nil
  770. return nil
  771. }
  772. func (b *emptyBalancer) Up(_ Address) func(error) {
  773. return nil
  774. }
  775. func (b *emptyBalancer) Get(_ context.Context, _ BalancerGetOptions) (Address, func(), error) {
  776. return Address{}, nil, nil
  777. }
  778. func (b *emptyBalancer) Notify() <-chan []Address {
  779. return b.ch
  780. }
  781. func (b *emptyBalancer) Close() error {
  782. close(b.ch)
  783. return nil
  784. }
  785. func (s) TestNonblockingDialWithEmptyBalancer(t *testing.T) {
  786. ctx, cancel := context.WithCancel(context.Background())
  787. defer cancel()
  788. dialDone := make(chan error)
  789. go func() {
  790. dialDone <- func() error {
  791. conn, err := DialContext(ctx, "Non-Existent.Server:80", WithInsecure(), WithBalancer(newEmptyBalancer()))
  792. if err != nil {
  793. return err
  794. }
  795. return conn.Close()
  796. }()
  797. }()
  798. if err := <-dialDone; err != nil {
  799. t.Fatalf("unexpected error dialing connection: %s", err)
  800. }
  801. }
  802. func (s) TestResolverServiceConfigBeforeAddressNotPanic(t *testing.T) {
  803. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  804. defer rcleanup()
  805. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure())
  806. if err != nil {
  807. t.Fatalf("failed to dial: %v", err)
  808. }
  809. defer cc.Close()
  810. // SwitchBalancer before NewAddress. There was no balancer created, this
  811. // makes sure we don't call close on nil balancerWrapper.
  812. r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`) // This should not panic.
  813. time.Sleep(time.Second) // Sleep to make sure the service config is handled by ClientConn.
  814. }
  815. func (s) TestResolverServiceConfigWhileClosingNotPanic(t *testing.T) {
  816. for i := 0; i < 10; i++ { // Run this multiple times to make sure it doesn't panic.
  817. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  818. defer rcleanup()
  819. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure())
  820. if err != nil {
  821. t.Fatalf("failed to dial: %v", err)
  822. }
  823. // Send a new service config while closing the ClientConn.
  824. go cc.Close()
  825. go r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`) // This should not panic.
  826. }
  827. }
  828. func (s) TestResolverEmptyUpdateNotPanic(t *testing.T) {
  829. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  830. defer rcleanup()
  831. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure())
  832. if err != nil {
  833. t.Fatalf("failed to dial: %v", err)
  834. }
  835. defer cc.Close()
  836. // This make sure we don't create addrConn with empty address list.
  837. r.NewAddress([]resolver.Address{}) // This should not panic.
  838. time.Sleep(time.Second) // Sleep to make sure the service config is handled by ClientConn.
  839. }
  840. func (s) TestClientUpdatesParamsAfterGoAway(t *testing.T) {
  841. lis, err := net.Listen("tcp", "localhost:0")
  842. if err != nil {
  843. t.Fatalf("Failed to listen. Err: %v", err)
  844. }
  845. defer lis.Close()
  846. connected := make(chan struct{})
  847. go func() {
  848. conn, err := lis.Accept()
  849. if err != nil {
  850. t.Errorf("error accepting connection: %v", err)
  851. return
  852. }
  853. defer conn.Close()
  854. f := http2.NewFramer(conn, conn)
  855. // Start a goroutine to read from the conn to prevent the client from
  856. // blocking after it writes its preface.
  857. go func() {
  858. for {
  859. if _, err := f.ReadFrame(); err != nil {
  860. return
  861. }
  862. }
  863. }()
  864. if err := f.WriteSettings(http2.Setting{}); err != nil {
  865. t.Errorf("error writing settings: %v", err)
  866. return
  867. }
  868. <-connected
  869. if err := f.WriteGoAway(0, http2.ErrCodeEnhanceYourCalm, []byte("too_many_pings")); err != nil {
  870. t.Errorf("error writing GOAWAY: %v", err)
  871. return
  872. }
  873. }()
  874. addr := lis.Addr().String()
  875. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  876. defer cancel()
  877. cc, err := DialContext(ctx, addr, WithBlock(), WithInsecure(), WithKeepaliveParams(keepalive.ClientParameters{
  878. Time: 10 * time.Second,
  879. Timeout: 100 * time.Millisecond,
  880. PermitWithoutStream: true,
  881. }))
  882. if err != nil {
  883. t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
  884. }
  885. defer cc.Close()
  886. close(connected)
  887. for {
  888. time.Sleep(10 * time.Millisecond)
  889. cc.mu.RLock()
  890. v := cc.mkp.Time
  891. if v == 20*time.Second {
  892. // Success
  893. cc.mu.RUnlock()
  894. return
  895. }
  896. if ctx.Err() != nil {
  897. // Timeout
  898. t.Fatalf("cc.dopts.copts.Keepalive.Time = %v , want 20s", v)
  899. }
  900. cc.mu.RUnlock()
  901. }
  902. }
  903. func (s) TestDisableServiceConfigOption(t *testing.T) {
  904. r, cleanup := manual.GenerateAndRegisterManualResolver()
  905. defer cleanup()
  906. addr := r.Scheme() + ":///non.existent"
  907. cc, err := Dial(addr, WithInsecure(), WithDisableServiceConfig())
  908. if err != nil {
  909. t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
  910. }
  911. defer cc.Close()
  912. r.NewServiceConfig(`{
  913. "methodConfig": [
  914. {
  915. "name": [
  916. {
  917. "service": "foo",
  918. "method": "Bar"
  919. }
  920. ],
  921. "waitForReady": true
  922. }
  923. ]
  924. }`)
  925. time.Sleep(1 * time.Second)
  926. m := cc.GetMethodConfig("/foo/Bar")
  927. if m.WaitForReady != nil {
  928. t.Fatalf("want: method (\"/foo/bar/\") config to be empty, got: %v", m)
  929. }
  930. }
  931. func (s) TestGetClientConnTarget(t *testing.T) {
  932. addr := "nonexist:///non.existent"
  933. cc, err := Dial(addr, WithInsecure())
  934. if err != nil {
  935. t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
  936. }
  937. defer cc.Close()
  938. if cc.Target() != addr {
  939. t.Fatalf("Target() = %s, want %s", cc.Target(), addr)
  940. }
  941. }
  942. type backoffForever struct{}
  943. func (b backoffForever) Backoff(int) time.Duration { return time.Duration(math.MaxInt64) }
  944. func (s) TestResetConnectBackoff(t *testing.T) {
  945. dials := make(chan struct{})
  946. defer func() { // If we fail, let the http2client break out of dialing.
  947. select {
  948. case <-dials:
  949. default:
  950. }
  951. }()
  952. dialer := func(string, time.Duration) (net.Conn, error) {
  953. dials <- struct{}{}
  954. return nil, errors.New("failed to fake dial")
  955. }
  956. cc, err := Dial("any", WithInsecure(), WithDialer(dialer), withBackoff(backoffForever{}))
  957. if err != nil {
  958. t.Fatalf("Dial() = _, %v; want _, nil", err)
  959. }
  960. defer cc.Close()
  961. select {
  962. case <-dials:
  963. case <-time.NewTimer(10 * time.Second).C:
  964. t.Fatal("Failed to call dial within 10s")
  965. }
  966. select {
  967. case <-dials:
  968. t.Fatal("Dial called unexpectedly before resetting backoff")
  969. case <-time.NewTimer(100 * time.Millisecond).C:
  970. }
  971. cc.ResetConnectBackoff()
  972. select {
  973. case <-dials:
  974. case <-time.NewTimer(10 * time.Second).C:
  975. t.Fatal("Failed to call dial within 10s after resetting backoff")
  976. }
  977. }
  978. func (s) TestBackoffCancel(t *testing.T) {
  979. dialStrCh := make(chan string)
  980. cc, err := Dial("any", WithInsecure(), WithDialer(func(t string, _ time.Duration) (net.Conn, error) {
  981. dialStrCh <- t
  982. return nil, fmt.Errorf("test dialer, always error")
  983. }))
  984. if err != nil {
  985. t.Fatalf("Failed to create ClientConn: %v", err)
  986. }
  987. <-dialStrCh
  988. cc.Close()
  989. // Should not leak. May need -count 5000 to exercise.
  990. }
  991. // UpdateAddresses should cause the next reconnect to begin from the top of the
  992. // list if the connection is not READY.
  993. func (s) TestUpdateAddresses_RetryFromFirstAddr(t *testing.T) {
  994. cleanup := setMinConnectTimeout(time.Hour)
  995. defer cleanup()
  996. lis1, err := net.Listen("tcp", "localhost:0")
  997. if err != nil {
  998. t.Fatalf("Error while listening. Err: %v", err)
  999. }
  1000. defer lis1.Close()
  1001. lis2, err := net.Listen("tcp", "localhost:0")
  1002. if err != nil {
  1003. t.Fatalf("Error while listening. Err: %v", err)
  1004. }
  1005. defer lis2.Close()
  1006. lis3, err := net.Listen("tcp", "localhost:0")
  1007. if err != nil {
  1008. t.Fatalf("Error while listening. Err: %v", err)
  1009. }
  1010. defer lis3.Close()
  1011. closeServer2 := make(chan struct{})
  1012. server1ContactedFirstTime := make(chan struct{})
  1013. server1ContactedSecondTime := make(chan struct{})
  1014. server2ContactedFirstTime := make(chan struct{})
  1015. server2ContactedSecondTime := make(chan struct{})
  1016. server3Contacted := make(chan struct{})
  1017. // Launch server 1.
  1018. go func() {
  1019. // First, let's allow the initial connection to go READY. We need to do
  1020. // this because tryUpdateAddrs only works after there's some non-nil
  1021. // address on the ac, and curAddress is only set after READY.
  1022. conn1, err := lis1.Accept()
  1023. if err != nil {
  1024. t.Error(err)
  1025. return
  1026. }
  1027. go keepReading(conn1)
  1028. framer := http2.NewFramer(conn1, conn1)
  1029. if err := framer.WriteSettings(http2.Setting{}); err != nil {
  1030. t.Errorf("Error while writing settings frame. %v", err)
  1031. return
  1032. }
  1033. // nextStateNotifier() is updated after balancerBuilder.Build(), which is
  1034. // called by grpc.Dial. It's safe to do it here because lis1.Accept blocks
  1035. // until balancer is built to process the addresses.
  1036. stateNotifications := testBalancerBuilder.nextStateNotifier()
  1037. // Wait for the transport to become ready.
  1038. for s := range stateNotifications {
  1039. if s == connectivity.Ready {
  1040. break
  1041. }
  1042. }
  1043. // Once it's ready, curAddress has been set. So let's close this
  1044. // connection prompting the first reconnect cycle.
  1045. conn1.Close()
  1046. // Accept and immediately close, causing it to go to server2.
  1047. conn2, err := lis1.Accept()
  1048. if err != nil {
  1049. t.Error(err)
  1050. return
  1051. }
  1052. close(server1ContactedFirstTime)
  1053. conn2.Close()
  1054. // Hopefully it picks this server after tryUpdateAddrs.
  1055. lis1.Accept()
  1056. close(server1ContactedSecondTime)
  1057. }()
  1058. // Launch server 2.
  1059. go func() {
  1060. // Accept and then hang waiting for the test call tryUpdateAddrs and
  1061. // then signal to this server to close. After this server closes, it
  1062. // should start from the top instead of trying server2 or continuing
  1063. // to server3.
  1064. conn, err := lis2.Accept()
  1065. if err != nil {
  1066. t.Error(err)
  1067. return
  1068. }
  1069. close(server2ContactedFirstTime)
  1070. <-closeServer2
  1071. conn.Close()
  1072. // After tryUpdateAddrs, it should NOT try server2.
  1073. lis2.Accept()
  1074. close(server2ContactedSecondTime)
  1075. }()
  1076. // Launch server 3.
  1077. go func() {
  1078. // After tryUpdateAddrs, it should NOT try server3. (or any other time)
  1079. lis3.Accept()
  1080. close(server3Contacted)
  1081. }()
  1082. addrsList := []resolver.Address{
  1083. {Addr: lis1.Addr().String()},
  1084. {Addr: lis2.Addr().String()},
  1085. {Addr: lis3.Addr().String()},
  1086. }
  1087. rb := manual.NewBuilderWithScheme("whatever")
  1088. rb.InitialAddrs(addrsList)
  1089. client, err := Dial("this-gets-overwritten", WithInsecure(), WithWaitForHandshake(), withResolverBuilder(rb), withBackoff(noBackoff{}), WithBalancerName(stateRecordingBalancerName))
  1090. if err != nil {
  1091. t.Fatal(err)
  1092. }
  1093. defer client.Close()
  1094. timeout := time.After(5 * time.Second)
  1095. // Wait for server1 to be contacted (which will immediately fail), then
  1096. // server2 (which will hang waiting for our signal).
  1097. select {
  1098. case <-server1ContactedFirstTime:
  1099. case <-timeout:
  1100. t.Fatal("timed out waiting for server1 to be contacted")
  1101. }
  1102. select {
  1103. case <-server2ContactedFirstTime:
  1104. case <-timeout:
  1105. t.Fatal("timed out waiting for server2 to be contacted")
  1106. }
  1107. // Grab the addrConn and call tryUpdateAddrs.
  1108. var ac *addrConn
  1109. client.mu.Lock()
  1110. for clientAC := range client.conns {
  1111. ac = clientAC
  1112. break
  1113. }
  1114. client.mu.Unlock()
  1115. ac.acbw.UpdateAddresses(addrsList)
  1116. // We've called tryUpdateAddrs - now let's make server2 close the
  1117. // connection and check that it goes back to server1 instead of continuing
  1118. // to server3 or trying server2 again.
  1119. close(closeServer2)
  1120. select {
  1121. case <-server1ContactedSecondTime:
  1122. case <-server2ContactedSecondTime:
  1123. t.Fatal("server2 was contacted a second time, but it after tryUpdateAddrs it should have re-started the list and tried server1")
  1124. case <-server3Contacted:
  1125. t.Fatal("server3 was contacted, but after tryUpdateAddrs it should have re-started the list and tried server1")
  1126. case <-timeout:
  1127. t.Fatal("timed out waiting for any server to be contacted after tryUpdateAddrs")
  1128. }
  1129. }
  1130. // Set the minConnectTimeout. Be sure to defer cleanup!
  1131. func setMinConnectTimeout(newMin time.Duration) (cleanup func()) {
  1132. mctBkp := getMinConnectTimeout()
  1133. atomic.StoreInt64((*int64)(&mutableMinConnectTimeout), int64(newMin))
  1134. return func() {
  1135. atomic.StoreInt64((*int64)(&mutableMinConnectTimeout), int64(mctBkp))
  1136. }
  1137. }