You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

73 lines
1.5 KiB

  // Copyright 2018 Google LLC
  //
  // Licensed under the Apache License, Version 2.0 (the "License");
  // you may not use this file except in compliance with the License.
  // You may obtain a copy of the License at
  //
  // http://www.apache.org/licenses/LICENSE-2.0
  //
  // Unless required by applicable law or agreed to in writing, software
  // distributed under the License is distributed on an "AS IS" BASIS,
  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  // See the License for the specific language governing permissions and
  // limitations under the License.
  14. // +build psdebug
  15. package pubsub
  16. import (
  17. "sync"
  18. "time"
  19. )
  20. var (
  21. dmu sync.Mutex
  22. msgTraces = map[string][]Event{}
  23. ackIDToMsgID = map[string]string{}
  24. )
  // Event is one entry in a message's trace: a short label plus a timestamp.
  type Event struct {
  	// Desc labels what happened. The helpers in this file record
  	// "recv", "ack", "modack" and "nack".
  	Desc string

  	// At is the time associated with the event.
  	At time.Time
  }
  29. func MessageEvents(msgID string) []Event {
  30. dmu.Lock()
  31. defer dmu.Unlock()
  32. return msgTraces[msgID]
  33. }
  34. func addRecv(msgID, ackID string, t time.Time) {
  35. dmu.Lock()
  36. defer dmu.Unlock()
  37. ackIDToMsgID[ackID] = msgID
  38. addEvent(msgID, "recv", t)
  39. }
  40. func addAcks(ackIDs []string) {
  41. dmu.Lock()
  42. defer dmu.Unlock()
  43. now := time.Now()
  44. for _, id := range ackIDs {
  45. addEvent(ackIDToMsgID[id], "ack", now)
  46. }
  47. }
  48. func addModAcks(ackIDs []string, deadlineSecs int32) {
  49. dmu.Lock()
  50. defer dmu.Unlock()
  51. desc := "modack"
  52. if deadlineSecs == 0 {
  53. desc = "nack"
  54. }
  55. now := time.Now()
  56. for _, id := range ackIDs {
  57. addEvent(ackIDToMsgID[id], desc, now)
  58. }
  59. }
  60. func addEvent(msgID, desc string, t time.Time) {
  61. msgTraces[msgID] = append(msgTraces[msgID], Event{desc, t})
  62. }