Nie możesz wybrać więcej, niż 25 tematów Tematy muszą się zaczynać od litery lub cyfry, mogą zawierać myślniki ('-') i mogą mieć do 35 znaków.
 
 
 

515 wiersze
13 KiB

  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "context"
  21. "net"
  22. "sync"
  23. "sync/atomic"
  24. "testing"
  25. "time"
  26. "golang.org/x/net/http2"
  27. "google.golang.org/grpc/balancer"
  28. "google.golang.org/grpc/connectivity"
  29. "google.golang.org/grpc/internal/testutils"
  30. "google.golang.org/grpc/resolver"
  31. "google.golang.org/grpc/resolver/manual"
  32. )
  33. const stateRecordingBalancerName = "state_recoding_balancer"
  34. var testBalancerBuilder = newStateRecordingBalancerBuilder()
  35. func init() {
  36. balancer.Register(testBalancerBuilder)
  37. }
  38. // These tests use a pipeListener. This listener is similar to net.Listener
  39. // except that it is unbuffered, so each read and write will wait for the other
  40. // side's corresponding write or read.
  41. func (s) TestStateTransitions_SingleAddress(t *testing.T) {
  42. mctBkp := getMinConnectTimeout()
  43. defer func() {
  44. atomic.StoreInt64((*int64)(&mutableMinConnectTimeout), int64(mctBkp))
  45. }()
  46. atomic.StoreInt64((*int64)(&mutableMinConnectTimeout), int64(time.Millisecond)*100)
  47. for _, test := range []struct {
  48. desc string
  49. want []connectivity.State
  50. server func(net.Listener) net.Conn
  51. }{
  52. {
  53. desc: "When the server returns server preface, the client enters READY.",
  54. want: []connectivity.State{
  55. connectivity.Connecting,
  56. connectivity.Ready,
  57. },
  58. server: func(lis net.Listener) net.Conn {
  59. conn, err := lis.Accept()
  60. if err != nil {
  61. t.Error(err)
  62. return nil
  63. }
  64. go keepReading(conn)
  65. framer := http2.NewFramer(conn, conn)
  66. if err := framer.WriteSettings(http2.Setting{}); err != nil {
  67. t.Errorf("Error while writing settings frame. %v", err)
  68. return nil
  69. }
  70. return conn
  71. },
  72. },
  73. {
  74. desc: "When the connection is closed, the client enters TRANSIENT FAILURE.",
  75. want: []connectivity.State{
  76. connectivity.Connecting,
  77. connectivity.TransientFailure,
  78. },
  79. server: func(lis net.Listener) net.Conn {
  80. conn, err := lis.Accept()
  81. if err != nil {
  82. t.Error(err)
  83. return nil
  84. }
  85. conn.Close()
  86. return nil
  87. },
  88. },
  89. {
  90. desc: `When the server sends its connection preface, but the connection dies before the client can write its
  91. connection preface, the client enters TRANSIENT FAILURE.`,
  92. want: []connectivity.State{
  93. connectivity.Connecting,
  94. connectivity.TransientFailure,
  95. },
  96. server: func(lis net.Listener) net.Conn {
  97. conn, err := lis.Accept()
  98. if err != nil {
  99. t.Error(err)
  100. return nil
  101. }
  102. framer := http2.NewFramer(conn, conn)
  103. if err := framer.WriteSettings(http2.Setting{}); err != nil {
  104. t.Errorf("Error while writing settings frame. %v", err)
  105. return nil
  106. }
  107. conn.Close()
  108. return nil
  109. },
  110. },
  111. {
  112. desc: `When the server reads the client connection preface but does not send its connection preface, the
  113. client enters TRANSIENT FAILURE.`,
  114. want: []connectivity.State{
  115. connectivity.Connecting,
  116. connectivity.TransientFailure,
  117. },
  118. server: func(lis net.Listener) net.Conn {
  119. conn, err := lis.Accept()
  120. if err != nil {
  121. t.Error(err)
  122. return nil
  123. }
  124. go keepReading(conn)
  125. return conn
  126. },
  127. },
  128. } {
  129. t.Log(test.desc)
  130. testStateTransitionSingleAddress(t, test.want, test.server)
  131. }
  132. }
  133. func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, server func(net.Listener) net.Conn) {
  134. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
  135. defer cancel()
  136. pl := testutils.NewPipeListener()
  137. defer pl.Close()
  138. // Launch the server.
  139. var conn net.Conn
  140. var connMu sync.Mutex
  141. go func() {
  142. connMu.Lock()
  143. conn = server(pl)
  144. connMu.Unlock()
  145. }()
  146. client, err := DialContext(ctx, "", WithWaitForHandshake(), WithInsecure(),
  147. WithBalancerName(stateRecordingBalancerName), WithDialer(pl.Dialer()), withBackoff(noBackoff{}))
  148. if err != nil {
  149. t.Fatal(err)
  150. }
  151. defer client.Close()
  152. stateNotifications := testBalancerBuilder.nextStateNotifier()
  153. timeout := time.After(5 * time.Second)
  154. for i := 0; i < len(want); i++ {
  155. select {
  156. case <-timeout:
  157. t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
  158. case seen := <-stateNotifications:
  159. if seen != want[i] {
  160. t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
  161. }
  162. }
  163. }
  164. connMu.Lock()
  165. defer connMu.Unlock()
  166. if conn != nil {
  167. err = conn.Close()
  168. if err != nil {
  169. t.Fatal(err)
  170. }
  171. }
  172. }
  173. // When a READY connection is closed, the client enters TRANSIENT FAILURE before CONNECTING.
  174. func (s) TestStateTransitions_ReadyToTransientFailure(t *testing.T) {
  175. want := []connectivity.State{
  176. connectivity.Connecting,
  177. connectivity.Ready,
  178. connectivity.TransientFailure,
  179. connectivity.Connecting,
  180. }
  181. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
  182. defer cancel()
  183. lis, err := net.Listen("tcp", "localhost:0")
  184. if err != nil {
  185. t.Fatalf("Error while listening. Err: %v", err)
  186. }
  187. defer lis.Close()
  188. sawReady := make(chan struct{})
  189. // Launch the server.
  190. go func() {
  191. conn, err := lis.Accept()
  192. if err != nil {
  193. t.Error(err)
  194. return
  195. }
  196. go keepReading(conn)
  197. framer := http2.NewFramer(conn, conn)
  198. if err := framer.WriteSettings(http2.Setting{}); err != nil {
  199. t.Errorf("Error while writing settings frame. %v", err)
  200. return
  201. }
  202. // Prevents race between onPrefaceReceipt and onClose.
  203. <-sawReady
  204. conn.Close()
  205. }()
  206. client, err := DialContext(ctx, lis.Addr().String(), WithWaitForHandshake(), WithInsecure(), WithBalancerName(stateRecordingBalancerName))
  207. if err != nil {
  208. t.Fatal(err)
  209. }
  210. defer client.Close()
  211. stateNotifications := testBalancerBuilder.nextStateNotifier()
  212. timeout := time.After(5 * time.Second)
  213. for i := 0; i < len(want); i++ {
  214. select {
  215. case <-timeout:
  216. t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
  217. case seen := <-stateNotifications:
  218. if seen == connectivity.Ready {
  219. close(sawReady)
  220. }
  221. if seen != want[i] {
  222. t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
  223. }
  224. }
  225. }
  226. }
  227. // When the first connection is closed, the client enters stays in CONNECTING
  228. // until it tries the second address (which succeeds, and then it enters READY).
  229. func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T) {
  230. want := []connectivity.State{
  231. connectivity.Connecting,
  232. connectivity.Ready,
  233. }
  234. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
  235. defer cancel()
  236. lis1, err := net.Listen("tcp", "localhost:0")
  237. if err != nil {
  238. t.Fatalf("Error while listening. Err: %v", err)
  239. }
  240. defer lis1.Close()
  241. lis2, err := net.Listen("tcp", "localhost:0")
  242. if err != nil {
  243. t.Fatalf("Error while listening. Err: %v", err)
  244. }
  245. defer lis2.Close()
  246. server1Done := make(chan struct{})
  247. server2Done := make(chan struct{})
  248. // Launch server 1.
  249. go func() {
  250. conn, err := lis1.Accept()
  251. if err != nil {
  252. t.Error(err)
  253. return
  254. }
  255. conn.Close()
  256. close(server1Done)
  257. }()
  258. // Launch server 2.
  259. go func() {
  260. conn, err := lis2.Accept()
  261. if err != nil {
  262. t.Error(err)
  263. return
  264. }
  265. go keepReading(conn)
  266. framer := http2.NewFramer(conn, conn)
  267. if err := framer.WriteSettings(http2.Setting{}); err != nil {
  268. t.Errorf("Error while writing settings frame. %v", err)
  269. return
  270. }
  271. close(server2Done)
  272. }()
  273. rb := manual.NewBuilderWithScheme("whatever")
  274. rb.InitialAddrs([]resolver.Address{
  275. {Addr: lis1.Addr().String()},
  276. {Addr: lis2.Addr().String()},
  277. })
  278. client, err := DialContext(ctx, "this-gets-overwritten", WithInsecure(), WithWaitForHandshake(), WithBalancerName(stateRecordingBalancerName), withResolverBuilder(rb))
  279. if err != nil {
  280. t.Fatal(err)
  281. }
  282. defer client.Close()
  283. stateNotifications := testBalancerBuilder.nextStateNotifier()
  284. timeout := time.After(5 * time.Second)
  285. for i := 0; i < len(want); i++ {
  286. select {
  287. case <-timeout:
  288. t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
  289. case seen := <-stateNotifications:
  290. if seen != want[i] {
  291. t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
  292. }
  293. }
  294. }
  295. select {
  296. case <-timeout:
  297. t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 1")
  298. case <-server1Done:
  299. }
  300. select {
  301. case <-timeout:
  302. t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 2")
  303. case <-server2Done:
  304. }
  305. }
  306. // When there are multiple addresses, and we enter READY on one of them, a
  307. // later closure should cause the client to enter TRANSIENT FAILURE before it
  308. // re-enters CONNECTING.
  309. func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) {
  310. want := []connectivity.State{
  311. connectivity.Connecting,
  312. connectivity.Ready,
  313. connectivity.TransientFailure,
  314. connectivity.Connecting,
  315. }
  316. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
  317. defer cancel()
  318. lis1, err := net.Listen("tcp", "localhost:0")
  319. if err != nil {
  320. t.Fatalf("Error while listening. Err: %v", err)
  321. }
  322. defer lis1.Close()
  323. // Never actually gets used; we just want it to be alive so that the resolver has two addresses to target.
  324. lis2, err := net.Listen("tcp", "localhost:0")
  325. if err != nil {
  326. t.Fatalf("Error while listening. Err: %v", err)
  327. }
  328. defer lis2.Close()
  329. server1Done := make(chan struct{})
  330. sawReady := make(chan struct{})
  331. // Launch server 1.
  332. go func() {
  333. conn, err := lis1.Accept()
  334. if err != nil {
  335. t.Error(err)
  336. return
  337. }
  338. go keepReading(conn)
  339. framer := http2.NewFramer(conn, conn)
  340. if err := framer.WriteSettings(http2.Setting{}); err != nil {
  341. t.Errorf("Error while writing settings frame. %v", err)
  342. return
  343. }
  344. <-sawReady
  345. conn.Close()
  346. _, err = lis1.Accept()
  347. if err != nil {
  348. t.Error(err)
  349. return
  350. }
  351. close(server1Done)
  352. }()
  353. rb := manual.NewBuilderWithScheme("whatever")
  354. rb.InitialAddrs([]resolver.Address{
  355. {Addr: lis1.Addr().String()},
  356. {Addr: lis2.Addr().String()},
  357. })
  358. client, err := DialContext(ctx, "this-gets-overwritten", WithInsecure(), WithWaitForHandshake(), WithBalancerName(stateRecordingBalancerName), withResolverBuilder(rb))
  359. if err != nil {
  360. t.Fatal(err)
  361. }
  362. defer client.Close()
  363. stateNotifications := testBalancerBuilder.nextStateNotifier()
  364. timeout := time.After(2 * time.Second)
  365. for i := 0; i < len(want); i++ {
  366. select {
  367. case <-timeout:
  368. t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
  369. case seen := <-stateNotifications:
  370. if seen == connectivity.Ready {
  371. close(sawReady)
  372. }
  373. if seen != want[i] {
  374. t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
  375. }
  376. }
  377. }
  378. select {
  379. case <-timeout:
  380. t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 1")
  381. case <-server1Done:
  382. }
  383. }
  384. type stateRecordingBalancer struct {
  385. notifier chan<- connectivity.State
  386. balancer.Balancer
  387. }
  388. func (b *stateRecordingBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
  389. b.notifier <- s
  390. b.Balancer.HandleSubConnStateChange(sc, s)
  391. }
  392. func (b *stateRecordingBalancer) ResetNotifier(r chan<- connectivity.State) {
  393. b.notifier = r
  394. }
  395. func (b *stateRecordingBalancer) Close() {
  396. b.Balancer.Close()
  397. }
  398. type stateRecordingBalancerBuilder struct {
  399. mu sync.Mutex
  400. notifier chan connectivity.State // The notifier used in the last Balancer.
  401. }
  402. func newStateRecordingBalancerBuilder() *stateRecordingBalancerBuilder {
  403. return &stateRecordingBalancerBuilder{}
  404. }
  405. func (b *stateRecordingBalancerBuilder) Name() string {
  406. return stateRecordingBalancerName
  407. }
  408. func (b *stateRecordingBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
  409. stateNotifications := make(chan connectivity.State, 10)
  410. b.mu.Lock()
  411. b.notifier = stateNotifications
  412. b.mu.Unlock()
  413. return &stateRecordingBalancer{
  414. notifier: stateNotifications,
  415. Balancer: balancer.Get(PickFirstBalancerName).Build(cc, opts),
  416. }
  417. }
  418. func (b *stateRecordingBalancerBuilder) nextStateNotifier() <-chan connectivity.State {
  419. b.mu.Lock()
  420. defer b.mu.Unlock()
  421. ret := b.notifier
  422. b.notifier = nil
  423. return ret
  424. }
  425. type noBackoff struct{}
  426. func (b noBackoff) Backoff(int) time.Duration { return time.Duration(0) }
  427. // Keep reading until something causes the connection to die (EOF, server
  428. // closed, etc). Useful as a tool for mindlessly keeping the connection
  429. // healthy, since the client will error if things like client prefaces are not
  430. // accepted in a timely fashion.
  431. func keepReading(conn net.Conn) {
  432. buf := make([]byte, 1024)
  433. for _, err := conn.Read(buf); err == nil; _, err = conn.Read(buf) {
  434. }
  435. }