Ви не можете вибрати більше 25 тем Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.

85 рядки
2.6 KiB

  1. use std::collections::HashSet;
  2. use prometheus::{IntGaugeVec, Opts, Registry};
  3. use redis::Commands;
  4. use crate::client_manager::ClientManager;
  5. use crate::metrics::MetricsProvider;
  6. use crate::models::TrackerInfo;
  7. pub struct QueueMetrics {
  8. registry: Registry,
  9. queue_size: IntGaugeVec,
  10. }
  11. impl QueueMetrics {
  12. pub fn new() -> color_eyre::Result<Self> {
  13. let registry = Registry::new_custom(Some("at_tracker".to_string()), None)?;
  14. let s = Self {
  15. registry,
  16. queue_size: IntGaugeVec::new(Opts::new("queue_size", "queue_size"), &["project", "queue"])?,
  17. };
  18. s.registry.register(Box::new(s.queue_size.clone()))?;
  19. Ok(s)
  20. }
  21. }
  22. impl MetricsProvider for QueueMetrics {
  23. fn get_registry(&self) -> color_eyre::Result<&Registry> {
  24. Ok(&self.registry)
  25. }
  26. fn update(&self, client_mgr: &ClientManager, project: &str, info: &TrackerInfo) -> color_eyre::Result<()> {
  27. let mut con = client_mgr.get_connection_from_tracker(info)?;
  28. // Default queue set.
  29. let mut queues: HashSet<String> = HashSet::from([
  30. "todo".to_string(),
  31. "todo:redo".to_string(),
  32. "todo:backfeed".to_string(),
  33. "unretrievable".to_string(),
  34. ]);
  35. // Read the queues from the offloaded key.
  36. let offloaded_key = format!("{project}:offloaded");
  37. let offloaded_keys: Vec<String> = con.hkeys(&offloaded_key)?;
  38. // Remove queues that need special handling.
  39. queues.extend(offloaded_keys.into_iter().filter(|v| (v != "done") && (v != "claims")));
  40. for queue in queues {
  41. let k = format!("{project}:{queue}");
  42. let c: Option<i64> = con.scard(k)?;
  43. if let Some(a) = c {
  44. let d: Option<i64> = con.hget(&offloaded_key, &queue)?;
  45. let a = a + d.unwrap_or(0);
  46. let m = self.queue_size.get_metric_with_label_values(&[project, queue.as_str()])?;
  47. m.set(a);
  48. }
  49. }
  50. // Handle done queue.
  51. {
  52. let c: Option<i64> = con.get(format!("{project}:done_counter"))?;
  53. if let Some(a) = c {
  54. let m = self.queue_size.get_metric_with_label_values(&[project, "done"])?;
  55. m.set(a);
  56. }
  57. }
  58. // Handle claims queue.
  59. {
  60. let c: Option<i64> = con.hlen(format!("{project}:claims"))?;
  61. if let Some(a) = c {
  62. let m = self.queue_size.get_metric_with_label_values(&[project, "claims"])?;
  63. m.set(a);
  64. }
  65. }
  66. Ok(())
  67. }
  68. fn get_metric_type_name(&self) -> &str {
  69. "queue"
  70. }
  71. }