You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

1076 lines
35 KiB

  1. // Copyright 2014 The Prometheus Authors
  2. // Licensed under the Apache License, Version 2.0 (the "License");
  3. // you may not use this file except in compliance with the License.
  4. // You may obtain a copy of the License at
  5. //
  6. // http://www.apache.org/licenses/LICENSE-2.0
  7. //
  8. // Unless required by applicable law or agreed to in writing, software
  9. // distributed under the License is distributed on an "AS IS" BASIS,
  10. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. // See the License for the specific language governing permissions and
  12. // limitations under the License.
  13. package prometheus
  14. import (
  15. "bytes"
  16. "errors"
  17. "fmt"
  18. "os"
  19. "path/filepath"
  20. "runtime"
  21. "sort"
  22. "strconv"
  23. "strings"
  24. "sync"
  25. "unicode/utf8"
  26. "github.com/prometheus/client_golang/prometheus/internal"
  27. "github.com/cespare/xxhash/v2"
  28. dto "github.com/prometheus/client_model/go"
  29. "github.com/prometheus/common/expfmt"
  30. "google.golang.org/protobuf/proto"
  31. )
const (
	// Capacity for the channel to collect metrics and descriptors.
	//
	// capMetricChan is the buffer size of the Metric channels created in
	// Registry.Gather; capDescChan is the buffer size of the Desc channels
	// created in Registry.Register and Registry.Unregister.
	capMetricChan = 1000
	capDescChan   = 10
)
// DefaultRegisterer and DefaultGatherer are the implementations of the
// Registerer and Gatherer interface a number of convenience functions in this
// package act on. Initially, both variables point to the same Registry, which
// has a process collector (currently on Linux only, see NewProcessCollector)
// and a Go collector (see NewGoCollector, in particular the note about
// stop-the-world implication with Go versions older than 1.9) already
// registered. This approach to keep default instances as global state mirrors
// the approach of other packages in the Go standard library. Note that there
// are caveats. Change the variables with caution and only if you understand the
// consequences. Users who want to avoid global state altogether should not use
// the convenience functions and act on custom instances instead.
var (
	// defaultRegistry is the single Registry instance that both exported
	// defaults below initially refer to.
	defaultRegistry              = NewRegistry()
	DefaultRegisterer Registerer = defaultRegistry
	DefaultGatherer   Gatherer   = defaultRegistry
)
// init registers a process collector and a Go collector with the
// DefaultRegisterer. Registration goes through MustRegister, so a failure to
// register either collector panics during package initialization.
func init() {
	MustRegister(NewProcessCollector(ProcessCollectorOpts{}))
	MustRegister(NewGoCollector())
}
  57. // NewRegistry creates a new vanilla Registry without any Collectors
  58. // pre-registered.
  59. func NewRegistry() *Registry {
  60. return &Registry{
  61. collectorsByID: map[uint64]Collector{},
  62. descIDs: map[uint64]struct{}{},
  63. dimHashesByName: map[string]uint64{},
  64. }
  65. }
  66. // NewPedanticRegistry returns a registry that checks during collection if each
  67. // collected Metric is consistent with its reported Desc, and if the Desc has
  68. // actually been registered with the registry. Unchecked Collectors (those whose
  69. // Describe method does not yield any descriptors) are excluded from the check.
  70. //
  71. // Usually, a Registry will be happy as long as the union of all collected
  72. // Metrics is consistent and valid even if some metrics are not consistent with
  73. // their own Desc or a Desc provided by their registered Collector. Well-behaved
  74. // Collectors and Metrics will only provide consistent Descs. This Registry is
  75. // useful to test the implementation of Collectors and Metrics.
  76. func NewPedanticRegistry() *Registry {
  77. r := NewRegistry()
  78. r.pedanticChecksEnabled = true
  79. return r
  80. }
