You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

225 lines
5.4 KiB

  1. // Copyright 2017, OpenCensus Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //
  15. package view
  16. import (
  17. "fmt"
  18. "time"
  19. "go.opencensus.io/stats"
  20. "go.opencensus.io/stats/internal"
  21. "go.opencensus.io/tag"
  22. )
  23. func init() {
  24. defaultWorker = newWorker()
  25. go defaultWorker.start()
  26. internal.DefaultRecorder = record
  27. }
  28. type measureRef struct {
  29. measure string
  30. views map[*viewInternal]struct{}
  31. }
  32. type worker struct {
  33. measures map[string]*measureRef
  34. views map[string]*viewInternal
  35. startTimes map[*viewInternal]time.Time
  36. timer *time.Ticker
  37. c chan command
  38. quit, done chan bool
  39. }
  40. var defaultWorker *worker
  41. var defaultReportingDuration = 10 * time.Second
  42. // Find returns a registered view associated with this name.
  43. // If no registered view is found, nil is returned.
  44. func Find(name string) (v *View) {
  45. req := &getViewByNameReq{
  46. name: name,
  47. c: make(chan *getViewByNameResp),
  48. }
  49. defaultWorker.c <- req
  50. resp := <-req.c
  51. return resp.v
  52. }
  53. // Register begins collecting data for the given views.
  54. // Once a view is registered, it reports data to the registered exporters.
  55. func Register(views ...*View) error {
  56. req := &registerViewReq{
  57. views: views,
  58. err: make(chan error),
  59. }
  60. defaultWorker.c <- req
  61. return <-req.err
  62. }
  63. // Unregister the given views. Data will not longer be exported for these views
  64. // after Unregister returns.
  65. // It is not necessary to unregister from views you expect to collect for the
  66. // duration of your program execution.
  67. func Unregister(views ...*View) {
  68. names := make([]string, len(views))
  69. for i := range views {
  70. names[i] = views[i].Name
  71. }
  72. req := &unregisterFromViewReq{
  73. views: names,
  74. done: make(chan struct{}),
  75. }
  76. defaultWorker.c <- req
  77. <-req.done
  78. }
  79. // RetrieveData gets a snapshot of the data collected for the the view registered
  80. // with the given name. It is intended for testing only.
  81. func RetrieveData(viewName string) ([]*Row, error) {
  82. req := &retrieveDataReq{
  83. now: time.Now(),
  84. v: viewName,
  85. c: make(chan *retrieveDataResp),
  86. }
  87. defaultWorker.c <- req
  88. resp := <-req.c
  89. return resp.rows, resp.err
  90. }
  91. func record(tags *tag.Map, ms interface{}, attachments map[string]string) {
  92. req := &recordReq{
  93. tm: tags,
  94. ms: ms.([]stats.Measurement),
  95. attachments: attachments,
  96. t: time.Now(),
  97. }
  98. defaultWorker.c <- req
  99. }
  100. // SetReportingPeriod sets the interval between reporting aggregated views in
  101. // the program. If duration is less than or equal to zero, it enables the
  102. // default behavior.
  103. //
  104. // Note: each exporter makes different promises about what the lowest supported
  105. // duration is. For example, the Stackdriver exporter recommends a value no
  106. // lower than 1 minute. Consult each exporter per your needs.
  107. func SetReportingPeriod(d time.Duration) {
  108. // TODO(acetechnologist): ensure that the duration d is more than a certain
  109. // value. e.g. 1s
  110. req := &setReportingPeriodReq{
  111. d: d,
  112. c: make(chan bool),
  113. }
  114. defaultWorker.c <- req
  115. <-req.c // don't return until the timer is set to the new duration.
  116. }
  117. func newWorker() *worker {
  118. return &worker{
  119. measures: make(map[string]*measureRef),
  120. views: make(map[string]*viewInternal),
  121. startTimes: make(map[*viewInternal]time.Time),
  122. timer: time.NewTicker(defaultReportingDuration),
  123. c: make(chan command, 1024),
  124. quit: make(chan bool),
  125. done: make(chan bool),
  126. }
  127. }
  128. func (w *worker) start() {
  129. for {
  130. select {
  131. case cmd := <-w.c:
  132. cmd.handleCommand(w)
  133. case <-w.timer.C:
  134. w.reportUsage(time.Now())
  135. case <-w.quit:
  136. w.timer.Stop()
  137. close(w.c)
  138. w.done <- true
  139. return
  140. }
  141. }
  142. }
  143. func (w *worker) stop() {
  144. w.quit <- true
  145. <-w.done
  146. }
  147. func (w *worker) getMeasureRef(name string) *measureRef {
  148. if mr, ok := w.measures[name]; ok {
  149. return mr
  150. }
  151. mr := &measureRef{
  152. measure: name,
  153. views: make(map[*viewInternal]struct{}),
  154. }
  155. w.measures[name] = mr
  156. return mr
  157. }
  158. func (w *worker) tryRegisterView(v *View) (*viewInternal, error) {
  159. vi, err := newViewInternal(v)
  160. if err != nil {
  161. return nil, err
  162. }
  163. if x, ok := w.views[vi.view.Name]; ok {
  164. if !x.view.same(vi.view) {
  165. return nil, fmt.Errorf("cannot register view %q; a different view with the same name is already registered", v.Name)
  166. }
  167. // the view is already registered so there is nothing to do and the
  168. // command is considered successful.
  169. return x, nil
  170. }
  171. w.views[vi.view.Name] = vi
  172. ref := w.getMeasureRef(vi.view.Measure.Name())
  173. ref.views[vi] = struct{}{}
  174. return vi, nil
  175. }
  176. func (w *worker) reportView(v *viewInternal, now time.Time) {
  177. if !v.isSubscribed() {
  178. return
  179. }
  180. rows := v.collectedRows()
  181. _, ok := w.startTimes[v]
  182. if !ok {
  183. w.startTimes[v] = now
  184. }
  185. viewData := &Data{
  186. View: v.view,
  187. Start: w.startTimes[v],
  188. End: time.Now(),
  189. Rows: rows,
  190. }
  191. exportersMu.Lock()
  192. for e := range exporters {
  193. e.ExportView(viewData)
  194. }
  195. exportersMu.Unlock()
  196. }
  197. func (w *worker) reportUsage(now time.Time) {
  198. for _, v := range w.views {
  199. w.reportView(v, now)
  200. }
  201. }