Vous ne pouvez pas sélectionner plus de 25 sujets Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.
 
 
 

190 lignes
4.2 KiB

  1. // Copyright 2017, OpenCensus Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //
  15. package view
  16. import (
  17. "errors"
  18. "fmt"
  19. "strings"
  20. "time"
  21. "go.opencensus.io/exemplar"
  22. "go.opencensus.io/stats"
  23. "go.opencensus.io/stats/internal"
  24. "go.opencensus.io/tag"
  25. )
  26. type command interface {
  27. handleCommand(w *worker)
  28. }
  29. // getViewByNameReq is the command to get a view given its name.
  30. type getViewByNameReq struct {
  31. name string
  32. c chan *getViewByNameResp
  33. }
  34. type getViewByNameResp struct {
  35. v *View
  36. }
  37. func (cmd *getViewByNameReq) handleCommand(w *worker) {
  38. v := w.views[cmd.name]
  39. if v == nil {
  40. cmd.c <- &getViewByNameResp{nil}
  41. return
  42. }
  43. cmd.c <- &getViewByNameResp{v.view}
  44. }
  45. // registerViewReq is the command to register a view.
  46. type registerViewReq struct {
  47. views []*View
  48. err chan error
  49. }
  50. func (cmd *registerViewReq) handleCommand(w *worker) {
  51. for _, v := range cmd.views {
  52. if err := v.canonicalize(); err != nil {
  53. cmd.err <- err
  54. return
  55. }
  56. }
  57. var errstr []string
  58. for _, view := range cmd.views {
  59. vi, err := w.tryRegisterView(view)
  60. if err != nil {
  61. errstr = append(errstr, fmt.Sprintf("%s: %v", view.Name, err))
  62. continue
  63. }
  64. internal.SubscriptionReporter(view.Measure.Name())
  65. vi.subscribe()
  66. }
  67. if len(errstr) > 0 {
  68. cmd.err <- errors.New(strings.Join(errstr, "\n"))
  69. } else {
  70. cmd.err <- nil
  71. }
  72. }
  // unregisterFromViewReq is the command to unregister views by name. It has
  // no impact on data collection for clients that are pulling data from the
  // library.
  type unregisterFromViewReq struct {
  	views []string      // names of the views to unregister
  	done  chan struct{} // signalled once, after every name has been processed
  }
  80. func (cmd *unregisterFromViewReq) handleCommand(w *worker) {
  81. for _, name := range cmd.views {
  82. vi, ok := w.views[name]
  83. if !ok {
  84. continue
  85. }
  86. // Report pending data for this view before removing it.
  87. w.reportView(vi, time.Now())
  88. vi.unsubscribe()
  89. if !vi.isSubscribed() {
  90. // this was the last subscription and view is not collecting anymore.
  91. // The collected data can be cleared.
  92. vi.clearRows()
  93. }
  94. delete(w.views, name)
  95. }
  96. cmd.done <- struct{}{}
  97. }
  98. // retrieveDataReq is the command to retrieve data for a view.
  99. type retrieveDataReq struct {
  100. now time.Time
  101. v string
  102. c chan *retrieveDataResp
  103. }
  104. type retrieveDataResp struct {
  105. rows []*Row
  106. err error
  107. }
  108. func (cmd *retrieveDataReq) handleCommand(w *worker) {
  109. vi, ok := w.views[cmd.v]
  110. if !ok {
  111. cmd.c <- &retrieveDataResp{
  112. nil,
  113. fmt.Errorf("cannot retrieve data; view %q is not registered", cmd.v),
  114. }
  115. return
  116. }
  117. if !vi.isSubscribed() {
  118. cmd.c <- &retrieveDataResp{
  119. nil,
  120. fmt.Errorf("cannot retrieve data; view %q has no subscriptions or collection is not forcibly started", cmd.v),
  121. }
  122. return
  123. }
  124. cmd.c <- &retrieveDataResp{
  125. vi.collectedRows(),
  126. nil,
  127. }
  128. }
  129. // recordReq is the command to record data related to multiple measures
  130. // at once.
  131. type recordReq struct {
  132. tm *tag.Map
  133. ms []stats.Measurement
  134. attachments map[string]string
  135. t time.Time
  136. }
  137. func (cmd *recordReq) handleCommand(w *worker) {
  138. for _, m := range cmd.ms {
  139. if (m == stats.Measurement{}) { // not registered
  140. continue
  141. }
  142. ref := w.getMeasureRef(m.Measure().Name())
  143. for v := range ref.views {
  144. e := &exemplar.Exemplar{
  145. Value: m.Value(),
  146. Timestamp: cmd.t,
  147. Attachments: cmd.attachments,
  148. }
  149. v.addSample(cmd.tm, e)
  150. }
  151. }
  152. }
  // setReportingPeriodReq is the command to modify the duration between
  // reports of the collected data to the registered clients.
  type setReportingPeriodReq struct {
  	d time.Duration // requested period; values <= 0 select the default period
  	c chan bool     // receives true once the new ticker has been installed
  }
  159. func (cmd *setReportingPeriodReq) handleCommand(w *worker) {
  160. w.timer.Stop()
  161. if cmd.d <= 0 {
  162. w.timer = time.NewTicker(defaultReportingDuration)
  163. } else {
  164. w.timer = time.NewTicker(cmd.d)
  165. }
  166. cmd.c <- true
  167. }