// Registerer is the interface for the part of a registry in charge of
// registering and unregistering. Users of custom registries should use
// Registerer as type for registration purposes (rather than the Registry type
// directly). In that way, they are free to use a custom Registerer
// implementation (e.g. for testing purposes).
type Registerer interface {
	// Register registers a new Collector to be included in metrics
	// collection. It returns an error if the descriptors provided by the
	// Collector are invalid or if they — in combination with descriptors of
	// already registered Collectors — do not fulfill the consistency and
	// uniqueness criteria described in the documentation of metric.Desc.
	//
	// If the provided Collector is equal to a Collector already registered
	// (which includes the case of re-registering the same Collector), the
	// returned error is an instance of AlreadyRegisteredError, which
	// contains the previously registered Collector.
	//
	// A Collector whose Describe method does not yield any Desc is treated
	// as unchecked. Registration will always succeed. No check for
	// re-registering (see previous paragraph) is performed. Thus, the
	// caller is responsible for not double-registering the same unchecked
	// Collector, and for providing a Collector that will not cause
	// inconsistent metrics on collection. (This would lead to scrape
	// errors.)
	Register(Collector) error
	// MustRegister works like Register but registers any number of
	// Collectors and panics upon the first registration that causes an
	// error.
	MustRegister(...Collector)
	// Unregister unregisters the Collector that equals the Collector passed
	// in as an argument. (Two Collectors are considered equal if their
	// Describe method yields the same set of descriptors.) The function
	// returns whether a Collector was unregistered. Note that an unchecked
	// Collector cannot be unregistered (as its Describe method does not
	// yield any descriptor).
	//
	// Note that even after unregistering, it will not be possible to
	// register a new Collector that is inconsistent with the unregistered
	// Collector, e.g. a Collector collecting metrics with the same name but
	// a different help string. The rationale here is that the same registry
	// instance must only collect consistent metrics throughout its
	// lifetime.
	Unregister(Collector) bool
}
// Gatherer is the interface for the part of a registry in charge of gathering
// the collected metrics into a number of MetricFamilies. The Gatherer interface
// comes with the same general implication as described for the Registerer
// interface.
type Gatherer interface {
	// Gather calls the Collect method of the registered Collectors and then
	// gathers the collected metrics into a lexicographically sorted slice
	// of uniquely named MetricFamily protobufs. Gather ensures that the
	// returned slice is valid and self-consistent so that it can be used
	// for valid exposition. As an exception to the strict consistency
	// requirements described for metric.Desc, Gather will tolerate
	// different sets of label names for metrics of the same metric family.
	//
	// Even if an error occurs, Gather attempts to gather as many metrics as
	// possible. Hence, if a non-nil error is returned, the returned
	// MetricFamily slice could be nil (in case of a fatal error that
	// prevented any meaningful metric collection) or contain a number of
	// MetricFamily protobufs, some of which might be incomplete, and some
	// might be missing altogether. The returned error (which might be a
	// MultiError) explains the details. Note that this is mostly useful for
	// debugging purposes. If the gathered protobufs are to be used for
	// exposition in actual monitoring, it is almost always better to not
	// expose an incomplete result and instead disregard the returned
	// MetricFamily protobufs in case the returned error is non-nil.
	Gather() ([]*dto.MetricFamily, error)
}
// Register registers the provided Collector with the DefaultRegisterer.
//
// Register is a shortcut for DefaultRegisterer.Register(c) and returns that
// call's error unchanged. See there for more details.
func Register(c Collector) error {
	return DefaultRegisterer.Register(c)
}
// MustRegister registers the provided Collectors with the DefaultRegisterer and
// panics if any error occurs.
//
// MustRegister is a shortcut for DefaultRegisterer.MustRegister(cs...). See
// there for more details.
func MustRegister(cs ...Collector) {
	DefaultRegisterer.MustRegister(cs...)
}
// Unregister removes the registration of the provided Collector from the
// DefaultRegisterer and reports whether a Collector was actually removed.
//
// Unregister is a shortcut for DefaultRegisterer.Unregister(c). See there for
// more details.
func Unregister(c Collector) bool {
	return DefaultRegisterer.Unregister(c)
}
// GathererFunc turns a function into a Gatherer: any function with the
// signature of Gatherer.Gather can be converted to GathererFunc and used
// wherever a Gatherer is expected.
type GathererFunc func() ([]*dto.MetricFamily, error)

// Gather implements Gatherer by calling the underlying function and returning
// its results unchanged.
func (gf GathererFunc) Gather() ([]*dto.MetricFamily, error) {
	return gf()
}
  180. // AlreadyRegisteredError is returned by the Register method if the Collector to
  181. // be registered has already been registered before, or a different Collector
  182. // that collects the same metrics has been registered before. Registration fails
  183. // in that case, but you can detect from the kind of error what has
  184. // happened. The error contains fields for the existing Collector and the
  185. // (rejected) new Collector that equals the existing one. This can be used to
  186. // find out if an equal Collector has been registered before and switch over to
  187. // using the old one, as demonstrated in the example.
  188. type AlreadyRegisteredError struct {
  189. ExistingCollector, NewCollector Collector
  190. }
  191. func (err AlreadyRegisteredError) Error() string {
  192. return "duplicate metrics collector registration attempted"
  193. }
  194. // MultiError is a slice of errors implementing the error interface. It is used
  195. // by a Gatherer to report multiple errors during MetricFamily gathering.
  196. type MultiError []error
  197. // Error formats the contained errors as a bullet point list, preceded by the
  198. // total number of errors. Note that this results in a multi-line string.
  199. func (errs MultiError) Error() string {
  200. if len(errs) == 0 {
  201. return ""
  202. }
  203. buf := &bytes.Buffer{}
  204. fmt.Fprintf(buf, "%d error(s) occurred:", len(errs))
  205. for _, err := range errs {
  206. fmt.Fprintf(buf, "\n* %s", err)
  207. }
  208. return buf.String()
  209. }
  210. // Append appends the provided error if it is not nil.
  211. func (errs *MultiError) Append(err error) {
  212. if err != nil {
  213. *errs = append(*errs, err)
  214. }
  215. }
  216. // MaybeUnwrap returns nil if len(errs) is 0. It returns the first and only
  217. // contained error as error if len(errs is 1). In all other cases, it returns
  218. // the MultiError directly. This is helpful for returning a MultiError in a way
  219. // that only uses the MultiError if needed.
  220. func (errs MultiError) MaybeUnwrap() error {
  221. switch len(errs) {
  222. case 0:
  223. return nil
  224. case 1:
  225. return errs[0]
  226. default:
  227. return errs
  228. }
  229. }
