package main import ( "context" "fmt" "github.com/prometheus/client_golang/api" v1 "github.com/prometheus/client_golang/api/prometheus/v1" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "sync" "time" ) type State struct { config Config weights map[string]int spaces map[string]int64 inflights map[string]int64 lock sync.RWMutex } type TargetState struct { Target ServerSpec Weight int FreeSpace int64 Inflight int64 } var state *State func initState() { c, err := NewConfigFromFile("config.yaml") if err != nil { log.WithError(err).Fatal("Unable to read config") } state = &State{ config: c, } } func (state *State) GetListenAddr() string { state.lock.RLock() defer state.lock.RUnlock() return state.config.Listen } func (state *State) getPromAddr() string { state.lock.RLock() defer state.lock.RUnlock() return state.config.PromAddr } func (state *State) MakeClient() (v1.API, error) { addr := state.getPromAddr() log.WithField("prometheus", addr).Info("Making new prom client...") client, err := api.NewClient(api.Config{ Address: addr, }) if err != nil { return nil, err } c := v1.NewAPI(client) return c, nil } func (state *State) GetTargets() []TargetState { state.lock.RLock() defer state.lock.RUnlock() var states []TargetState for _, targetSpec := range state.config.Targets { w := 0 if val, ok := state.weights[targetSpec.Id]; ok { w = val } space := int64(0) if val, ok := state.spaces[targetSpec.Id]; ok { space = val } inflights := int64(0) if val, ok := state.inflights[targetSpec.Id]; ok { inflights = val } states = append(states, TargetState{ Target: targetSpec, Weight: w, FreeSpace: space, Inflight: inflights, }) } return states } var ( weightsGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: "offload_dispatcher_target_weights", Help: "Weights of the targets in selection", }, []string{"target"}) ) func (state *State) updateTargets(targets []TargetState) { newWeights := make(map[string]int) newInflights := make(map[string]int64) newSpaces := make(map[string]int64) for _, t := range targets { newWeights[t.Target.Id] = t.Weight newInflights[t.Target.Id] = t.Inflight newSpaces[t.Target.Id] = t.FreeSpace weightsGauge.WithLabelValues(t.Target.Id).Set(float64(t.Weight)) } state.lock.Lock() defer state.lock.Unlock() state.weights = newWeights state.inflights = newInflights state.spaces = newSpaces } func (state *State) UpdateThread() { log.Info("Starting update thread...") v1api, err := state.MakeClient() if err != nil { log.WithError(err).Fatal("Unable to make prom client") } for { log.Info("Starting update run...") ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) targets := state.GetTargets() for i, target := range targets { { result, warnings, err := v1api.Query(ctx, fmt.Sprintf("minio_cluster_capacity_usable_free_bytes{job=\"%s\"}", target.Target.JobName), time.Now(), v1.WithTimeout(5*time.Second)) if err != nil { log.WithField("target", target.Target.Id).WithError(err).Error("Error querying Prometheus for free space") continue } if len(warnings) > 0 { log.WithField("target", target.Target.Id).Warnf("Free space warnings: %v\n", warnings) } resultVec := result.(model.Vector) if resultVec.Len() > 0 { resultVal := resultVec[0] log.WithField("target", target.Target.Id).Debugf("Computed free space: %v", resultVal.Value) targets[i].FreeSpace = int64(resultVal.Value) } } { result, warnings, err := v1api.Query(ctx, fmt.Sprintf("sum(minio_s3_requests_inflight_total{job=\"%s\"})", target.Target.JobName), time.Now(), v1.WithTimeout(5*time.Second)) if err != nil { log.WithField("target", target.Target.Id).WithError(err).Error("Error querying Prometheus for inflights") continue } if len(warnings) > 0 { log.WithField("target", target.Target.Id).Warnf("Inflight warnings: %v\n", warnings) } resultVec := result.(model.Vector) if resultVec.Len() > 0 { resultVal := resultVec[0] log.WithField("target", target.Target.Id).Debugf("Computed inflight: %v", resultVal.Value) targets[i].Inflight = int64(resultVal.Value) } } } cancel() inflightsSum := int64(0) inflightsCount := 0 for _, t := range targets { inflightsSum = inflightsSum + t.Inflight inflightsCount = inflightsCount + 1 } averageInflights := float64(inflightsSum) / float64(inflightsCount) log.Debugf("Average inflights: %v", averageInflights) anyNonZero := false for i, t := range targets { if t.FreeSpace > int64(t.Target.MinimumFreeSpace) { targets[i].Weight = int(int64(averageInflights) - int64(t.Inflight)) // Clamp to 0 if targets[i].Weight < 0 { targets[i].Weight = 0 } // Record if we found anything nonzero if targets[i].Weight > 0 { anyNonZero = true } } else { targets[i].Weight = 0 } } // Failsafe, if nobody is eligible, everybody is (that has space) if !anyNonZero { for i, t := range targets { if t.FreeSpace > int64(t.Target.MinimumFreeSpace) { targets[i].Weight = 1 } } } for _, t := range targets { log.WithField("target", t.Target.Id).Debugf("Computed weight: %v", t.Weight) } state.updateTargets(targets) time.Sleep(15 * time.Second) } }