Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

1045 строки
35 KiB

  1. // Copyright 2014 The Prometheus Authors
  2. // Licensed under the Apache License, Version 2.0 (the "License");
  3. // you may not use this file except in compliance with the License.
  4. // You may obtain a copy of the License at
  5. //
  6. // http://www.apache.org/licenses/LICENSE-2.0
  7. //
  8. // Unless required by applicable law or agreed to in writing, software
  9. // distributed under the License is distributed on an "AS IS" BASIS,
  10. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. // See the License for the specific language governing permissions and
  12. // limitations under the License.
  13. package prometheus
  14. import (
  15. "bytes"
  16. "errors"
  17. "fmt"
  18. "os"
  19. "path/filepath"
  20. "runtime"
  21. "sort"
  22. "strings"
  23. "sync"
  24. "unicode/utf8"
  25. "github.com/cespare/xxhash/v2"
  26. //nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility.
  27. "github.com/golang/protobuf/proto"
  28. "github.com/prometheus/common/expfmt"
  29. dto "github.com/prometheus/client_model/go"
  30. "github.com/prometheus/client_golang/prometheus/internal"
  31. )
const (
	// capMetricChan is the buffer capacity of the channels on which
	// Registry.Gather receives Metrics from the collectors.
	capMetricChan = 1000
	// capDescChan is the buffer capacity of the channel on which
	// Registry.Register and Registry.Unregister receive descriptors from a
	// Collector's Describe method.
	capDescChan = 10
)
// DefaultRegisterer and DefaultGatherer are the implementations of the
// Registerer and Gatherer interface a number of convenience functions in this
// package act on. Initially, both variables point to the same Registry, which
// has a process collector (currently on Linux only, see NewProcessCollector)
// and a Go collector (see NewGoCollector, in particular the note about
// stop-the-world implication with Go versions older than 1.9) already
// registered. This approach to keep default instances as global state mirrors
// the approach of other packages in the Go standard library. Note that there
// are caveats. Change the variables with caution and only if you understand the
// consequences. Users who want to avoid global state altogether should not use
// the convenience functions and act on custom instances instead.
var (
	// defaultRegistry is the single Registry both exported defaults
	// initially point to.
	defaultRegistry = NewRegistry()
	// DefaultRegisterer is the Registerer used by the package-level
	// Register, MustRegister, and Unregister functions.
	DefaultRegisterer Registerer = defaultRegistry
	// DefaultGatherer is the Gatherer counterpart of DefaultRegisterer.
	DefaultGatherer Gatherer = defaultRegistry
)
  53. func init() {
  54. MustRegister(NewProcessCollector(ProcessCollectorOpts{}))
  55. MustRegister(NewGoCollector())
  56. }
  57. // NewRegistry creates a new vanilla Registry without any Collectors
  58. // pre-registered.
  59. func NewRegistry() *Registry {
  60. return &Registry{
  61. collectorsByID: map[uint64]Collector{},
  62. descIDs: map[uint64]struct{}{},
  63. dimHashesByName: map[string]uint64{},
  64. }
  65. }
  66. // NewPedanticRegistry returns a registry that checks during collection if each
  67. // collected Metric is consistent with its reported Desc, and if the Desc has
  68. // actually been registered with the registry. Unchecked Collectors (those whose
  69. // Describe method does not yield any descriptors) are excluded from the check.
  70. //
  71. // Usually, a Registry will be happy as long as the union of all collected
  72. // Metrics is consistent and valid even if some metrics are not consistent with
  73. // their own Desc or a Desc provided by their registered Collector. Well-behaved
  74. // Collectors and Metrics will only provide consistent Descs. This Registry is
  75. // useful to test the implementation of Collectors and Metrics.
  76. func NewPedanticRegistry() *Registry {
  77. r := NewRegistry()
  78. r.pedanticChecksEnabled = true
  79. return r
  80. }