// Registry registers Prometheus collectors, collects their metrics, and gathers
// them into MetricFamilies for exposition. It implements Registerer, Gatherer,
// and Collector. The zero value is not usable. Create instances with
// NewRegistry or NewPedanticRegistry.
//
// Registry implements Collector to allow it to be used for creating groups of
// metrics. See the Grouping example for how this can be done.
type Registry struct {
	// mtx is held by the methods below while they read or modify the
	// remaining fields.
	mtx sync.RWMutex
	// collectorsByID holds the checked Collectors, keyed by the XOR of the
	// IDs of all distinct descriptors the Collector describes.
	collectorsByID map[uint64]Collector // ID is a hash of the descIDs.
	// descIDs is the set of descriptor IDs of all registered checked
	// Collectors.
	descIDs map[uint64]struct{}
	// dimHashesByName maps a fully-qualified metric name to the dimHash of
	// its descriptors. Entries are not removed on Unregister.
	dimHashesByName map[string]uint64
	// uncheckedCollectors holds the Collectors whose Describe method yielded
	// no descriptor at registration time.
	uncheckedCollectors []Collector
	// pedanticChecksEnabled is set by NewPedanticRegistry; when true, Gather
	// passes a copy of descIDs to processMetric for the additional checks.
	pedanticChecksEnabled bool
}
  245. // Register implements Registerer.
  246. func (r *Registry) Register(c Collector) error {
  247. var (
  248. descChan = make(chan *Desc, capDescChan)
  249. newDescIDs = map[uint64]struct{}{}
  250. newDimHashesByName = map[string]uint64{}
  251. collectorID uint64 // All desc IDs XOR'd together.
  252. duplicateDescErr error
  253. )
  254. go func() {
  255. c.Describe(descChan)
  256. close(descChan)
  257. }()
  258. r.mtx.Lock()
  259. defer func() {
  260. // Drain channel in case of premature return to not leak a goroutine.
  261. for range descChan {
  262. }
  263. r.mtx.Unlock()
  264. }()
  265. // Conduct various tests...
  266. for desc := range descChan {
  267. // Is the descriptor valid at all?
  268. if desc.err != nil {
  269. return fmt.Errorf("descriptor %s is invalid: %w", desc, desc.err)
  270. }
  271. // Is the descID unique?
  272. // (In other words: Is the fqName + constLabel combination unique?)
  273. if _, exists := r.descIDs[desc.id]; exists {
  274. duplicateDescErr = fmt.Errorf("descriptor %s already exists with the same fully-qualified name and const label values", desc)
  275. }
  276. // If it is not a duplicate desc in this collector, XOR it to
  277. // the collectorID. (We allow duplicate descs within the same
  278. // collector, but their existence must be a no-op.)
  279. if _, exists := newDescIDs[desc.id]; !exists {
  280. newDescIDs[desc.id] = struct{}{}
  281. collectorID ^= desc.id
  282. }
  283. // Are all the label names and the help string consistent with
  284. // previous descriptors of the same name?
  285. // First check existing descriptors...
  286. if dimHash, exists := r.dimHashesByName[desc.fqName]; exists {
  287. if dimHash != desc.dimHash {
  288. return fmt.Errorf("a previously registered descriptor with the same fully-qualified name as %s has different label names or a different help string", desc)
  289. }
  290. } else {
  291. // ...then check the new descriptors already seen.
  292. if dimHash, exists := newDimHashesByName[desc.fqName]; exists {
  293. if dimHash != desc.dimHash {
  294. return fmt.Errorf("descriptors reported by collector have inconsistent label names or help strings for the same fully-qualified name, offender is %s", desc)
  295. }
  296. } else {
  297. newDimHashesByName[desc.fqName] = desc.dimHash
  298. }
  299. }
  300. }
  301. // A Collector yielding no Desc at all is considered unchecked.
  302. if len(newDescIDs) == 0 {
  303. r.uncheckedCollectors = append(r.uncheckedCollectors, c)
  304. return nil
  305. }
  306. if existing, exists := r.collectorsByID[collectorID]; exists {
  307. switch e := existing.(type) {
  308. case *wrappingCollector:
  309. return AlreadyRegisteredError{
  310. ExistingCollector: e.unwrapRecursively(),
  311. NewCollector: c,
  312. }
  313. default:
  314. return AlreadyRegisteredError{
  315. ExistingCollector: e,
  316. NewCollector: c,
  317. }
  318. }
  319. }
  320. // If the collectorID is new, but at least one of the descs existed
  321. // before, we are in trouble.
  322. if duplicateDescErr != nil {
  323. return duplicateDescErr
  324. }
  325. // Only after all tests have passed, actually register.
  326. r.collectorsByID[collectorID] = c
  327. for hash := range newDescIDs {
  328. r.descIDs[hash] = struct{}{}
  329. }
  330. for name, dimHash := range newDimHashesByName {
  331. r.dimHashesByName[name] = dimHash
  332. }
  333. return nil
  334. }
  335. // Unregister implements Registerer.
  336. func (r *Registry) Unregister(c Collector) bool {
  337. var (
  338. descChan = make(chan *Desc, capDescChan)
  339. descIDs = map[uint64]struct{}{}
  340. collectorID uint64 // All desc IDs XOR'd together.
  341. )
  342. go func() {
  343. c.Describe(descChan)
  344. close(descChan)
  345. }()
  346. for desc := range descChan {
  347. if _, exists := descIDs[desc.id]; !exists {
  348. collectorID ^= desc.id
  349. descIDs[desc.id] = struct{}{}
  350. }
  351. }
  352. r.mtx.RLock()
  353. if _, exists := r.collectorsByID[collectorID]; !exists {
  354. r.mtx.RUnlock()
  355. return false
  356. }
  357. r.mtx.RUnlock()
  358. r.mtx.Lock()
  359. defer r.mtx.Unlock()
  360. delete(r.collectorsByID, collectorID)
  361. for id := range descIDs {
  362. delete(r.descIDs, id)
  363. }
  364. // dimHashesByName is left untouched as those must be consistent
  365. // throughout the lifetime of a program.
  366. return true
  367. }
  368. // MustRegister implements Registerer.
  369. func (r *Registry) MustRegister(cs ...Collector) {
  370. for _, c := range cs {
  371. if err := r.Register(c); err != nil {
  372. panic(err)
  373. }
  374. }
  375. }
  376. // Gather implements Gatherer.
  377. func (r *Registry) Gather() ([]*dto.MetricFamily, error) {
  378. r.mtx.RLock()
  379. if len(r.collectorsByID) == 0 && len(r.uncheckedCollectors) == 0 {
  380. // Fast path.
  381. r.mtx.RUnlock()
  382. return nil, nil
  383. }
  384. var (
  385. checkedMetricChan = make(chan Metric, capMetricChan)
  386. uncheckedMetricChan = make(chan Metric, capMetricChan)
  387. metricHashes = map[uint64]struct{}{}
  388. wg sync.WaitGroup
  389. errs MultiError // The collected errors to return in the end.
  390. registeredDescIDs map[uint64]struct{} // Only used for pedantic checks
  391. )
  392. goroutineBudget := len(r.collectorsByID) + len(r.uncheckedCollectors)
  393. metricFamiliesByName := make(map[string]*dto.MetricFamily, len(r.dimHashesByName))
  394. checkedCollectors := make(chan Collector, len(r.collectorsByID))
  395. uncheckedCollectors := make(chan Collector, len(r.uncheckedCollectors))
  396. for _, collector := range r.collectorsByID {
  397. checkedCollectors <- collector
  398. }
  399. for _, collector := range r.uncheckedCollectors {
  400. uncheckedCollectors <- collector
  401. }
  402. // In case pedantic checks are enabled, we have to copy the map before
  403. // giving up the RLock.
  404. if r.pedanticChecksEnabled {
  405. registeredDescIDs = make(map[uint64]struct{}, len(r.descIDs))
  406. for id := range r.descIDs {
  407. registeredDescIDs[id] = struct{}{}
  408. }
  409. }
  410. r.mtx.RUnlock()
  411. wg.Add(goroutineBudget)
  412. collectWorker := func() {
  413. for {
  414. select {
  415. case collector := <-checkedCollectors:
  416. collector.Collect(checkedMetricChan)
  417. case collector := <-uncheckedCollectors:
  418. collector.Collect(uncheckedMetricChan)
  419. default:
  420. return
  421. }
  422. wg.Done()
  423. }
  424. }
  425. // Start the first worker now to make sure at least one is running.
  426. go collectWorker()
  427. goroutineBudget--
  428. // Close checkedMetricChan and uncheckedMetricChan once all collectors
  429. // are collected.
  430. go func() {
  431. wg.Wait()
  432. close(checkedMetricChan)
  433. close(uncheckedMetricChan)
  434. }()
  435. // Drain checkedMetricChan and uncheckedMetricChan in case of premature return.
  436. defer func() {
  437. if checkedMetricChan != nil {
  438. for range checkedMetricChan {
  439. }
  440. }
  441. if uncheckedMetricChan != nil {
  442. for range uncheckedMetricChan {
  443. }
  444. }
  445. }()
  446. // Copy the channel references so we can nil them out later to remove
  447. // them from the select statements below.
  448. cmc := checkedMetricChan
  449. umc := uncheckedMetricChan
  450. for {
  451. select {
  452. case metric, ok := <-cmc:
  453. if !ok {
  454. cmc = nil
  455. break
  456. }
  457. errs.Append(processMetric(
  458. metric, metricFamiliesByName,
  459. metricHashes,
  460. registeredDescIDs,
  461. ))
  462. case metric, ok := <-umc:
  463. if !ok {
  464. umc = nil
  465. break
  466. }
  467. errs.Append(processMetric(
  468. metric, metricFamiliesByName,
  469. metricHashes,
  470. nil,
  471. ))
  472. default:
  473. if goroutineBudget <= 0 || len(checkedCollectors)+len(uncheckedCollectors) == 0 {
  474. // All collectors are already being worked on or
  475. // we have already as many goroutines started as
  476. // there are collectors. Do the same as above,
  477. // just without the default.
  478. select {
  479. case metric, ok := <-cmc:
  480. if !ok {
  481. cmc = nil
  482. break
  483. }
  484. errs.Append(processMetric(
  485. metric, metricFamiliesByName,
  486. metricHashes,
  487. registeredDescIDs,
  488. ))
  489. case metric, ok := <-umc:
  490. if !ok {
  491. umc = nil
  492. break
  493. }
  494. errs.Append(processMetric(
  495. metric, metricFamiliesByName,
  496. metricHashes,
  497. nil,
  498. ))
  499. }
  500. break
  501. }
  502. // Start more workers.
  503. go collectWorker()
  504. goroutineBudget--
  505. runtime.Gosched()
  506. }
  507. // Once both checkedMetricChan and uncheckdMetricChan are closed
  508. // and drained, the contraption above will nil out cmc and umc,
  509. // and then we can leave the collect loop here.
  510. if cmc == nil && umc == nil {
  511. break
  512. }
  513. }
  514. return internal.NormalizeMetricFamilies(metricFamiliesByName), errs.MaybeUnwrap()
  515. }
