Você não pode selecionar mais de 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.
 
 
 

512 linhas
15 KiB

  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "context"
  21. "fmt"
  22. "math"
  23. "testing"
  24. "time"
  25. "google.golang.org/grpc/balancer"
  26. "google.golang.org/grpc/balancer/roundrobin"
  27. "google.golang.org/grpc/connectivity"
  28. _ "google.golang.org/grpc/grpclog/glogger"
  29. "google.golang.org/grpc/internal"
  30. "google.golang.org/grpc/resolver"
  31. "google.golang.org/grpc/resolver/manual"
  32. )
  33. var _ balancer.Builder = &magicalLB{}
  34. var _ balancer.Balancer = &magicalLB{}
  35. // magicalLB is a ringer for grpclb. It is used to avoid circular dependencies on the grpclb package
  36. type magicalLB struct{}
  37. func (b *magicalLB) Name() string {
  38. return "grpclb"
  39. }
  40. func (b *magicalLB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
  41. return b
  42. }
  43. func (b *magicalLB) HandleSubConnStateChange(balancer.SubConn, connectivity.State) {}
  44. func (b *magicalLB) HandleResolvedAddrs([]resolver.Address, error) {}
  45. func (b *magicalLB) Close() {}
  46. func init() {
  47. balancer.Register(&magicalLB{})
  48. }
  49. func checkPickFirst(cc *ClientConn, servers []*server) error {
  50. var (
  51. req = "port"
  52. reply string
  53. err error
  54. )
  55. connected := false
  56. for i := 0; i < 5000; i++ {
  57. if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); errorDesc(err) == servers[0].port {
  58. if connected {
  59. // connected is set to false if peer is not server[0]. So if
  60. // connected is true here, this is the second time we saw
  61. // server[0] in a row. Break because pickfirst is in effect.
  62. break
  63. }
  64. connected = true
  65. } else {
  66. connected = false
  67. }
  68. time.Sleep(time.Millisecond)
  69. }
  70. if !connected {
  71. return fmt.Errorf("pickfirst is not in effect after 5 second, EmptyCall() = _, %v, want _, %v", err, servers[0].port)
  72. }
  73. // The following RPCs should all succeed with the first server.
  74. for i := 0; i < 3; i++ {
  75. err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply)
  76. if errorDesc(err) != servers[0].port {
  77. return fmt.Errorf("index %d: want peer %v, got peer %v", i, servers[0].port, err)
  78. }
  79. }
  80. return nil
  81. }
  82. func checkRoundRobin(cc *ClientConn, servers []*server) error {
  83. var (
  84. req = "port"
  85. reply string
  86. err error
  87. )
  88. // Make sure connections to all servers are up.
  89. for i := 0; i < 2; i++ {
  90. // Do this check twice, otherwise the first RPC's transport may still be
  91. // picked by the closing pickfirst balancer, and the test becomes flaky.
  92. for _, s := range servers {
  93. var up bool
  94. for i := 0; i < 5000; i++ {
  95. if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); errorDesc(err) == s.port {
  96. up = true
  97. break
  98. }
  99. time.Sleep(time.Millisecond)
  100. }
  101. if !up {
  102. return fmt.Errorf("server %v is not up within 5 second", s.port)
  103. }
  104. }
  105. }
  106. serverCount := len(servers)
  107. for i := 0; i < 3*serverCount; i++ {
  108. err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply)
  109. if errorDesc(err) != servers[i%serverCount].port {
  110. return fmt.Errorf("index %d: want peer %v, got peer %v", i, servers[i%serverCount].port, err)
  111. }
  112. }
  113. return nil
  114. }
  115. func (s) TestSwitchBalancer(t *testing.T) {
  116. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  117. defer rcleanup()
  118. const numServers = 2
  119. servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
  120. defer scleanup()
  121. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
  122. if err != nil {
  123. t.Fatalf("failed to dial: %v", err)
  124. }
  125. defer cc.Close()
  126. r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}})
  127. // The default balancer is pickfirst.
  128. if err := checkPickFirst(cc, servers); err != nil {
  129. t.Fatalf("check pickfirst returned non-nil error: %v", err)
  130. }
  131. // Switch to roundrobin.
  132. cc.handleServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)
  133. if err := checkRoundRobin(cc, servers); err != nil {
  134. t.Fatalf("check roundrobin returned non-nil error: %v", err)
  135. }
  136. // Switch to pickfirst.
  137. cc.handleServiceConfig(`{"loadBalancingPolicy": "pick_first"}`)
  138. if err := checkPickFirst(cc, servers); err != nil {
  139. t.Fatalf("check pickfirst returned non-nil error: %v", err)
  140. }
  141. }
  142. // Test that balancer specified by dial option will not be overridden.
  143. func (s) TestBalancerDialOption(t *testing.T) {
  144. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  145. defer rcleanup()
  146. const numServers = 2
  147. servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
  148. defer scleanup()
  149. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}), WithBalancerName(roundrobin.Name))
  150. if err != nil {
  151. t.Fatalf("failed to dial: %v", err)
  152. }
  153. defer cc.Close()
  154. r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}})
  155. // The init balancer is roundrobin.
  156. if err := checkRoundRobin(cc, servers); err != nil {
  157. t.Fatalf("check roundrobin returned non-nil error: %v", err)
  158. }
  159. // Switch to pickfirst.
  160. cc.handleServiceConfig(`{"loadBalancingPolicy": "pick_first"}`)
  161. // Balancer is still roundrobin.
  162. if err := checkRoundRobin(cc, servers); err != nil {
  163. t.Fatalf("check roundrobin returned non-nil error: %v", err)
  164. }
  165. }
  166. // First addr update contains grpclb.
  167. func (s) TestSwitchBalancerGRPCLBFirst(t *testing.T) {
  168. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  169. defer rcleanup()
  170. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
  171. if err != nil {
  172. t.Fatalf("failed to dial: %v", err)
  173. }
  174. defer cc.Close()
  175. // ClientConn will switch balancer to grpclb when receives an address of
  176. // type GRPCLB.
  177. r.NewAddress([]resolver.Address{{Addr: "backend"}, {Addr: "grpclb", Type: resolver.GRPCLB}})
  178. var isGRPCLB bool
  179. for i := 0; i < 5000; i++ {
  180. cc.mu.Lock()
  181. isGRPCLB = cc.curBalancerName == "grpclb"
  182. cc.mu.Unlock()
  183. if isGRPCLB {
  184. break
  185. }
  186. time.Sleep(time.Millisecond)
  187. }
  188. if !isGRPCLB {
  189. t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
  190. }
  191. // New update containing new backend and new grpclb. Should not switch
  192. // balancer.
  193. r.NewAddress([]resolver.Address{{Addr: "backend2"}, {Addr: "grpclb2", Type: resolver.GRPCLB}})
  194. for i := 0; i < 200; i++ {
  195. cc.mu.Lock()
  196. isGRPCLB = cc.curBalancerName == "grpclb"
  197. cc.mu.Unlock()
  198. if !isGRPCLB {
  199. break
  200. }
  201. time.Sleep(time.Millisecond)
  202. }
  203. if !isGRPCLB {
  204. t.Fatalf("within 200 ms, cc.balancer switched to !grpclb, want grpclb")
  205. }
  206. var isPickFirst bool
  207. // Switch balancer to pickfirst.
  208. r.NewAddress([]resolver.Address{{Addr: "backend"}})
  209. for i := 0; i < 5000; i++ {
  210. cc.mu.Lock()
  211. isPickFirst = cc.curBalancerName == PickFirstBalancerName
  212. cc.mu.Unlock()
  213. if isPickFirst {
  214. break
  215. }
  216. time.Sleep(time.Millisecond)
  217. }
  218. if !isPickFirst {
  219. t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
  220. }
  221. }
  222. // First addr update does not contain grpclb.
  223. func (s) TestSwitchBalancerGRPCLBSecond(t *testing.T) {
  224. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  225. defer rcleanup()
  226. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
  227. if err != nil {
  228. t.Fatalf("failed to dial: %v", err)
  229. }
  230. defer cc.Close()
  231. r.NewAddress([]resolver.Address{{Addr: "backend"}})
  232. var isPickFirst bool
  233. for i := 0; i < 5000; i++ {
  234. cc.mu.Lock()
  235. isPickFirst = cc.curBalancerName == PickFirstBalancerName
  236. cc.mu.Unlock()
  237. if isPickFirst {
  238. break
  239. }
  240. time.Sleep(time.Millisecond)
  241. }
  242. if !isPickFirst {
  243. t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
  244. }
  245. // ClientConn will switch balancer to grpclb when receives an address of
  246. // type GRPCLB.
  247. r.NewAddress([]resolver.Address{{Addr: "backend"}, {Addr: "grpclb", Type: resolver.GRPCLB}})
  248. var isGRPCLB bool
  249. for i := 0; i < 5000; i++ {
  250. cc.mu.Lock()
  251. isGRPCLB = cc.curBalancerName == "grpclb"
  252. cc.mu.Unlock()
  253. if isGRPCLB {
  254. break
  255. }
  256. time.Sleep(time.Millisecond)
  257. }
  258. if !isGRPCLB {
  259. t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
  260. }
  261. // New update containing new backend and new grpclb. Should not switch
  262. // balancer.
  263. r.NewAddress([]resolver.Address{{Addr: "backend2"}, {Addr: "grpclb2", Type: resolver.GRPCLB}})
  264. for i := 0; i < 200; i++ {
  265. cc.mu.Lock()
  266. isGRPCLB = cc.curBalancerName == "grpclb"
  267. cc.mu.Unlock()
  268. if !isGRPCLB {
  269. break
  270. }
  271. time.Sleep(time.Millisecond)
  272. }
  273. if !isGRPCLB {
  274. t.Fatalf("within 200 ms, cc.balancer switched to !grpclb, want grpclb")
  275. }
  276. // Switch balancer back.
  277. r.NewAddress([]resolver.Address{{Addr: "backend"}})
  278. for i := 0; i < 5000; i++ {
  279. cc.mu.Lock()
  280. isPickFirst = cc.curBalancerName == PickFirstBalancerName
  281. cc.mu.Unlock()
  282. if isPickFirst {
  283. break
  284. }
  285. time.Sleep(time.Millisecond)
  286. }
  287. if !isPickFirst {
  288. t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
  289. }
  290. }
  291. // Test that if the current balancer is roundrobin, after switching to grpclb,
  292. // when the resolved address doesn't contain grpclb addresses, balancer will be
  293. // switched back to roundrobin.
  294. func (s) TestSwitchBalancerGRPCLBRoundRobin(t *testing.T) {
  295. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  296. defer rcleanup()
  297. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
  298. if err != nil {
  299. t.Fatalf("failed to dial: %v", err)
  300. }
  301. defer cc.Close()
  302. r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)
  303. r.NewAddress([]resolver.Address{{Addr: "backend"}})
  304. var isRoundRobin bool
  305. for i := 0; i < 5000; i++ {
  306. cc.mu.Lock()
  307. isRoundRobin = cc.curBalancerName == "round_robin"
  308. cc.mu.Unlock()
  309. if isRoundRobin {
  310. break
  311. }
  312. time.Sleep(time.Millisecond)
  313. }
  314. if !isRoundRobin {
  315. t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName)
  316. }
  317. // ClientConn will switch balancer to grpclb when receives an address of
  318. // type GRPCLB.
  319. r.NewAddress([]resolver.Address{{Addr: "grpclb", Type: resolver.GRPCLB}})
  320. var isGRPCLB bool
  321. for i := 0; i < 5000; i++ {
  322. cc.mu.Lock()
  323. isGRPCLB = cc.curBalancerName == "grpclb"
  324. cc.mu.Unlock()
  325. if isGRPCLB {
  326. break
  327. }
  328. time.Sleep(time.Millisecond)
  329. }
  330. if !isGRPCLB {
  331. t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
  332. }
  333. // Switch balancer back.
  334. r.NewAddress([]resolver.Address{{Addr: "backend"}})
  335. for i := 0; i < 5000; i++ {
  336. cc.mu.Lock()
  337. isRoundRobin = cc.curBalancerName == "round_robin"
  338. cc.mu.Unlock()
  339. if isRoundRobin {
  340. break
  341. }
  342. time.Sleep(time.Millisecond)
  343. }
  344. if !isRoundRobin {
  345. t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName)
  346. }
  347. }
  348. // Test that if resolved address list contains grpclb, the balancer option in
  349. // service config won't take effect. But when there's no grpclb address in a new
  350. // resolved address list, balancer will be switched to the new one.
  351. func (s) TestSwitchBalancerGRPCLBServiceConfig(t *testing.T) {
  352. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  353. defer rcleanup()
  354. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
  355. if err != nil {
  356. t.Fatalf("failed to dial: %v", err)
  357. }
  358. defer cc.Close()
  359. r.NewAddress([]resolver.Address{{Addr: "backend"}})
  360. var isPickFirst bool
  361. for i := 0; i < 5000; i++ {
  362. cc.mu.Lock()
  363. isPickFirst = cc.curBalancerName == PickFirstBalancerName
  364. cc.mu.Unlock()
  365. if isPickFirst {
  366. break
  367. }
  368. time.Sleep(time.Millisecond)
  369. }
  370. if !isPickFirst {
  371. t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
  372. }
  373. // ClientConn will switch balancer to grpclb when receives an address of
  374. // type GRPCLB.
  375. r.NewAddress([]resolver.Address{{Addr: "grpclb", Type: resolver.GRPCLB}})
  376. var isGRPCLB bool
  377. for i := 0; i < 5000; i++ {
  378. cc.mu.Lock()
  379. isGRPCLB = cc.curBalancerName == "grpclb"
  380. cc.mu.Unlock()
  381. if isGRPCLB {
  382. break
  383. }
  384. time.Sleep(time.Millisecond)
  385. }
  386. if !isGRPCLB {
  387. t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
  388. }
  389. r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)
  390. var isRoundRobin bool
  391. for i := 0; i < 200; i++ {
  392. cc.mu.Lock()
  393. isRoundRobin = cc.curBalancerName == "round_robin"
  394. cc.mu.Unlock()
  395. if isRoundRobin {
  396. break
  397. }
  398. time.Sleep(time.Millisecond)
  399. }
  400. // Balancer should NOT switch to round_robin because resolved list contains
  401. // grpclb.
  402. if isRoundRobin {
  403. t.Fatalf("within 200 ms, cc.balancer switched to round_robin, want grpclb")
  404. }
  405. // Switch balancer back.
  406. r.NewAddress([]resolver.Address{{Addr: "backend"}})
  407. for i := 0; i < 5000; i++ {
  408. cc.mu.Lock()
  409. isRoundRobin = cc.curBalancerName == "round_robin"
  410. cc.mu.Unlock()
  411. if isRoundRobin {
  412. break
  413. }
  414. time.Sleep(time.Millisecond)
  415. }
  416. if !isRoundRobin {
  417. t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName)
  418. }
  419. }
  420. // Test that when switching to grpclb fails because grpclb is not registered,
  421. // the fallback balancer will only get backend addresses, not the grpclb server
  422. // address.
  423. //
  424. // The tests sends 3 server addresses (all backends) as resolved addresses, but
  425. // claim the first one is grpclb server. The all RPCs should all be send to the
  426. // other addresses, not the first one.
  427. func (s) TestSwitchBalancerGRPCLBWithGRPCLBNotRegistered(t *testing.T) {
  428. internal.BalancerUnregister("grpclb")
  429. defer balancer.Register(&magicalLB{})
  430. r, rcleanup := manual.GenerateAndRegisterManualResolver()
  431. defer rcleanup()
  432. const numServers = 3
  433. servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
  434. defer scleanup()
  435. cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
  436. if err != nil {
  437. t.Fatalf("failed to dial: %v", err)
  438. }
  439. defer cc.Close()
  440. r.NewAddress([]resolver.Address{{Addr: servers[1].addr}, {Addr: servers[2].addr}})
  441. // The default balancer is pickfirst.
  442. if err := checkPickFirst(cc, servers[1:]); err != nil {
  443. t.Fatalf("check pickfirst returned non-nil error: %v", err)
  444. }
  445. // Try switching to grpclb by sending servers[0] as grpclb address. It's
  446. // expected that servers[0] will be filtered out, so it will not be used by
  447. // the balancer.
  448. //
  449. // If the filtering failed, servers[0] will be used for RPCs and the RPCs
  450. // will succeed. The following checks will catch this and fail.
  451. r.NewAddress([]resolver.Address{
  452. {Addr: servers[0].addr, Type: resolver.GRPCLB},
  453. {Addr: servers[1].addr}, {Addr: servers[2].addr}})
  454. // Still check for pickfirst, but only with server[1] and server[2].
  455. if err := checkPickFirst(cc, servers[1:]); err != nil {
  456. t.Fatalf("check pickfirst returned non-nil error: %v", err)
  457. }
  458. // Switch to roundrobin, anc check against server[1] and server[2].
  459. cc.handleServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)
  460. if err := checkRoundRobin(cc, servers[1:]); err != nil {
  461. t.Fatalf("check roundrobin returned non-nil error: %v", err)
  462. }
  463. }