// Registerer is the interface for the part of a registry in charge of
// registering and unregistering. Users of custom registries should use
// Registerer as type for registration purposes (rather than the Registry type
// directly). In that way, they are free to use custom Registerer implementation
// (e.g. for testing purposes).
type Registerer interface {
	// Register registers a new Collector to be included in metrics
	// collection. It returns an error if the descriptors provided by the
	// Collector are invalid or if they — in combination with descriptors of
	// already registered Collectors — do not fulfill the consistency and
	// uniqueness criteria described in the documentation of metric.Desc.
	//
	// If the provided Collector is equal to a Collector already registered
	// (which includes the case of re-registering the same Collector), the
	// returned error is an instance of AlreadyRegisteredError, which
	// contains the previously registered Collector.
	//
	// A Collector whose Describe method does not yield any Desc is treated
	// as unchecked. Registration will always succeed. No check for
	// re-registering (see previous paragraph) is performed. Thus, the
	// caller is responsible for not double-registering the same unchecked
	// Collector, and for providing a Collector that will not cause
	// inconsistent metrics on collection. (This would lead to scrape
	// errors.)
	Register(Collector) error
	// MustRegister works like Register but registers any number of
	// Collectors and panics upon the first registration that causes an
	// error.
	MustRegister(...Collector)
	// Unregister unregisters the Collector that equals the Collector passed
	// in as an argument. (Two Collectors are considered equal if their
	// Describe method yields the same set of descriptors.) The function
	// returns whether a Collector was unregistered. Note that an unchecked
	// Collector cannot be unregistered (as its Describe method does not
	// yield any descriptor).
	//
	// Note that even after unregistering, it will not be possible to
	// register a new Collector that is inconsistent with the unregistered
	// Collector, e.g. a Collector collecting metrics with the same name but
	// a different help string. The rationale here is that the same registry
	// instance must only collect consistent metrics throughout its
	// lifetime.
	Unregister(Collector) bool
}
// Gatherer is the interface for the part of a registry in charge of gathering
// the collected metrics into a number of MetricFamilies. The Gatherer interface
// comes with the same general implication as described for the Registerer
// interface.
type Gatherer interface {
	// Gather calls the Collect method of the registered Collectors and then
	// gathers the collected metrics into a lexicographically sorted slice
	// of uniquely named MetricFamily protobufs. Gather ensures that the
	// returned slice is valid and self-consistent so that it can be used
	// for valid exposition. As an exception to the strict consistency
	// requirements described for metric.Desc, Gather will tolerate
	// different sets of label names for metrics of the same metric family.
	//
	// Even if an error occurs, Gather attempts to gather as many metrics as
	// possible. Hence, if a non-nil error is returned, the returned
	// MetricFamily slice could be nil (in case of a fatal error that
	// prevented any meaningful metric collection) or contain a number of
	// MetricFamily protobufs, some of which might be incomplete, and some
	// might be missing altogether. The returned error (which might be a
	// MultiError) explains the details. Note that this is mostly useful for
	// debugging purposes. If the gathered protobufs are to be used for
	// exposition in actual monitoring, it is almost always better to not
	// expose an incomplete result and instead disregard the returned
	// MetricFamily protobufs in case the returned error is non-nil.
	Gather() ([]*dto.MetricFamily, error)
}
  151. // Register registers the provided Collector with the DefaultRegisterer.
  152. //
  153. // Register is a shortcut for DefaultRegisterer.Register(c). See there for more
  154. // details.
  155. func Register(c Collector) error {
  156. return DefaultRegisterer.Register(c)
  157. }
  158. // MustRegister registers the provided Collectors with the DefaultRegisterer and
  159. // panics if any error occurs.
  160. //
  161. // MustRegister is a shortcut for DefaultRegisterer.MustRegister(cs...). See
  162. // there for more details.
  163. func MustRegister(cs ...Collector) {
  164. DefaultRegisterer.MustRegister(cs...)
  165. }
  166. // Unregister removes the registration of the provided Collector from the
  167. // DefaultRegisterer.
  168. //
  169. // Unregister is a shortcut for DefaultRegisterer.Unregister(c). See there for
  170. // more details.
  171. func Unregister(c Collector) bool {
  172. return DefaultRegisterer.Unregister(c)
  173. }
  174. // GathererFunc turns a function into a Gatherer.
  175. type GathererFunc func() ([]*dto.MetricFamily, error)
  176. // Gather implements Gatherer.
  177. func (gf GathererFunc) Gather() ([]*dto.MetricFamily, error) {
  178. return gf()
  179. }
  180. // AlreadyRegisteredError is returned by the Register method if the Collector to
  181. // be registered has already been registered before, or a different Collector
  182. // that collects the same metrics has been registered before. Registration fails
  183. // in that case, but you can detect from the kind of error what has
  184. // happened. The error contains fields for the existing Collector and the
  185. // (rejected) new Collector that equals the existing one. This can be used to
  186. // find out if an equal Collector has been registered before and switch over to
  187. // using the old one, as demonstrated in the example.
  188. type AlreadyRegisteredError struct {
  189. ExistingCollector, NewCollector Collector
  190. }
  191. func (err AlreadyRegisteredError) Error() string {
  192. return "duplicate metrics collector registration attempted"
  193. }
  194. // MultiError is a slice of errors implementing the error interface. It is used
  195. // by a Gatherer to report multiple errors during MetricFamily gathering.
  196. type MultiError []error
  197. // Error formats the contained errors as a bullet point list, preceded by the
  198. // total number of errors. Note that this results in a multi-line string.
  199. func (errs MultiError) Error() string {
  200. if len(errs) == 0 {
  201. return ""
  202. }
  203. buf := &bytes.Buffer{}
  204. fmt.Fprintf(buf, "%d error(s) occurred:", len(errs))
  205. for _, err := range errs {
  206. fmt.Fprintf(buf, "\n* %s", err)
  207. }
  208. return buf.String()
  209. }
  210. // Append appends the provided error if it is not nil.
  211. func (errs *MultiError) Append(err error) {
  212. if err != nil {
  213. *errs = append(*errs, err)
  214. }
  215. }
  216. // MaybeUnwrap returns nil if len(errs) is 0. It returns the first and only
  217. // contained error as error if len(errs is 1). In all other cases, it returns
  218. // the MultiError directly. This is helpful for returning a MultiError in a way
  219. // that only uses the MultiError if needed.
  220. func (errs MultiError) MaybeUnwrap() error {
  221. switch len(errs) {
  222. case 0:
  223. return nil
  224. case 1:
  225. return errs[0]
  226. default:
  227. return errs
  228. }
  229. }