// Describe implements Collector. It holds the registry's read lock while it
// forwards ch to the Describe method of every checked Collector.
func (r *Registry) Describe(ch chan<- *Desc) {
	r.mtx.RLock()
	defer r.mtx.RUnlock()

	// Only report the checked Collectors; unchecked collectors don't report any
	// Desc.
	for _, c := range r.collectorsByID {
		c.Describe(ch)
	}
}
// Collect implements Collector. It holds the registry's read lock while it
// forwards ch to the Collect method of every checked Collector and then of
// every unchecked Collector, one after the other.
func (r *Registry) Collect(ch chan<- Metric) {
	r.mtx.RLock()
	defer r.mtx.RUnlock()

	for _, c := range r.collectorsByID {
		c.Collect(ch)
	}
	for _, c := range r.uncheckedCollectors {
		c.Collect(ch)
	}
}
  537. // WriteToTextfile calls Gather on the provided Gatherer, encodes the result in the
  538. // Prometheus text format, and writes it to a temporary file. Upon success, the
  539. // temporary file is renamed to the provided filename.
  540. //
  541. // This is intended for use with the textfile collector of the node exporter.
  542. // Note that the node exporter expects the filename to be suffixed with ".prom".
  543. func WriteToTextfile(filename string, g Gatherer) error {
  544. tmp, err := os.CreateTemp(filepath.Dir(filename), filepath.Base(filename))
  545. if err != nil {
  546. return err
  547. }
  548. defer os.Remove(tmp.Name())
  549. mfs, err := g.Gather()
  550. if err != nil {
  551. return err
  552. }
  553. for _, mf := range mfs {
  554. if _, err := expfmt.MetricFamilyToText(tmp, mf); err != nil {
  555. return err
  556. }
  557. }
  558. if err := tmp.Close(); err != nil {
  559. return err
  560. }
  561. if err := os.Chmod(tmp.Name(), 0o644); err != nil {
  562. return err
  563. }
  564. return os.Rename(tmp.Name(), filename)
  565. }
  566. // processMetric is an internal helper method only used by the Gather method.
  567. func processMetric(
  568. metric Metric,
  569. metricFamiliesByName map[string]*dto.MetricFamily,
  570. metricHashes map[uint64]struct{},
  571. registeredDescIDs map[uint64]struct{},
  572. ) error {
  573. desc := metric.Desc()
  574. // Wrapped metrics collected by an unchecked Collector can have an
  575. // invalid Desc.
  576. if desc.err != nil {
  577. return desc.err
  578. }
  579. dtoMetric := &dto.Metric{}
  580. if err := metric.Write(dtoMetric); err != nil {
  581. return fmt.Errorf("error collecting metric %v: %w", desc, err)
  582. }
  583. metricFamily, ok := metricFamiliesByName[desc.fqName]
  584. if ok { // Existing name.
  585. if metricFamily.GetHelp() != desc.help {
  586. return fmt.Errorf(
  587. "collected metric %s %s has help %q but should have %q",
  588. desc.fqName, dtoMetric, desc.help, metricFamily.GetHelp(),
  589. )
  590. }
  591. // TODO(beorn7): Simplify switch once Desc has type.
  592. switch metricFamily.GetType() {
  593. case dto.MetricType_COUNTER:
  594. if dtoMetric.Counter == nil {
  595. return fmt.Errorf(
  596. "collected metric %s %s should be a Counter",
  597. desc.fqName, dtoMetric,
  598. )
  599. }
  600. case dto.MetricType_GAUGE:
  601. if dtoMetric.Gauge == nil {
  602. return fmt.Errorf(
  603. "collected metric %s %s should be a Gauge",
  604. desc.fqName, dtoMetric,
  605. )
  606. }
  607. case dto.MetricType_SUMMARY:
  608. if dtoMetric.Summary == nil {
  609. return fmt.Errorf(
  610. "collected metric %s %s should be a Summary",
  611. desc.fqName, dtoMetric,
  612. )
  613. }
  614. case dto.MetricType_UNTYPED:
  615. if dtoMetric.Untyped == nil {
  616. return fmt.Errorf(
  617. "collected metric %s %s should be Untyped",
  618. desc.fqName, dtoMetric,
  619. )
  620. }
  621. case dto.MetricType_HISTOGRAM:
  622. if dtoMetric.Histogram == nil {
  623. return fmt.Errorf(
  624. "collected metric %s %s should be a Histogram",
  625. desc.fqName, dtoMetric,
  626. )
  627. }
  628. default:
  629. panic("encountered MetricFamily with invalid type")
  630. }
  631. } else { // New name.
  632. metricFamily = &dto.MetricFamily{}
  633. metricFamily.Name = proto.String(desc.fqName)
  634. metricFamily.Help = proto.String(desc.help)
  635. // TODO(beorn7): Simplify switch once Desc has type.
  636. switch {
  637. case dtoMetric.Gauge != nil:
  638. metricFamily.Type = dto.MetricType_GAUGE.Enum()
  639. case dtoMetric.Counter != nil:
  640. metricFamily.Type = dto.MetricType_COUNTER.Enum()
  641. case dtoMetric.Summary != nil:
  642. metricFamily.Type = dto.MetricType_SUMMARY.Enum()
  643. case dtoMetric.Untyped != nil:
  644. metricFamily.Type = dto.MetricType_UNTYPED.Enum()
  645. case dtoMetric.Histogram != nil:
  646. metricFamily.Type = dto.MetricType_HISTOGRAM.Enum()
  647. default:
  648. return fmt.Errorf("empty metric collected: %s", dtoMetric)
  649. }
  650. if err := checkSuffixCollisions(metricFamily, metricFamiliesByName); err != nil {
  651. return err
  652. }
  653. metricFamiliesByName[desc.fqName] = metricFamily
  654. }
  655. if err := checkMetricConsistency(metricFamily, dtoMetric, metricHashes); err != nil {
  656. return err
  657. }
  658. if registeredDescIDs != nil {
  659. // Is the desc registered at all?
  660. if _, exist := registeredDescIDs[desc.id]; !exist {
  661. return fmt.Errorf(
  662. "collected metric %s %s with unregistered descriptor %s",
  663. metricFamily.GetName(), dtoMetric, desc,
  664. )
  665. }
  666. if err := checkDescConsistency(metricFamily, dtoMetric, desc); err != nil {
  667. return err
  668. }
  669. }
  670. metricFamily.Metric = append(metricFamily.Metric, dtoMetric)
  671. return nil
  672. }
  673. // Gatherers is a slice of Gatherer instances that implements the Gatherer
  674. // interface itself. Its Gather method calls Gather on all Gatherers in the
  675. // slice in order and returns the merged results. Errors returned from the
  676. // Gather calls are all returned in a flattened MultiError. Duplicate and
  677. // inconsistent Metrics are skipped (first occurrence in slice order wins) and
  678. // reported in the returned error.
  679. //
  680. // Gatherers can be used to merge the Gather results from multiple
  681. // Registries. It also provides a way to directly inject existing MetricFamily
  682. // protobufs into the gathering by creating a custom Gatherer with a Gather
  683. // method that simply returns the existing MetricFamily protobufs. Note that no
  684. // registration is involved (in contrast to Collector registration), so
  685. // obviously registration-time checks cannot happen. Any inconsistencies between
  686. // the gathered MetricFamilies are reported as errors by the Gather method, and
  687. // inconsistent Metrics are dropped. Invalid parts of the MetricFamilies
  688. // (e.g. syntactically invalid metric or label names) will go undetected.
  689. type Gatherers []Gatherer
  690. // Gather implements Gatherer.
  691. func (gs Gatherers) Gather() ([]*dto.MetricFamily, error) {
  692. var (
  693. metricFamiliesByName = map[string]*dto.MetricFamily{}
  694. metricHashes = map[uint64]struct{}{}
  695. errs MultiError // The collected errors to return in the end.
  696. )
  697. for i, g := range gs {
  698. mfs, err := g.Gather()
  699. if err != nil {
  700. multiErr := MultiError{}
  701. if errors.As(err, &multiErr) {
  702. for _, err := range multiErr {
  703. errs = append(errs, fmt.Errorf("[from Gatherer #%d] %w", i+1, err))
  704. }
  705. } else {
  706. errs = append(errs, fmt.Errorf("[from Gatherer #%d] %w", i+1, err))
  707. }
  708. }
  709. for _, mf := range mfs {
  710. existingMF, exists := metricFamiliesByName[mf.GetName()]
  711. if exists {
  712. if existingMF.GetHelp() != mf.GetHelp() {
  713. errs = append(errs, fmt.Errorf(
  714. "gathered metric family %s has help %q but should have %q",
  715. mf.GetName(), mf.GetHelp(), existingMF.GetHelp(),
  716. ))
  717. continue
  718. }
  719. if existingMF.GetType() != mf.GetType() {
  720. errs = append(errs, fmt.Errorf(
  721. "gathered metric family %s has type %s but should have %s",
  722. mf.GetName(), mf.GetType(), existingMF.GetType(),
  723. ))
  724. continue
  725. }
  726. } else {
  727. existingMF = &dto.MetricFamily{}
  728. existingMF.Name = mf.Name
  729. existingMF.Help = mf.Help
  730. existingMF.Type = mf.Type
  731. if err := checkSuffixCollisions(existingMF, metricFamiliesByName); err != nil {
  732. errs = append(errs, err)
  733. continue
  734. }
  735. metricFamiliesByName[mf.GetName()] = existingMF
  736. }
  737. for _, m := range mf.Metric {
  738. if err := checkMetricConsistency(existingMF, m, metricHashes); err != nil {
  739. errs = append(errs, err)
  740. continue
  741. }
  742. existingMF.Metric = append(existingMF.Metric, m)
  743. }
  744. }
  745. }
  746. return internal.NormalizeMetricFamilies(metricFamiliesByName), errs.MaybeUnwrap()
  747. }
  748. // checkSuffixCollisions checks for collisions with the “magic” suffixes the
  749. // Prometheus text format and the internal metric representation of the
  750. // Prometheus server add while flattening Summaries and Histograms.
  751. func checkSuffixCollisions(mf *dto.MetricFamily, mfs map[string]*dto.MetricFamily) error {
  752. var (
  753. newName = mf.GetName()
  754. newType = mf.GetType()
  755. newNameWithoutSuffix = ""
  756. )
  757. switch {
  758. case strings.HasSuffix(newName, "_count"):
  759. newNameWithoutSuffix = newName[:len(newName)-6]
  760. case strings.HasSuffix(newName, "_sum"):
  761. newNameWithoutSuffix = newName[:len(newName)-4]
  762. case strings.HasSuffix(newName, "_bucket"):
  763. newNameWithoutSuffix = newName[:len(newName)-7]
  764. }
  765. if newNameWithoutSuffix != "" {
  766. if existingMF, ok := mfs[newNameWithoutSuffix]; ok {
  767. switch existingMF.GetType() {
  768. case dto.MetricType_SUMMARY:
  769. if !strings.HasSuffix(newName, "_bucket") {
  770. return fmt.Errorf(
  771. "collected metric named %q collides with previously collected summary named %q",
  772. newName, newNameWithoutSuffix,
  773. )
  774. }
  775. case dto.MetricType_HISTOGRAM:
  776. return fmt.Errorf(
  777. "collected metric named %q collides with previously collected histogram named %q",
  778. newName, newNameWithoutSuffix,
  779. )
  780. }
  781. }
  782. }
  783. if newType == dto.MetricType_SUMMARY || newType == dto.MetricType_HISTOGRAM {
  784. if _, ok := mfs[newName+"_count"]; ok {
  785. return fmt.Errorf(
  786. "collected histogram or summary named %q collides with previously collected metric named %q",
  787. newName, newName+"_count",
  788. )
  789. }
  790. if _, ok := mfs[newName+"_sum"]; ok {
  791. return fmt.Errorf(
  792. "collected histogram or summary named %q collides with previously collected metric named %q",
  793. newName, newName+"_sum",
  794. )
  795. }
  796. }
  797. if newType == dto.MetricType_HISTOGRAM {
  798. if _, ok := mfs[newName+"_bucket"]; ok {
  799. return fmt.Errorf(
  800. "collected histogram named %q collides with previously collected metric named %q",
  801. newName, newName+"_bucket",
  802. )
  803. }
  804. }
  805. return nil
  806. }
  807. // checkMetricConsistency checks if the provided Metric is consistent with the
  808. // provided MetricFamily. It also hashes the Metric labels and the MetricFamily
  809. // name. If the resulting hash is already in the provided metricHashes, an error
  810. // is returned. If not, it is added to metricHashes.
  811. func checkMetricConsistency(
  812. metricFamily *dto.MetricFamily,
  813. dtoMetric *dto.Metric,
  814. metricHashes map[uint64]struct{},
  815. ) error {
  816. name := metricFamily.GetName()
  817. // Type consistency with metric family.
  818. if metricFamily.GetType() == dto.MetricType_GAUGE && dtoMetric.Gauge == nil ||
  819. metricFamily.GetType() == dto.MetricType_COUNTER && dtoMetric.Counter == nil ||
  820. metricFamily.GetType() == dto.MetricType_SUMMARY && dtoMetric.Summary == nil ||
  821. metricFamily.GetType() == dto.MetricType_HISTOGRAM && dtoMetric.Histogram == nil ||
  822. metricFamily.GetType() == dto.MetricType_UNTYPED && dtoMetric.Untyped == nil {
  823. return fmt.Errorf(
  824. "collected metric %q { %s} is not a %s",
  825. name, dtoMetric, metricFamily.GetType(),
  826. )
  827. }
  828. previousLabelName := ""
  829. for _, labelPair := range dtoMetric.GetLabel() {
  830. labelName := labelPair.GetName()
  831. if labelName == previousLabelName {
  832. return fmt.Errorf(
  833. "collected metric %q { %s} has two or more labels with the same name: %s",
  834. name, dtoMetric, labelName,
  835. )
  836. }
  837. if !checkLabelName(labelName) {
  838. return fmt.Errorf(
  839. "collected metric %q { %s} has a label with an invalid name: %s",
  840. name, dtoMetric, labelName,
  841. )
  842. }
  843. if dtoMetric.Summary != nil && labelName == quantileLabel {
  844. return fmt.Errorf(
  845. "collected metric %q { %s} must not have an explicit %q label",
  846. name, dtoMetric, quantileLabel,
  847. )
  848. }
  849. if !utf8.ValidString(labelPair.GetValue()) {
  850. return fmt.Errorf(
  851. "collected metric %q { %s} has a label named %q whose value is not utf8: %#v",
  852. name, dtoMetric, labelName, labelPair.GetValue())
  853. }
  854. previousLabelName = labelName
  855. }
  856. // Is the metric unique (i.e. no other metric with the same name and the same labels)?
  857. h := xxhash.New()
  858. h.WriteString(name)
  859. h.Write(separatorByteSlice)
  860. // Make sure label pairs are sorted. We depend on it for the consistency
  861. // check.
  862. if !sort.IsSorted(internal.LabelPairSorter(dtoMetric.Label)) {
  863. // We cannot sort dtoMetric.Label in place as it is immutable by contract.
  864. copiedLabels := make([]*dto.LabelPair, len(dtoMetric.Label))
  865. copy(copiedLabels, dtoMetric.Label)
  866. sort.Sort(internal.LabelPairSorter(copiedLabels))
  867. dtoMetric.Label = copiedLabels
  868. }
  869. for _, lp := range dtoMetric.Label {
  870. h.WriteString(lp.GetName())
  871. h.Write(separatorByteSlice)
  872. h.WriteString(lp.GetValue())
  873. h.Write(separatorByteSlice)
  874. }
  875. if dtoMetric.TimestampMs != nil {
  876. h.WriteString(strconv.FormatInt(*(dtoMetric.TimestampMs), 10))
  877. h.Write(separatorByteSlice)
  878. }
  879. hSum := h.Sum64()
  880. if _, exists := metricHashes[hSum]; exists {
  881. return fmt.Errorf(
  882. "collected metric %q { %s} was collected before with the same name and label values",
  883. name, dtoMetric,
  884. )
  885. }
  886. metricHashes[hSum] = struct{}{}
  887. return nil
  888. }
  889. func checkDescConsistency(
  890. metricFamily *dto.MetricFamily,
  891. dtoMetric *dto.Metric,
  892. desc *Desc,
  893. ) error {
  894. // Desc help consistency with metric family help.
  895. if metricFamily.GetHelp() != desc.help {
  896. return fmt.Errorf(
  897. "collected metric %s %s has help %q but should have %q",
  898. metricFamily.GetName(), dtoMetric, metricFamily.GetHelp(), desc.help,
  899. )
  900. }
  901. // Is the desc consistent with the content of the metric?
  902. lpsFromDesc := make([]*dto.LabelPair, len(desc.constLabelPairs), len(dtoMetric.Label))
  903. copy(lpsFromDesc, desc.constLabelPairs)
  904. for _, l := range desc.variableLabels {
  905. lpsFromDesc = append(lpsFromDesc, &dto.LabelPair{
  906. Name: proto.String(l.Name),
  907. })
  908. }
  909. if len(lpsFromDesc) != len(dtoMetric.Label) {
  910. return fmt.Errorf(
  911. "labels in collected metric %s %s are inconsistent with descriptor %s",
  912. metricFamily.GetName(), dtoMetric, desc,
  913. )
  914. }
  915. sort.Sort(internal.LabelPairSorter(lpsFromDesc))
  916. for i, lpFromDesc := range lpsFromDesc {
  917. lpFromMetric := dtoMetric.Label[i]
  918. if lpFromDesc.GetName() != lpFromMetric.GetName() ||
  919. lpFromDesc.Value != nil && lpFromDesc.GetValue() != lpFromMetric.GetValue() {
  920. return fmt.Errorf(
  921. "labels in collected metric %s %s are inconsistent with descriptor %s",
  922. metricFamily.GetName(), dtoMetric, desc,
  923. )
  924. }
  925. }
  926. return nil
  927. }
  928. var _ TransactionalGatherer = &MultiTRegistry{}
  929. // MultiTRegistry is a TransactionalGatherer that joins gathered metrics from multiple
  930. // transactional gatherers.
  931. //
  932. // It is caller responsibility to ensure two registries have mutually exclusive metric families,
  933. // no deduplication will happen.
  934. type MultiTRegistry struct {
  935. tGatherers []TransactionalGatherer
  936. }
  937. // NewMultiTRegistry creates MultiTRegistry.
  938. func NewMultiTRegistry(tGatherers ...TransactionalGatherer) *MultiTRegistry {
  939. return &MultiTRegistry{
  940. tGatherers: tGatherers,
  941. }
  942. }
  943. // Gather implements TransactionalGatherer interface.
  944. func (r *MultiTRegistry) Gather() (mfs []*dto.MetricFamily, done func(), err error) {
  945. errs := MultiError{}
  946. dFns := make([]func(), 0, len(r.tGatherers))
  947. // TODO(bwplotka): Implement concurrency for those?
  948. for _, g := range r.tGatherers {
  949. // TODO(bwplotka): Check for duplicates?
  950. m, d, err := g.Gather()
  951. errs.Append(err)
  952. mfs = append(mfs, m...)
  953. dFns = append(dFns, d)
  954. }
  955. // TODO(bwplotka): Consider sort in place, given metric family in gather is sorted already.
  956. sort.Slice(mfs, func(i, j int) bool {
  957. return *mfs[i].Name < *mfs[j].Name
  958. })
  959. return mfs, func() {
  960. for _, d := range dFns {
  961. d()
  962. }
  963. }, errs.MaybeUnwrap()
  964. }
  965. // TransactionalGatherer represents transactional gatherer that can be triggered to notify gatherer that memory
  966. // used by metric family is no longer used by a caller. This allows implementations with cache.
  967. type TransactionalGatherer interface {
  968. // Gather returns metrics in a lexicographically sorted slice
  969. // of uniquely named MetricFamily protobufs. Gather ensures that the
  970. // returned slice is valid and self-consistent so that it can be used
  971. // for valid exposition. As an exception to the strict consistency
  972. // requirements described for metric.Desc, Gather will tolerate
  973. // different sets of label names for metrics of the same metric family.
  974. //
  975. // Even if an error occurs, Gather attempts to gather as many metrics as
  976. // possible. Hence, if a non-nil error is returned, the returned
  977. // MetricFamily slice could be nil (in case of a fatal error that
  978. // prevented any meaningful metric collection) or contain a number of
  979. // MetricFamily protobufs, some of which might be incomplete, and some
  980. // might be missing altogether. The returned error (which might be a
  981. // MultiError) explains the details. Note that this is mostly useful for
  982. // debugging purposes. If the gathered protobufs are to be used for
  983. // exposition in actual monitoring, it is almost always better to not
  984. // expose an incomplete result and instead disregard the returned
  985. // MetricFamily protobufs in case the returned error is non-nil.
  986. //
  987. // Important: done is expected to be triggered (even if the error occurs!)
  988. // once caller does not need returned slice of dto.MetricFamily.
  989. Gather() (_ []*dto.MetricFamily, done func(), err error)
  990. }
  991. // ToTransactionalGatherer transforms Gatherer to transactional one with noop as done function.
  992. func ToTransactionalGatherer(g Gatherer) TransactionalGatherer {
  993. return &noTransactionGatherer{g: g}
  994. }
  995. type noTransactionGatherer struct {
  996. g Gatherer
  997. }
  998. // Gather implements TransactionalGatherer interface.
  999. func (g *noTransactionGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) {
  1000. mfs, err := g.g.Gather()
  1001. return mfs, func() {}, err
  1002. }