25'ten fazla konu seçemezsiniz Konular bir harf veya rakamla başlamalı, kısa çizgiler ('-') içerebilir ve en fazla 35 karakter uzunluğunda olabilir.
 
 

237 satır
5.5 KiB

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/prometheus/client_golang/api"
  6. v1 "github.com/prometheus/client_golang/api/prometheus/v1"
  7. "github.com/prometheus/client_golang/prometheus"
  8. "github.com/prometheus/client_golang/prometheus/promauto"
  9. "github.com/prometheus/common/model"
  10. "sync"
  11. "time"
  12. )
  13. type State struct {
  14. config Config
  15. weights map[string]int
  16. spaces map[string]int64
  17. inflights map[string]int64
  18. lock sync.RWMutex
  19. }
  20. type TargetState struct {
  21. Target ServerSpec
  22. Weight int
  23. FreeSpace int64
  24. Inflight int64
  25. }
  26. var state *State
  27. func initState() {
  28. c, err := NewConfigFromFile("config.yaml")
  29. if err != nil {
  30. log.WithError(err).Fatal("Unable to read config")
  31. }
  32. state = &State{
  33. config: c,
  34. }
  35. }
  36. func (state *State) GetListenAddr() string {
  37. state.lock.RLock()
  38. defer state.lock.RUnlock()
  39. return state.config.Listen
  40. }
  41. func (state *State) getPromAddr() string {
  42. state.lock.RLock()
  43. defer state.lock.RUnlock()
  44. return state.config.PromAddr
  45. }
  46. func (state *State) MakeClient() (v1.API, error) {
  47. addr := state.getPromAddr()
  48. log.WithField("prometheus", addr).Info("Making new prom client...")
  49. client, err := api.NewClient(api.Config{
  50. Address: addr,
  51. })
  52. if err != nil {
  53. return nil, err
  54. }
  55. c := v1.NewAPI(client)
  56. return c, nil
  57. }
  58. func (state *State) GetTargets() []TargetState {
  59. state.lock.RLock()
  60. defer state.lock.RUnlock()
  61. var states []TargetState
  62. for _, targetSpec := range state.config.Targets {
  63. w := 0
  64. if val, ok := state.weights[targetSpec.Id]; ok {
  65. w = val
  66. }
  67. space := int64(0)
  68. if val, ok := state.spaces[targetSpec.Id]; ok {
  69. space = val
  70. }
  71. inflights := int64(0)
  72. if val, ok := state.inflights[targetSpec.Id]; ok {
  73. inflights = val
  74. }
  75. states = append(states, TargetState{
  76. Target: targetSpec,
  77. Weight: w,
  78. FreeSpace: space,
  79. Inflight: inflights,
  80. })
  81. }
  82. return states
  83. }
  84. var (
  85. weightsGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
  86. Name: "offload_dispatcher_target_weights",
  87. Help: "Weights of the targets in selection",
  88. }, []string{"target"})
  89. )
  90. func (state *State) updateTargets(targets []TargetState) {
  91. newWeights := make(map[string]int)
  92. newInflights := make(map[string]int64)
  93. newSpaces := make(map[string]int64)
  94. for _, t := range targets {
  95. newWeights[t.Target.Id] = t.Weight
  96. newInflights[t.Target.Id] = t.Inflight
  97. newSpaces[t.Target.Id] = t.FreeSpace
  98. weightsGauge.WithLabelValues(t.Target.Id).Set(float64(t.Weight))
  99. }
  100. state.lock.Lock()
  101. defer state.lock.Unlock()
  102. state.weights = newWeights
  103. state.inflights = newInflights
  104. state.spaces = newSpaces
  105. }
  106. func (state *State) UpdateThread() {
  107. log.Info("Starting update thread...")
  108. v1api, err := state.MakeClient()
  109. if err != nil {
  110. log.WithError(err).Fatal("Unable to make prom client")
  111. }
  112. for {
  113. log.Info("Starting update run...")
  114. ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
  115. targets := state.GetTargets()
  116. for i, target := range targets {
  117. {
  118. result, warnings, err := v1api.Query(ctx, fmt.Sprintf("minio_cluster_capacity_usable_free_bytes{job=\"%s\"}", target.Target.JobName), time.Now(), v1.WithTimeout(5*time.Second))
  119. if err != nil {
  120. log.WithField("target", target.Target.Id).WithError(err).Error("Error querying Prometheus for free space")
  121. continue
  122. }
  123. if len(warnings) > 0 {
  124. log.WithField("target", target.Target.Id).Warnf("Free space warnings: %v\n", warnings)
  125. }
  126. resultVec := result.(model.Vector)
  127. if resultVec.Len() > 0 {
  128. resultVal := resultVec[0]
  129. log.WithField("target", target.Target.Id).Debugf("Computed free space: %v", resultVal.Value)
  130. targets[i].FreeSpace = int64(resultVal.Value)
  131. }
  132. }
  133. {
  134. result, warnings, err := v1api.Query(ctx, fmt.Sprintf("sum(minio_s3_requests_inflight_total{job=\"%s\"})", target.Target.JobName), time.Now(), v1.WithTimeout(5*time.Second))
  135. if err != nil {
  136. log.WithField("target", target.Target.Id).WithError(err).Error("Error querying Prometheus for inflights")
  137. continue
  138. }
  139. if len(warnings) > 0 {
  140. log.WithField("target", target.Target.Id).Warnf("Inflight warnings: %v\n", warnings)
  141. }
  142. resultVec := result.(model.Vector)
  143. if resultVec.Len() > 0 {
  144. resultVal := resultVec[0]
  145. log.WithField("target", target.Target.Id).Debugf("Computed inflight: %v", resultVal.Value)
  146. targets[i].Inflight = int64(resultVal.Value)
  147. }
  148. }
  149. }
  150. cancel()
  151. inflightsSum := int64(0)
  152. inflightsCount := 0
  153. for _, t := range targets {
  154. // Only count targets that can actually handle inbound in this calculation.
  155. if t.FreeSpace > int64(t.Target.MinimumFreeSpace) {
  156. inflightsSum = inflightsSum + t.Inflight
  157. inflightsCount = inflightsCount + 1
  158. }
  159. }
  160. averageInflights := float64(inflightsSum) / float64(inflightsCount)
  161. log.Debugf("Average inflights: %v", averageInflights)
  162. anyNonZero := false
  163. for i, t := range targets {
  164. if t.FreeSpace > int64(t.Target.MinimumFreeSpace) {
  165. targets[i].Weight = int(int64(averageInflights) - int64(t.Inflight))
  166. // Clamp to 0
  167. if targets[i].Weight < 0 {
  168. targets[i].Weight = 0
  169. }
  170. // Record if we found anything nonzero
  171. if targets[i].Weight > 0 {
  172. anyNonZero = true
  173. }
  174. } else {
  175. targets[i].Weight = 0
  176. }
  177. }
  178. // Failsafe, if nobody is eligible, everybody is (that has space)
  179. if !anyNonZero {
  180. for i, t := range targets {
  181. if t.FreeSpace > int64(t.Target.MinimumFreeSpace) {
  182. targets[i].Weight = 1
  183. }
  184. }
  185. }
  186. for _, t := range targets {
  187. log.WithField("target", t.Target.Id).Debugf("Computed weight: %v", t.Weight)
  188. }
  189. state.updateTargets(targets)
  190. time.Sleep(15 * time.Second)
  191. }
  192. }