// Registry registers Prometheus collectors, collects their metrics, and gathers
// them into MetricFamilies for exposition. It implements both Registerer and
// Gatherer. The zero value is not usable. Create instances with NewRegistry or
// NewPedanticRegistry.
type Registry struct {
	// mtx is held (read or write) by Register, Unregister, and Gather while
	// they access the fields below.
	mtx sync.RWMutex
	// collectorsByID maps a collector ID to the registered Collector. The ID
	// is derived from the collector's descriptor IDs (Register XORs the
	// distinct desc IDs together).
	collectorsByID map[uint64]Collector // ID is a hash of the descIDs.
	// descIDs is the set of descriptor IDs of all registered checked
	// Collectors.
	descIDs map[uint64]struct{}
	// dimHashesByName maps a fully-qualified metric name to the dimHash of
	// its descriptors. Entries are not removed by Unregister.
	dimHashesByName map[string]uint64
	// uncheckedCollectors holds Collectors whose Describe method yielded no
	// descriptor at registration time.
	uncheckedCollectors []Collector
	// pedanticChecksEnabled is set by NewPedanticRegistry; when true, Gather
	// passes the registered descriptor IDs to the per-metric checks.
	pedanticChecksEnabled bool
}
  242. // Register implements Registerer.
  243. func (r *Registry) Register(c Collector) error {
  244. var (
  245. descChan = make(chan *Desc, capDescChan)
  246. newDescIDs = map[uint64]struct{}{}
  247. newDimHashesByName = map[string]uint64{}
  248. collectorID uint64 // All desc IDs XOR'd together.
  249. duplicateDescErr error
  250. )
  251. go func() {
  252. c.Describe(descChan)
  253. close(descChan)
  254. }()
  255. r.mtx.Lock()
  256. defer func() {
  257. // Drain channel in case of premature return to not leak a goroutine.
  258. for range descChan {
  259. }
  260. r.mtx.Unlock()
  261. }()
  262. // Conduct various tests...
  263. for desc := range descChan {
  264. // Is the descriptor valid at all?
  265. if desc.err != nil {
  266. return fmt.Errorf("descriptor %s is invalid: %w", desc, desc.err)
  267. }
  268. // Is the descID unique?
  269. // (In other words: Is the fqName + constLabel combination unique?)
  270. if _, exists := r.descIDs[desc.id]; exists {
  271. duplicateDescErr = fmt.Errorf("descriptor %s already exists with the same fully-qualified name and const label values", desc)
  272. }
  273. // If it is not a duplicate desc in this collector, XOR it to
  274. // the collectorID. (We allow duplicate descs within the same
  275. // collector, but their existence must be a no-op.)
  276. if _, exists := newDescIDs[desc.id]; !exists {
  277. newDescIDs[desc.id] = struct{}{}
  278. collectorID ^= desc.id
  279. }
  280. // Are all the label names and the help string consistent with
  281. // previous descriptors of the same name?
  282. // First check existing descriptors...
  283. if dimHash, exists := r.dimHashesByName[desc.fqName]; exists {
  284. if dimHash != desc.dimHash {
  285. return fmt.Errorf("a previously registered descriptor with the same fully-qualified name as %s has different label names or a different help string", desc)
  286. }
  287. } else {
  288. // ...then check the new descriptors already seen.
  289. if dimHash, exists := newDimHashesByName[desc.fqName]; exists {
  290. if dimHash != desc.dimHash {
  291. return fmt.Errorf("descriptors reported by collector have inconsistent label names or help strings for the same fully-qualified name, offender is %s", desc)
  292. }
  293. } else {
  294. newDimHashesByName[desc.fqName] = desc.dimHash
  295. }
  296. }
  297. }
  298. // A Collector yielding no Desc at all is considered unchecked.
  299. if len(newDescIDs) == 0 {
  300. r.uncheckedCollectors = append(r.uncheckedCollectors, c)
  301. return nil
  302. }
  303. if existing, exists := r.collectorsByID[collectorID]; exists {
  304. switch e := existing.(type) {
  305. case *wrappingCollector:
  306. return AlreadyRegisteredError{
  307. ExistingCollector: e.unwrapRecursively(),
  308. NewCollector: c,
  309. }
  310. default:
  311. return AlreadyRegisteredError{
  312. ExistingCollector: e,
  313. NewCollector: c,
  314. }
  315. }
  316. }
  317. // If the collectorID is new, but at least one of the descs existed
  318. // before, we are in trouble.
  319. if duplicateDescErr != nil {
  320. return duplicateDescErr
  321. }
  322. // Only after all tests have passed, actually register.
  323. r.collectorsByID[collectorID] = c
  324. for hash := range newDescIDs {
  325. r.descIDs[hash] = struct{}{}
  326. }
  327. for name, dimHash := range newDimHashesByName {
  328. r.dimHashesByName[name] = dimHash
  329. }
  330. return nil
  331. }
  332. // Unregister implements Registerer.
  333. func (r *Registry) Unregister(c Collector) bool {
  334. var (
  335. descChan = make(chan *Desc, capDescChan)
  336. descIDs = map[uint64]struct{}{}
  337. collectorID uint64 // All desc IDs XOR'd together.
  338. )
  339. go func() {
  340. c.Describe(descChan)
  341. close(descChan)
  342. }()
  343. for desc := range descChan {
  344. if _, exists := descIDs[desc.id]; !exists {
  345. collectorID ^= desc.id
  346. descIDs[desc.id] = struct{}{}
  347. }
  348. }
  349. r.mtx.RLock()
  350. if _, exists := r.collectorsByID[collectorID]; !exists {
  351. r.mtx.RUnlock()
  352. return false
  353. }
  354. r.mtx.RUnlock()
  355. r.mtx.Lock()
  356. defer r.mtx.Unlock()
  357. delete(r.collectorsByID, collectorID)
  358. for id := range descIDs {
  359. delete(r.descIDs, id)
  360. }
  361. // dimHashesByName is left untouched as those must be consistent
  362. // throughout the lifetime of a program.
  363. return true
  364. }
  365. // MustRegister implements Registerer.
  366. func (r *Registry) MustRegister(cs ...Collector) {
  367. for _, c := range cs {
  368. if err := r.Register(c); err != nil {
  369. panic(err)
  370. }
  371. }
  372. }
  373. // Gather implements Gatherer.
  374. func (r *Registry) Gather() ([]*dto.MetricFamily, error) {
  375. r.mtx.RLock()
  376. if len(r.collectorsByID) == 0 && len(r.uncheckedCollectors) == 0 {
  377. // Fast path.
  378. r.mtx.RUnlock()
  379. return nil, nil
  380. }
  381. var (
  382. checkedMetricChan = make(chan Metric, capMetricChan)
  383. uncheckedMetricChan = make(chan Metric, capMetricChan)
  384. metricHashes = map[uint64]struct{}{}
  385. wg sync.WaitGroup
  386. errs MultiError // The collected errors to return in the end.
  387. registeredDescIDs map[uint64]struct{} // Only used for pedantic checks
  388. )
  389. goroutineBudget := len(r.collectorsByID) + len(r.uncheckedCollectors)
  390. metricFamiliesByName := make(map[string]*dto.MetricFamily, len(r.dimHashesByName))
  391. checkedCollectors := make(chan Collector, len(r.collectorsByID))
  392. uncheckedCollectors := make(chan Collector, len(r.uncheckedCollectors))
  393. for _, collector := range r.collectorsByID {
  394. checkedCollectors <- collector
  395. }
  396. for _, collector := range r.uncheckedCollectors {
  397. uncheckedCollectors <- collector
  398. }
  399. // In case pedantic checks are enabled, we have to copy the map before
  400. // giving up the RLock.
  401. if r.pedanticChecksEnabled {
  402. registeredDescIDs = make(map[uint64]struct{}, len(r.descIDs))
  403. for id := range r.descIDs {
  404. registeredDescIDs[id] = struct{}{}
  405. }
  406. }
  407. r.mtx.RUnlock()
  408. wg.Add(goroutineBudget)
  409. collectWorker := func() {
  410. for {
  411. select {
  412. case collector := <-checkedCollectors:
  413. collector.Collect(checkedMetricChan)
  414. case collector := <-uncheckedCollectors:
  415. collector.Collect(uncheckedMetricChan)
  416. default:
  417. return
  418. }
  419. wg.Done()
  420. }
  421. }
  422. // Start the first worker now to make sure at least one is running.
  423. go collectWorker()
  424. goroutineBudget--
  425. // Close checkedMetricChan and uncheckedMetricChan once all collectors
  426. // are collected.
  427. go func() {
  428. wg.Wait()
  429. close(checkedMetricChan)
  430. close(uncheckedMetricChan)
  431. }()
  432. // Drain checkedMetricChan and uncheckedMetricChan in case of premature return.
  433. defer func() {
  434. if checkedMetricChan != nil {
  435. for range checkedMetricChan {
  436. }
  437. }
  438. if uncheckedMetricChan != nil {
  439. for range uncheckedMetricChan {
  440. }
  441. }
  442. }()
  443. // Copy the channel references so we can nil them out later to remove
  444. // them from the select statements below.
  445. cmc := checkedMetricChan
  446. umc := uncheckedMetricChan
  447. for {
  448. select {
  449. case metric, ok := <-cmc:
  450. if !ok {
  451. cmc = nil
  452. break
  453. }
  454. errs.Append(processMetric(
  455. metric, metricFamiliesByName,
  456. metricHashes,
  457. registeredDescIDs,
  458. ))
  459. case metric, ok := <-umc:
  460. if !ok {
  461. umc = nil
  462. break
  463. }
  464. errs.Append(processMetric(
  465. metric, metricFamiliesByName,
  466. metricHashes,
  467. nil,
  468. ))
  469. default:
  470. if goroutineBudget <= 0 || len(checkedCollectors)+len(uncheckedCollectors) == 0 {
  471. // All collectors are already being worked on or
  472. // we have already as many goroutines started as
  473. // there are collectors. Do the same as above,
  474. // just without the default.
  475. select {
  476. case metric, ok := <-cmc:
  477. if !ok {
  478. cmc = nil
  479. break
  480. }
  481. errs.Append(processMetric(
  482. metric, metricFamiliesByName,
  483. metricHashes,
  484. registeredDescIDs,
  485. ))
  486. case metric, ok := <-umc:
  487. if !ok {
  488. umc = nil
  489. break
  490. }
  491. errs.Append(processMetric(
  492. metric, metricFamiliesByName,
  493. metricHashes,
  494. nil,
  495. ))
  496. }
  497. break
  498. }
  499. // Start more workers.
  500. go collectWorker()
  501. goroutineBudget--
  502. runtime.Gosched()
  503. }
  504. // Once both checkedMetricChan and uncheckdMetricChan are closed
  505. // and drained, the contraption above will nil out cmc and umc,
  506. // and then we can leave the collect loop here.
  507. if cmc == nil && umc == nil {
  508. break
  509. }
  510. }
  511. return internal.NormalizeMetricFamilies(metricFamiliesByName), errs.MaybeUnwrap()
  512. }
  513. // WriteToTextfile calls Gather on the provided Gatherer, encodes the result in the
  514. // Prometheus text format, and writes it to a temporary file. Upon success, the
  515. // temporary file is renamed to the provided filename.
  516. //
  517. // This is intended for use with the textfile collector of the node exporter.
  518. // Note that the node exporter expects the filename to be suffixed with ".prom".
  519. func WriteToTextfile(filename string, g Gatherer) error {
  520. tmp, err := os.CreateTemp(filepath.Dir(filename), filepath.Base(filename))
  521. if err != nil {
  522. return err
  523. }
  524. defer os.Remove(tmp.Name())
  525. mfs, err := g.Gather()
  526. if err != nil {
  527. return err
  528. }
  529. for _, mf := range mfs {
  530. if _, err := expfmt.MetricFamilyToText(tmp, mf); err != nil {
  531. return err
  532. }
  533. }
  534. if err := tmp.Close(); err != nil {
  535. return err
  536. }
  537. if err := os.Chmod(tmp.Name(), 0o644); err != nil {
  538. return err
  539. }
  540. return os.Rename(tmp.Name(), filename)
  541. }
  542. // processMetric is an internal helper method only used by the Gather method.
  543. func processMetric(
  544. metric Metric,
  545. metricFamiliesByName map[string]*dto.MetricFamily,
  546. metricHashes map[uint64]struct{},
  547. registeredDescIDs map[uint64]struct{},
  548. ) error {
  549. desc := metric.Desc()
  550. // Wrapped metrics collected by an unchecked Collector can have an
  551. // invalid Desc.
  552. if desc.err != nil {
  553. return desc.err
  554. }
  555. dtoMetric := &dto.Metric{}
  556. if err := metric.Write(dtoMetric); err != nil {
  557. return fmt.Errorf("error collecting metric %v: %w", desc, err)
  558. }
  559. metricFamily, ok := metricFamiliesByName[desc.fqName]
  560. if ok { // Existing name.
  561. if metricFamily.GetHelp() != desc.help {
  562. return fmt.Errorf(
  563. "collected metric %s %s has help %q but should have %q",
  564. desc.fqName, dtoMetric, desc.help, metricFamily.GetHelp(),
  565. )
  566. }
  567. // TODO(beorn7): Simplify switch once Desc has type.
  568. switch metricFamily.GetType() {
  569. case dto.MetricType_COUNTER:
  570. if dtoMetric.Counter == nil {
  571. return fmt.Errorf(
  572. "collected metric %s %s should be a Counter",
  573. desc.fqName, dtoMetric,
  574. )
  575. }
  576. case dto.MetricType_GAUGE:
  577. if dtoMetric.Gauge == nil {
  578. return fmt.Errorf(
  579. "collected metric %s %s should be a Gauge",
  580. desc.fqName, dtoMetric,
  581. )
  582. }
  583. case dto.MetricType_SUMMARY:
  584. if dtoMetric.Summary == nil {
  585. return fmt.Errorf(
  586. "collected metric %s %s should be a Summary",
  587. desc.fqName, dtoMetric,
  588. )
  589. }
  590. case dto.MetricType_UNTYPED:
  591. if dtoMetric.Untyped == nil {
  592. return fmt.Errorf(
  593. "collected metric %s %s should be Untyped",
  594. desc.fqName, dtoMetric,
  595. )
  596. }
  597. case dto.MetricType_HISTOGRAM:
  598. if dtoMetric.Histogram == nil {
  599. return fmt.Errorf(
  600. "collected metric %s %s should be a Histogram",
  601. desc.fqName, dtoMetric,
  602. )
  603. }
  604. default:
  605. panic("encountered MetricFamily with invalid type")
  606. }
  607. } else { // New name.
  608. metricFamily = &dto.MetricFamily{}
  609. metricFamily.Name = proto.String(desc.fqName)
  610. metricFamily.Help = proto.String(desc.help)
  611. // TODO(beorn7): Simplify switch once Desc has type.
  612. switch {
  613. case dtoMetric.Gauge != nil:
  614. metricFamily.Type = dto.MetricType_GAUGE.Enum()
  615. case dtoMetric.Counter != nil:
  616. metricFamily.Type = dto.MetricType_COUNTER.Enum()
  617. case dtoMetric.Summary != nil:
  618. metricFamily.Type = dto.MetricType_SUMMARY.Enum()
  619. case dtoMetric.Untyped != nil:
  620. metricFamily.Type = dto.MetricType_UNTYPED.Enum()
  621. case dtoMetric.Histogram != nil:
  622. metricFamily.Type = dto.MetricType_HISTOGRAM.Enum()
  623. default:
  624. return fmt.Errorf("empty metric collected: %s", dtoMetric)
  625. }
  626. if err := checkSuffixCollisions(metricFamily, metricFamiliesByName); err != nil {
  627. return err
  628. }
  629. metricFamiliesByName[desc.fqName] = metricFamily
  630. }
  631. if err := checkMetricConsistency(metricFamily, dtoMetric, metricHashes); err != nil {
  632. return err
  633. }
  634. if registeredDescIDs != nil {
  635. // Is the desc registered at all?
  636. if _, exist := registeredDescIDs[desc.id]; !exist {
  637. return fmt.Errorf(
  638. "collected metric %s %s with unregistered descriptor %s",
  639. metricFamily.GetName(), dtoMetric, desc,
  640. )
  641. }
  642. if err := checkDescConsistency(metricFamily, dtoMetric, desc); err != nil {
  643. return err
  644. }
  645. }
  646. metricFamily.Metric = append(metricFamily.Metric, dtoMetric)
  647. return nil
  648. }
  649. // Gatherers is a slice of Gatherer instances that implements the Gatherer
  650. // interface itself. Its Gather method calls Gather on all Gatherers in the
  651. // slice in order and returns the merged results. Errors returned from the
  652. // Gather calls are all returned in a flattened MultiError. Duplicate and
  653. // inconsistent Metrics are skipped (first occurrence in slice order wins) and
  654. // reported in the returned error.
  655. //
  656. // Gatherers can be used to merge the Gather results from multiple
  657. // Registries. It also provides a way to directly inject existing MetricFamily
  658. // protobufs into the gathering by creating a custom Gatherer with a Gather
  659. // method that simply returns the existing MetricFamily protobufs. Note that no
  660. // registration is involved (in contrast to Collector registration), so
  661. // obviously registration-time checks cannot happen. Any inconsistencies between
  662. // the gathered MetricFamilies are reported as errors by the Gather method, and
  663. // inconsistent Metrics are dropped. Invalid parts of the MetricFamilies
  664. // (e.g. syntactically invalid metric or label names) will go undetected.
  665. type Gatherers []Gatherer
  666. // Gather implements Gatherer.
  667. func (gs Gatherers) Gather() ([]*dto.MetricFamily, error) {
  668. var (
  669. metricFamiliesByName = map[string]*dto.MetricFamily{}
  670. metricHashes = map[uint64]struct{}{}
  671. errs MultiError // The collected errors to return in the end.
  672. )
  673. for i, g := range gs {
  674. mfs, err := g.Gather()
  675. if err != nil {
  676. multiErr := MultiError{}
  677. if errors.As(err, &multiErr) {
  678. for _, err := range multiErr {
  679. errs = append(errs, fmt.Errorf("[from Gatherer #%d] %w", i+1, err))
  680. }
  681. } else {
  682. errs = append(errs, fmt.Errorf("[from Gatherer #%d] %w", i+1, err))
  683. }
  684. }
  685. for _, mf := range mfs {
  686. existingMF, exists := metricFamiliesByName[mf.GetName()]
  687. if exists {
  688. if existingMF.GetHelp() != mf.GetHelp() {
  689. errs = append(errs, fmt.Errorf(
  690. "gathered metric family %s has help %q but should have %q",
  691. mf.GetName(), mf.GetHelp(), existingMF.GetHelp(),
  692. ))
  693. continue
  694. }
  695. if existingMF.GetType() != mf.GetType() {
  696. errs = append(errs, fmt.Errorf(
  697. "gathered metric family %s has type %s but should have %s",
  698. mf.GetName(), mf.GetType(), existingMF.GetType(),
  699. ))
  700. continue
  701. }
  702. } else {
  703. existingMF = &dto.MetricFamily{}
  704. existingMF.Name = mf.Name
  705. existingMF.Help = mf.Help
  706. existingMF.Type = mf.Type
  707. if err := checkSuffixCollisions(existingMF, metricFamiliesByName); err != nil {
  708. errs = append(errs, err)
  709. continue
  710. }
  711. metricFamiliesByName[mf.GetName()] = existingMF
  712. }
  713. for _, m := range mf.Metric {
  714. if err := checkMetricConsistency(existingMF, m, metricHashes); err != nil {
  715. errs = append(errs, err)
  716. continue
  717. }
  718. existingMF.Metric = append(existingMF.Metric, m)
  719. }
  720. }
  721. }
  722. return internal.NormalizeMetricFamilies(metricFamiliesByName), errs.MaybeUnwrap()
  723. }
  724. // checkSuffixCollisions checks for collisions with the “magic” suffixes the
  725. // Prometheus text format and the internal metric representation of the
  726. // Prometheus server add while flattening Summaries and Histograms.
  727. func checkSuffixCollisions(mf *dto.MetricFamily, mfs map[string]*dto.MetricFamily) error {
  728. var (
  729. newName = mf.GetName()
  730. newType = mf.GetType()
  731. newNameWithoutSuffix = ""
  732. )
  733. switch {
  734. case strings.HasSuffix(newName, "_count"):
  735. newNameWithoutSuffix = newName[:len(newName)-6]
  736. case strings.HasSuffix(newName, "_sum"):
  737. newNameWithoutSuffix = newName[:len(newName)-4]
  738. case strings.HasSuffix(newName, "_bucket"):
  739. newNameWithoutSuffix = newName[:len(newName)-7]
  740. }
  741. if newNameWithoutSuffix != "" {
  742. if existingMF, ok := mfs[newNameWithoutSuffix]; ok {
  743. switch existingMF.GetType() {
  744. case dto.MetricType_SUMMARY:
  745. if !strings.HasSuffix(newName, "_bucket") {
  746. return fmt.Errorf(
  747. "collected metric named %q collides with previously collected summary named %q",
  748. newName, newNameWithoutSuffix,
  749. )
  750. }
  751. case dto.MetricType_HISTOGRAM:
  752. return fmt.Errorf(
  753. "collected metric named %q collides with previously collected histogram named %q",
  754. newName, newNameWithoutSuffix,
  755. )
  756. }
  757. }
  758. }
  759. if newType == dto.MetricType_SUMMARY || newType == dto.MetricType_HISTOGRAM {
  760. if _, ok := mfs[newName+"_count"]; ok {
  761. return fmt.Errorf(
  762. "collected histogram or summary named %q collides with previously collected metric named %q",
  763. newName, newName+"_count",
  764. )
  765. }
  766. if _, ok := mfs[newName+"_sum"]; ok {
  767. return fmt.Errorf(
  768. "collected histogram or summary named %q collides with previously collected metric named %q",
  769. newName, newName+"_sum",
  770. )
  771. }
  772. }
  773. if newType == dto.MetricType_HISTOGRAM {
  774. if _, ok := mfs[newName+"_bucket"]; ok {
  775. return fmt.Errorf(
  776. "collected histogram named %q collides with previously collected metric named %q",
  777. newName, newName+"_bucket",
  778. )
  779. }
  780. }
  781. return nil
  782. }
  783. // checkMetricConsistency checks if the provided Metric is consistent with the
  784. // provided MetricFamily. It also hashes the Metric labels and the MetricFamily
  785. // name. If the resulting hash is already in the provided metricHashes, an error
  786. // is returned. If not, it is added to metricHashes.
  787. func checkMetricConsistency(
  788. metricFamily *dto.MetricFamily,
  789. dtoMetric *dto.Metric,
  790. metricHashes map[uint64]struct{},
  791. ) error {
  792. name := metricFamily.GetName()
  793. // Type consistency with metric family.
  794. if metricFamily.GetType() == dto.MetricType_GAUGE && dtoMetric.Gauge == nil ||
  795. metricFamily.GetType() == dto.MetricType_COUNTER && dtoMetric.Counter == nil ||
  796. metricFamily.GetType() == dto.MetricType_SUMMARY && dtoMetric.Summary == nil ||
  797. metricFamily.GetType() == dto.MetricType_HISTOGRAM && dtoMetric.Histogram == nil ||
  798. metricFamily.GetType() == dto.MetricType_UNTYPED && dtoMetric.Untyped == nil {
  799. return fmt.Errorf(
  800. "collected metric %q { %s} is not a %s",
  801. name, dtoMetric, metricFamily.GetType(),
  802. )
  803. }
  804. previousLabelName := ""
  805. for _, labelPair := range dtoMetric.GetLabel() {
  806. labelName := labelPair.GetName()
  807. if labelName == previousLabelName {
  808. return fmt.Errorf(
  809. "collected metric %q { %s} has two or more labels with the same name: %s",
  810. name, dtoMetric, labelName,
  811. )
  812. }
  813. if !checkLabelName(labelName) {
  814. return fmt.Errorf(
  815. "collected metric %q { %s} has a label with an invalid name: %s",
  816. name, dtoMetric, labelName,
  817. )
  818. }
  819. if dtoMetric.Summary != nil && labelName == quantileLabel {
  820. return fmt.Errorf(
  821. "collected metric %q { %s} must not have an explicit %q label",
  822. name, dtoMetric, quantileLabel,
  823. )
  824. }
  825. if !utf8.ValidString(labelPair.GetValue()) {
  826. return fmt.Errorf(
  827. "collected metric %q { %s} has a label named %q whose value is not utf8: %#v",
  828. name, dtoMetric, labelName, labelPair.GetValue())
  829. }
  830. previousLabelName = labelName
  831. }
  832. // Is the metric unique (i.e. no other metric with the same name and the same labels)?
  833. h := xxhash.New()
  834. h.WriteString(name)
  835. h.Write(separatorByteSlice)
  836. // Make sure label pairs are sorted. We depend on it for the consistency
  837. // check.
  838. if !sort.IsSorted(internal.LabelPairSorter(dtoMetric.Label)) {
  839. // We cannot sort dtoMetric.Label in place as it is immutable by contract.
  840. copiedLabels := make([]*dto.LabelPair, len(dtoMetric.Label))
  841. copy(copiedLabels, dtoMetric.Label)
  842. sort.Sort(internal.LabelPairSorter(copiedLabels))
  843. dtoMetric.Label = copiedLabels
  844. }
  845. for _, lp := range dtoMetric.Label {
  846. h.WriteString(lp.GetName())
  847. h.Write(separatorByteSlice)
  848. h.WriteString(lp.GetValue())
  849. h.Write(separatorByteSlice)
  850. }
  851. hSum := h.Sum64()
  852. if _, exists := metricHashes[hSum]; exists {
  853. return fmt.Errorf(
  854. "collected metric %q { %s} was collected before with the same name and label values",
  855. name, dtoMetric,
  856. )
  857. }
  858. metricHashes[hSum] = struct{}{}
  859. return nil
  860. }
  861. func checkDescConsistency(
  862. metricFamily *dto.MetricFamily,
  863. dtoMetric *dto.Metric,
  864. desc *Desc,
  865. ) error {
  866. // Desc help consistency with metric family help.
  867. if metricFamily.GetHelp() != desc.help {
  868. return fmt.Errorf(
  869. "collected metric %s %s has help %q but should have %q",
  870. metricFamily.GetName(), dtoMetric, metricFamily.GetHelp(), desc.help,
  871. )
  872. }
  873. // Is the desc consistent with the content of the metric?
  874. lpsFromDesc := make([]*dto.LabelPair, len(desc.constLabelPairs), len(dtoMetric.Label))
  875. copy(lpsFromDesc, desc.constLabelPairs)
  876. for _, l := range desc.variableLabels {
  877. lpsFromDesc = append(lpsFromDesc, &dto.LabelPair{
  878. Name: proto.String(l),
  879. })
  880. }
  881. if len(lpsFromDesc) != len(dtoMetric.Label) {
  882. return fmt.Errorf(
  883. "labels in collected metric %s %s are inconsistent with descriptor %s",
  884. metricFamily.GetName(), dtoMetric, desc,
  885. )
  886. }
  887. sort.Sort(internal.LabelPairSorter(lpsFromDesc))
  888. for i, lpFromDesc := range lpsFromDesc {
  889. lpFromMetric := dtoMetric.Label[i]
  890. if lpFromDesc.GetName() != lpFromMetric.GetName() ||
  891. lpFromDesc.Value != nil && lpFromDesc.GetValue() != lpFromMetric.GetValue() {
  892. return fmt.Errorf(
  893. "labels in collected metric %s %s are inconsistent with descriptor %s",
  894. metricFamily.GetName(), dtoMetric, desc,
  895. )
  896. }
  897. }
  898. return nil
  899. }
  900. var _ TransactionalGatherer = &MultiTRegistry{}
  901. // MultiTRegistry is a TransactionalGatherer that joins gathered metrics from multiple
  902. // transactional gatherers.
  903. //
  904. // It is caller responsibility to ensure two registries have mutually exclusive metric families,
  905. // no deduplication will happen.
  906. type MultiTRegistry struct {
  907. tGatherers []TransactionalGatherer
  908. }
  909. // NewMultiTRegistry creates MultiTRegistry.
  910. func NewMultiTRegistry(tGatherers ...TransactionalGatherer) *MultiTRegistry {
  911. return &MultiTRegistry{
  912. tGatherers: tGatherers,
  913. }
  914. }
  915. // Gather implements TransactionalGatherer interface.
  916. func (r *MultiTRegistry) Gather() (mfs []*dto.MetricFamily, done func(), err error) {
  917. errs := MultiError{}
  918. dFns := make([]func(), 0, len(r.tGatherers))
  919. // TODO(bwplotka): Implement concurrency for those?
  920. for _, g := range r.tGatherers {
  921. // TODO(bwplotka): Check for duplicates?
  922. m, d, err := g.Gather()
  923. errs.Append(err)
  924. mfs = append(mfs, m...)
  925. dFns = append(dFns, d)
  926. }
  927. // TODO(bwplotka): Consider sort in place, given metric family in gather is sorted already.
  928. sort.Slice(mfs, func(i, j int) bool {
  929. return *mfs[i].Name < *mfs[j].Name
  930. })
  931. return mfs, func() {
  932. for _, d := range dFns {
  933. d()
  934. }
  935. }, errs.MaybeUnwrap()
  936. }
