You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

436 lines
8.6 KiB

  1. // Copyright 2017, OpenCensus Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //
  15. package view
  16. import (
  17. "context"
  18. "errors"
  19. "sync"
  20. "testing"
  21. "time"
  22. "go.opencensus.io/stats"
  23. "go.opencensus.io/tag"
  24. )
  25. func Test_Worker_ViewRegistration(t *testing.T) {
  26. someError := errors.New("some error")
  27. sc1 := make(chan *Data)
  28. type registration struct {
  29. c chan *Data
  30. vID string
  31. err error
  32. }
  33. type testCase struct {
  34. label string
  35. registrations []registration
  36. }
  37. tcs := []testCase{
  38. {
  39. "register v1ID",
  40. []registration{
  41. {
  42. sc1,
  43. "v1ID",
  44. nil,
  45. },
  46. },
  47. },
  48. {
  49. "register v1ID+v2ID",
  50. []registration{
  51. {
  52. sc1,
  53. "v1ID",
  54. nil,
  55. },
  56. },
  57. },
  58. {
  59. "register to v1ID; ??? to v1ID and view with same ID",
  60. []registration{
  61. {
  62. sc1,
  63. "v1ID",
  64. nil,
  65. },
  66. {
  67. sc1,
  68. "v1SameNameID",
  69. someError,
  70. },
  71. },
  72. },
  73. }
  74. mf1 := stats.Float64("MF1/Test_Worker_ViewSubscription", "desc MF1", "unit")
  75. mf2 := stats.Float64("MF2/Test_Worker_ViewSubscription", "desc MF2", "unit")
  76. for _, tc := range tcs {
  77. t.Run(tc.label, func(t *testing.T) {
  78. restart()
  79. views := map[string]*View{
  80. "v1ID": {
  81. Name: "VF1",
  82. Measure: mf1,
  83. Aggregation: Count(),
  84. },
  85. "v1SameNameID": {
  86. Name: "VF1",
  87. Description: "desc duplicate name VF1",
  88. Measure: mf1,
  89. Aggregation: Sum(),
  90. },
  91. "v2ID": {
  92. Name: "VF2",
  93. Measure: mf2,
  94. Aggregation: Count(),
  95. },
  96. "vNilID": nil,
  97. }
  98. for _, r := range tc.registrations {
  99. v := views[r.vID]
  100. err := Register(v)
  101. if (err != nil) != (r.err != nil) {
  102. t.Errorf("%v: Register() = %v, want %v", tc.label, err, r.err)
  103. }
  104. }
  105. })
  106. }
  107. }
  108. func Test_Worker_RecordFloat64(t *testing.T) {
  109. restart()
  110. someError := errors.New("some error")
  111. m := stats.Float64("Test_Worker_RecordFloat64/MF1", "desc MF1", "unit")
  112. k1, _ := tag.NewKey("k1")
  113. k2, _ := tag.NewKey("k2")
  114. ctx, err := tag.New(context.Background(),
  115. tag.Insert(k1, "v1"),
  116. tag.Insert(k2, "v2"),
  117. )
  118. if err != nil {
  119. t.Fatal(err)
  120. }
  121. v1 := &View{"VF1", "desc VF1", []tag.Key{k1, k2}, m, Count()}
  122. v2 := &View{"VF2", "desc VF2", []tag.Key{k1, k2}, m, Count()}
  123. type want struct {
  124. v *View
  125. rows []*Row
  126. err error
  127. }
  128. type testCase struct {
  129. label string
  130. registrations []*View
  131. records []float64
  132. wants []want
  133. }
  134. tcs := []testCase{
  135. {
  136. label: "0",
  137. registrations: []*View{},
  138. records: []float64{1, 1},
  139. wants: []want{{v1, nil, someError}, {v2, nil, someError}},
  140. },
  141. {
  142. label: "1",
  143. registrations: []*View{v1},
  144. records: []float64{1, 1},
  145. wants: []want{
  146. {
  147. v1,
  148. []*Row{
  149. {
  150. []tag.Tag{{Key: k1, Value: "v1"}, {Key: k2, Value: "v2"}},
  151. &CountData{Value: 2},
  152. },
  153. },
  154. nil,
  155. },
  156. {v2, nil, someError},
  157. },
  158. },
  159. {
  160. label: "2",
  161. registrations: []*View{v1, v2},
  162. records: []float64{1, 1},
  163. wants: []want{
  164. {
  165. v1,
  166. []*Row{
  167. {
  168. []tag.Tag{{Key: k1, Value: "v1"}, {Key: k2, Value: "v2"}},
  169. &CountData{Value: 2},
  170. },
  171. },
  172. nil,
  173. },
  174. {
  175. v2,
  176. []*Row{
  177. {
  178. []tag.Tag{{Key: k1, Value: "v1"}, {Key: k2, Value: "v2"}},
  179. &CountData{Value: 2},
  180. },
  181. },
  182. nil,
  183. },
  184. },
  185. },
  186. }
  187. for _, tc := range tcs {
  188. for _, v := range tc.registrations {
  189. if err := Register(v); err != nil {
  190. t.Fatalf("%v: Register(%v) = %v; want no errors", tc.label, v.Name, err)
  191. }
  192. }
  193. for _, value := range tc.records {
  194. stats.Record(ctx, m.M(value))
  195. }
  196. for _, w := range tc.wants {
  197. gotRows, err := RetrieveData(w.v.Name)
  198. if (err != nil) != (w.err != nil) {
  199. t.Fatalf("%s: RetrieveData(%v) = %v; want error = %v", tc.label, w.v.Name, err, w.err)
  200. }
  201. for _, got := range gotRows {
  202. if !containsRow(w.rows, got) {
  203. t.Errorf("%s: got row %#v; want none", tc.label, got)
  204. break
  205. }
  206. }
  207. for _, want := range w.rows {
  208. if !containsRow(gotRows, want) {
  209. t.Errorf("%s: got none; want %#v'", tc.label, want)
  210. break
  211. }
  212. }
  213. }
  214. // Cleaning up.
  215. Unregister(tc.registrations...)
  216. }
  217. }
  218. func TestReportUsage(t *testing.T) {
  219. ctx := context.Background()
  220. m := stats.Int64("measure", "desc", "unit")
  221. tests := []struct {
  222. name string
  223. view *View
  224. wantMaxCount int64
  225. }{
  226. {
  227. name: "cum",
  228. view: &View{Name: "cum1", Measure: m, Aggregation: Count()},
  229. wantMaxCount: 8,
  230. },
  231. {
  232. name: "cum2",
  233. view: &View{Name: "cum1", Measure: m, Aggregation: Count()},
  234. wantMaxCount: 8,
  235. },
  236. }
  237. for _, tt := range tests {
  238. restart()
  239. SetReportingPeriod(25 * time.Millisecond)
  240. if err := Register(tt.view); err != nil {
  241. t.Fatalf("%v: cannot register: %v", tt.name, err)
  242. }
  243. e := &countExporter{}
  244. RegisterExporter(e)
  245. stats.Record(ctx, m.M(1))
  246. stats.Record(ctx, m.M(1))
  247. stats.Record(ctx, m.M(1))
  248. stats.Record(ctx, m.M(1))
  249. time.Sleep(50 * time.Millisecond)
  250. stats.Record(ctx, m.M(1))
  251. stats.Record(ctx, m.M(1))
  252. stats.Record(ctx, m.M(1))
  253. stats.Record(ctx, m.M(1))
  254. time.Sleep(50 * time.Millisecond)
  255. e.Lock()
  256. count := e.count
  257. e.Unlock()
  258. if got, want := count, tt.wantMaxCount; got > want {
  259. t.Errorf("%v: got count data = %v; want at most %v", tt.name, got, want)
  260. }
  261. }
  262. }
  263. func Test_SetReportingPeriodReqNeverBlocks(t *testing.T) {
  264. t.Parallel()
  265. worker := newWorker()
  266. durations := []time.Duration{-1, 0, 10, 100 * time.Millisecond}
  267. for i, duration := range durations {
  268. ackChan := make(chan bool, 1)
  269. cmd := &setReportingPeriodReq{c: ackChan, d: duration}
  270. cmd.handleCommand(worker)
  271. select {
  272. case <-ackChan:
  273. case <-time.After(500 * time.Millisecond): // Arbitrarily using 500ms as the timeout duration.
  274. t.Errorf("#%d: duration %v blocks", i, duration)
  275. }
  276. }
  277. }
  278. func TestWorkerStarttime(t *testing.T) {
  279. restart()
  280. ctx := context.Background()
  281. m := stats.Int64("measure/TestWorkerStarttime", "desc", "unit")
  282. v := &View{
  283. Name: "testview",
  284. Measure: m,
  285. Aggregation: Count(),
  286. }
  287. SetReportingPeriod(25 * time.Millisecond)
  288. if err := Register(v); err != nil {
  289. t.Fatalf("cannot register to %v: %v", v.Name, err)
  290. }
  291. e := &vdExporter{}
  292. RegisterExporter(e)
  293. defer UnregisterExporter(e)
  294. stats.Record(ctx, m.M(1))
  295. stats.Record(ctx, m.M(1))
  296. stats.Record(ctx, m.M(1))
  297. stats.Record(ctx, m.M(1))
  298. time.Sleep(50 * time.Millisecond)
  299. stats.Record(ctx, m.M(1))
  300. stats.Record(ctx, m.M(1))
  301. stats.Record(ctx, m.M(1))
  302. stats.Record(ctx, m.M(1))
  303. time.Sleep(50 * time.Millisecond)
  304. e.Lock()
  305. if len(e.vds) == 0 {
  306. t.Fatal("Got no view data; want at least one")
  307. }
  308. var start time.Time
  309. for _, vd := range e.vds {
  310. if start.IsZero() {
  311. start = vd.Start
  312. }
  313. if !vd.Start.Equal(start) {
  314. t.Errorf("Cumulative view data start time = %v; want %v", vd.Start, start)
  315. }
  316. }
  317. e.Unlock()
  318. }
  319. func TestUnregisterReportsUsage(t *testing.T) {
  320. restart()
  321. ctx := context.Background()
  322. m1 := stats.Int64("measure", "desc", "unit")
  323. view1 := &View{Name: "count", Measure: m1, Aggregation: Count()}
  324. m2 := stats.Int64("measure2", "desc", "unit")
  325. view2 := &View{Name: "count2", Measure: m2, Aggregation: Count()}
  326. SetReportingPeriod(time.Hour)
  327. if err := Register(view1, view2); err != nil {
  328. t.Fatalf("cannot register: %v", err)
  329. }
  330. e := &countExporter{}
  331. RegisterExporter(e)
  332. stats.Record(ctx, m1.M(1))
  333. stats.Record(ctx, m2.M(1))
  334. stats.Record(ctx, m2.M(1))
  335. Unregister(view2)
  336. // Unregister should only flush view2, so expect the count of 2.
  337. want := int64(2)
  338. e.Lock()
  339. got := e.totalCount
  340. e.Unlock()
  341. if got != want {
  342. t.Errorf("got count data = %v; want %v", got, want)
  343. }
  344. }
  345. type countExporter struct {
  346. sync.Mutex
  347. count int64
  348. totalCount int64
  349. }
  350. func (e *countExporter) ExportView(vd *Data) {
  351. if len(vd.Rows) == 0 {
  352. return
  353. }
  354. d := vd.Rows[0].Data.(*CountData)
  355. e.Lock()
  356. defer e.Unlock()
  357. e.count = d.Value
  358. e.totalCount += d.Value
  359. }
  360. type vdExporter struct {
  361. sync.Mutex
  362. vds []*Data
  363. }
  364. func (e *vdExporter) ExportView(vd *Data) {
  365. e.Lock()
  366. defer e.Unlock()
  367. e.vds = append(e.vds, vd)
  368. }
  369. // restart stops the current processors and creates a new one.
  370. func restart() {
  371. defaultWorker.stop()
  372. defaultWorker = newWorker()
  373. go defaultWorker.start()
  374. }