You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

158 lines
6.2 KiB

  1. // Copyright 2018 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package pubsub
  15. import (
  16. "context"
  17. "log"
  18. "sync"
  19. "go.opencensus.io/plugin/ocgrpc"
  20. "go.opencensus.io/stats"
  21. "go.opencensus.io/stats/view"
  22. "go.opencensus.io/tag"
  23. "google.golang.org/api/option"
  24. "google.golang.org/grpc"
  25. )
  26. func openCensusOptions() []option.ClientOption {
  27. return []option.ClientOption{
  28. option.WithGRPCDialOption(grpc.WithStatsHandler(&ocgrpc.ClientHandler{})),
  29. }
  30. }
  31. var subscriptionKey tag.Key
  32. func init() {
  33. var err error
  34. if subscriptionKey, err = tag.NewKey("subscription"); err != nil {
  35. log.Fatal("cannot create 'subscription' key")
  36. }
  37. }
// statsPrefix is prepended to the name of every measure declared in this file.
const statsPrefix = "cloud.google.com/go/pubsub/"
var (
	// Measures. Each one is an int64, dimensionless measure whose name is
	// statsPrefix followed by a snake_case identifier.

	// PullCount is a measure of the number of messages pulled.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	PullCount = stats.Int64(statsPrefix+"pull_count", "Number of PubSub messages pulled", stats.UnitDimensionless)
	// AckCount is a measure of the number of messages acked.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AckCount = stats.Int64(statsPrefix+"ack_count", "Number of PubSub messages acked", stats.UnitDimensionless)
	// NackCount is a measure of the number of messages nacked.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	NackCount = stats.Int64(statsPrefix+"nack_count", "Number of PubSub messages nacked", stats.UnitDimensionless)
	// ModAckCount is a measure of the number of messages whose ack-deadline was modified.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	ModAckCount = stats.Int64(statsPrefix+"mod_ack_count", "Number of ack-deadlines modified", stats.UnitDimensionless)
	// ModAckTimeoutCount is a measure of the number of ModifyAckDeadline RPCs that timed out.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	ModAckTimeoutCount = stats.Int64(statsPrefix+"mod_ack_timeout_count", "Number of ModifyAckDeadline RPCs that timed out", stats.UnitDimensionless)
	// StreamOpenCount is a measure of the number of times a streaming-pull stream was opened.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	StreamOpenCount = stats.Int64(statsPrefix+"stream_open_count", "Number of calls opening a new streaming pull", stats.UnitDimensionless)
	// StreamRetryCount is a measure of the number of times a streaming-pull operation was retried.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	StreamRetryCount = stats.Int64(statsPrefix+"stream_retry_count", "Number of retries of a stream send or receive", stats.UnitDimensionless)
	// StreamRequestCount is a measure of the number of requests sent on a streaming-pull stream.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	StreamRequestCount = stats.Int64(statsPrefix+"stream_request_count", "Number gRPC StreamingPull request messages sent", stats.UnitDimensionless)
	// StreamResponseCount is a measure of the number of responses received on a streaming-pull stream.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	StreamResponseCount = stats.Int64(statsPrefix+"stream_response_count", "Number of gRPC StreamingPull response messages received", stats.UnitDimensionless)

	// Views. These are declared without a value here and are therefore nil
	// until they are assigned elsewhere in the package.

	// PullCountView is a cumulative sum of PullCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	PullCountView *view.View
	// AckCountView is a cumulative sum of AckCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AckCountView *view.View
	// NackCountView is a cumulative sum of NackCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	NackCountView *view.View
	// ModAckCountView is a cumulative sum of ModAckCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	ModAckCountView *view.View
	// ModAckTimeoutCountView is a cumulative sum of ModAckTimeoutCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	ModAckTimeoutCountView *view.View
	// StreamOpenCountView is a cumulative sum of StreamOpenCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	StreamOpenCountView *view.View
	// StreamRetryCountView is a cumulative sum of StreamRetryCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	StreamRetryCountView *view.View
	// StreamRequestCountView is a cumulative sum of StreamRequestCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	StreamRequestCountView *view.View
	// StreamResponseCountView is a cumulative sum of StreamResponseCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	StreamResponseCountView *view.View
)
  95. func init() {
  96. PullCountView = countView(PullCount)
  97. AckCountView = countView(AckCount)
  98. NackCountView = countView(NackCount)
  99. ModAckCountView = countView(ModAckCount)
  100. ModAckTimeoutCountView = countView(ModAckTimeoutCount)
  101. StreamOpenCountView = countView(StreamOpenCount)
  102. StreamRetryCountView = countView(StreamRetryCount)
  103. StreamRequestCountView = countView(StreamRequestCount)
  104. StreamResponseCountView = countView(StreamResponseCount)
  105. }
  106. func countView(m *stats.Int64Measure) *view.View {
  107. return &view.View{
  108. Name: m.Name(),
  109. Description: m.Description(),
  110. TagKeys: []tag.Key{subscriptionKey},
  111. Measure: m,
  112. Aggregation: view.Sum(),
  113. }
  114. }
  115. var logOnce sync.Once
  116. func withSubscriptionKey(ctx context.Context, subName string) context.Context {
  117. ctx, err := tag.New(ctx, tag.Upsert(subscriptionKey, subName))
  118. if err != nil {
  119. logOnce.Do(func() {
  120. log.Printf("pubsub: error creating tag map: %v", err)
  121. })
  122. }
  123. return ctx
  124. }
  125. func recordStat(ctx context.Context, m *stats.Int64Measure, n int64) {
  126. stats.Record(ctx, m.M(n))
  127. }