// TransactionalGatherer represents a transactional gatherer: in addition to
// the gathered metric families it returns a done function through which the
// caller notifies the gatherer that the memory used by the returned metric
// families is no longer in use. This allows implementations with a cache.
type TransactionalGatherer interface {
	// Gather returns metrics in a lexicographically sorted slice
	// of uniquely named MetricFamily protobufs. Gather ensures that the
	// returned slice is valid and self-consistent so that it can be used
	// for valid exposition. As an exception to the strict consistency
	// requirements described for metric.Desc, Gather will tolerate
	// different sets of label names for metrics of the same metric family.
	//
	// Even if an error occurs, Gather attempts to gather as many metrics as
	// possible. Hence, if a non-nil error is returned, the returned
	// MetricFamily slice could be nil (in case of a fatal error that
	// prevented any meaningful metric collection) or contain a number of
	// MetricFamily protobufs, some of which might be incomplete, and some
	// might be missing altogether. The returned error (which might be a
	// MultiError) explains the details. Note that this is mostly useful for
	// debugging purposes. If the gathered protobufs are to be used for
	// exposition in actual monitoring, it is almost always better to not
	// expose an incomplete result and instead disregard the returned
	// MetricFamily protobufs in case the returned error is non-nil.
	//
	// Important: the caller is expected to call done (even if an error
	// occurred!) once it no longer needs the returned slice of
	// dto.MetricFamily.
	Gather() (_ []*dto.MetricFamily, done func(), err error)
}
  963. // ToTransactionalGatherer transforms Gatherer to transactional one with noop as done function.
  964. func ToTransactionalGatherer(g Gatherer) TransactionalGatherer {
  965. return &noTransactionGatherer{g: g}
  966. }
  967. type noTransactionGatherer struct {
  968. g Gatherer
  969. }
  970. // Gather implements TransactionalGatherer interface.
  971. func (g *noTransactionGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) {
  972. mfs, err := g.g.Gather()
  973. return mfs, func() {}, err
  974. }