You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

265 lines
7.0 KiB

  1. /*
  2. *
  3. * Copyright 2019 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package xds
  19. import (
  20. "context"
  21. "sync"
  22. "time"
  23. xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
  24. xdscorepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
  25. xdsdiscoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
  26. "github.com/gogo/protobuf/proto"
  27. "github.com/gogo/protobuf/types"
  28. "google.golang.org/grpc"
  29. "google.golang.org/grpc/balancer"
  30. "google.golang.org/grpc/grpclog"
  31. "google.golang.org/grpc/internal/backoff"
  32. "google.golang.org/grpc/internal/channelz"
  33. )
  // Node-metadata keys attached to the discovery requests sent to the traffic director.
  const (
  	// grpcHostname is the metadata key under which CDS requests carry the user's
  	// dial target (serviceName).
  	grpcHostname = "com.googleapis.trafficdirector.grpc_hostname"
  	// endpointRequired is the metadata key under which EDS requests carry the
  	// client's enableCDS flag.
  	endpointRequired = "endpoints_required"
  )

  // Type URLs used both as the TypeUrl of outgoing requests and to classify
  // incoming responses.
  const (
  	cdsType = "type.googleapis.com/envoy.api.v2.Cluster"
  	edsType = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"
  )
  40. var (
  41. defaultBackoffConfig = backoff.Exponential{
  42. MaxDelay: 120 * time.Second,
  43. }
  44. )
  45. // client is responsible for connecting to the specified traffic director, passing the received
  46. // ADS response from the traffic director, and sending notification when communication with the
  47. // traffic director is lost.
  48. type client struct {
  49. ctx context.Context
  50. cancel context.CancelFunc
  51. cli xdsdiscoverypb.AggregatedDiscoveryServiceClient
  52. opts balancer.BuildOptions
  53. balancerName string // the traffic director name
  54. serviceName string // the user dial target name
  55. enableCDS bool
  56. newADS func(ctx context.Context, resp proto.Message) error
  57. loseContact func(ctx context.Context)
  58. cleanup func()
  59. backoff backoff.Strategy
  60. mu sync.Mutex
  61. cc *grpc.ClientConn
  62. }
  63. func (c *client) run() {
  64. c.dial()
  65. c.makeADSCall()
  66. }
  67. func (c *client) close() {
  68. c.cancel()
  69. c.mu.Lock()
  70. if c.cc != nil {
  71. c.cc.Close()
  72. }
  73. c.mu.Unlock()
  74. c.cleanup()
  75. }
  76. func (c *client) dial() {
  77. var dopts []grpc.DialOption
  78. if creds := c.opts.DialCreds; creds != nil {
  79. if err := creds.OverrideServerName(c.balancerName); err == nil {
  80. dopts = append(dopts, grpc.WithTransportCredentials(creds))
  81. } else {
  82. grpclog.Warningf("xds: failed to override the server name in the credentials: %v, using Insecure", err)
  83. dopts = append(dopts, grpc.WithInsecure())
  84. }
  85. } else {
  86. dopts = append(dopts, grpc.WithInsecure())
  87. }
  88. if c.opts.Dialer != nil {
  89. dopts = append(dopts, grpc.WithContextDialer(c.opts.Dialer))
  90. }
  91. // Explicitly set pickfirst as the balancer.
  92. dopts = append(dopts, grpc.WithBalancerName(grpc.PickFirstBalancerName))
  93. if channelz.IsOn() {
  94. dopts = append(dopts, grpc.WithChannelzParentID(c.opts.ChannelzParentID))
  95. }
  96. cc, err := grpc.DialContext(c.ctx, c.balancerName, dopts...)
  97. // Since this is a non-blocking dial, so if it fails, it due to some serious error (not network
  98. // related) error.
  99. if err != nil {
  100. grpclog.Fatalf("xds: failed to dial: %v", err)
  101. }
  102. c.mu.Lock()
  103. select {
  104. case <-c.ctx.Done():
  105. cc.Close()
  106. default:
  107. // only assign c.cc when xds client has not been closed, to prevent ClientConn leak.
  108. c.cc = cc
  109. }
  110. c.mu.Unlock()
  111. }
  112. func (c *client) newCDSRequest() *xdspb.DiscoveryRequest {
  113. cdsReq := &xdspb.DiscoveryRequest{
  114. Node: &xdscorepb.Node{
  115. Metadata: &types.Struct{
  116. Fields: map[string]*types.Value{
  117. grpcHostname: {
  118. Kind: &types.Value_StringValue{StringValue: c.serviceName},
  119. },
  120. },
  121. },
  122. },
  123. TypeUrl: cdsType,
  124. }
  125. return cdsReq
  126. }
  127. func (c *client) newEDSRequest() *xdspb.DiscoveryRequest {
  128. edsReq := &xdspb.DiscoveryRequest{
  129. Node: &xdscorepb.Node{
  130. Metadata: &types.Struct{
  131. Fields: map[string]*types.Value{
  132. endpointRequired: {
  133. Kind: &types.Value_BoolValue{BoolValue: c.enableCDS},
  134. },
  135. },
  136. },
  137. },
  138. ResourceNames: []string{c.serviceName},
  139. TypeUrl: edsType,
  140. }
  141. return edsReq
  142. }
  143. func (c *client) makeADSCall() {
  144. c.cli = xdsdiscoverypb.NewAggregatedDiscoveryServiceClient(c.cc)
  145. retryCount := 0
  146. var doRetry bool
  147. for {
  148. select {
  149. case <-c.ctx.Done():
  150. return
  151. default:
  152. }
  153. if doRetry {
  154. backoffTimer := time.NewTimer(c.backoff.Backoff(retryCount))
  155. select {
  156. case <-backoffTimer.C:
  157. case <-c.ctx.Done():
  158. backoffTimer.Stop()
  159. return
  160. }
  161. retryCount++
  162. }
  163. firstRespReceived := c.adsCallAttempt()
  164. if firstRespReceived {
  165. retryCount = 0
  166. doRetry = false
  167. } else {
  168. doRetry = true
  169. }
  170. c.loseContact(c.ctx)
  171. }
  172. }
  173. func (c *client) adsCallAttempt() (firstRespReceived bool) {
  174. firstRespReceived = false
  175. ctx, cancel := context.WithCancel(c.ctx)
  176. defer cancel()
  177. st, err := c.cli.StreamAggregatedResources(ctx, grpc.WaitForReady(true))
  178. if err != nil {
  179. grpclog.Infof("xds: failed to initial ADS streaming RPC due to %v", err)
  180. return
  181. }
  182. if c.enableCDS {
  183. if err := st.Send(c.newCDSRequest()); err != nil {
  184. // current stream is broken, start a new one.
  185. return
  186. }
  187. }
  188. if err := st.Send(c.newEDSRequest()); err != nil {
  189. // current stream is broken, start a new one.
  190. return
  191. }
  192. expectCDS := c.enableCDS
  193. for {
  194. resp, err := st.Recv()
  195. if err != nil {
  196. // current stream is broken, start a new one.
  197. return
  198. }
  199. firstRespReceived = true
  200. resources := resp.GetResources()
  201. if len(resources) < 1 {
  202. grpclog.Warning("xds: ADS response contains 0 resource info.")
  203. // start a new call as server misbehaves by sending a ADS response with 0 resource info.
  204. return
  205. }
  206. if resp.GetTypeUrl() == cdsType && !c.enableCDS {
  207. grpclog.Warning("xds: received CDS response in custom plugin mode.")
  208. // start a new call as we receive CDS response when in EDS-only mode.
  209. return
  210. }
  211. var adsResp types.DynamicAny
  212. if err := types.UnmarshalAny(&resources[0], &adsResp); err != nil {
  213. grpclog.Warningf("xds: failed to unmarshal resources due to %v.", err)
  214. return
  215. }
  216. switch adsResp.Message.(type) {
  217. case *xdspb.Cluster:
  218. expectCDS = false
  219. case *xdspb.ClusterLoadAssignment:
  220. if expectCDS {
  221. grpclog.Warningf("xds: expecting CDS response, got EDS response instead.")
  222. return
  223. }
  224. }
  225. if err := c.newADS(c.ctx, adsResp.Message); err != nil {
  226. grpclog.Warningf("xds: processing new ADS message failed due to %v.", err)
  227. return
  228. }
  229. }
  230. }
  231. func newXDSClient(balancerName string, serviceName string, enableCDS bool, opts balancer.BuildOptions, newADS func(context.Context, proto.Message) error, loseContact func(ctx context.Context), exitCleanup func()) *client {
  232. c := &client{
  233. balancerName: balancerName,
  234. serviceName: serviceName,
  235. enableCDS: enableCDS,
  236. opts: opts,
  237. newADS: newADS,
  238. loseContact: loseContact,
  239. cleanup: exitCleanup,
  240. backoff: defaultBackoffConfig,
  241. }
  242. c.ctx, c.cancel = context.WithCancel(context.Background())
  243. return c
  244. }