|
- use std::collections::HashSet;
- use prometheus::{IntGaugeVec, Opts, Registry};
- use redis::Commands;
- use crate::client_manager::ClientManager;
- use crate::metrics::MetricsProvider;
- use crate::models::TrackerInfo;
-
- pub struct QueueMetrics {
- registry: Registry,
- queue_size: IntGaugeVec,
- }
-
- impl QueueMetrics {
- pub fn new() -> color_eyre::Result<Self> {
- let registry = Registry::new_custom(Some("at_tracker".to_string()), None)?;
-
- let s = Self {
- registry,
- queue_size: IntGaugeVec::new(Opts::new("queue_size", "queue_size"), &["project", "queue"])?,
- };
-
- s.registry.register(Box::new(s.queue_size.clone()))?;
-
- Ok(s)
- }
- }
-
- impl MetricsProvider for QueueMetrics {
- fn get_registry(&self) -> color_eyre::Result<&Registry> {
- Ok(&self.registry)
- }
-
- fn update(&self, client_mgr: &ClientManager, project: &str, info: &TrackerInfo) -> color_eyre::Result<()> {
- let mut con = client_mgr.get_connection_from_tracker(info)?;
-
- // Default queue set.
- let mut queues: HashSet<String> = HashSet::from([
- "todo".to_string(),
- "todo:redo".to_string(),
- "todo:backfeed".to_string(),
- "unretrievable".to_string(),
- ]);
-
- // Read the queues from the offloaded key.
- let offloaded_key = format!("{project}:offloaded");
- let offloaded_keys: Vec<String> = con.hkeys(&offloaded_key)?;
-
- // Remove queues that need special handling.
- queues.extend(offloaded_keys.into_iter().filter(|v| (v != "done") && (v != "claims")));
-
- for queue in queues {
- let k = format!("{project}:{queue}");
- let c: Option<i64> = con.scard(k)?;
- if let Some(a) = c {
- let d: Option<i64> = con.hget(&offloaded_key, &queue)?;
- let a = a + d.unwrap_or(0);
- let m = self.queue_size.get_metric_with_label_values(&[project, queue.as_str()])?;
- m.set(a);
- }
- }
-
- // Handle done queue.
- {
- let c: Option<i64> = con.get(format!("{project}:done_counter"))?;
- if let Some(a) = c {
- let m = self.queue_size.get_metric_with_label_values(&[project, "done"])?;
- m.set(a);
- }
- }
-
- // Handle claims queue.
- {
- let c: Option<i64> = con.hlen(format!("{project}:claims"))?;
- if let Some(a) = c {
- let m = self.queue_size.get_metric_with_label_values(&[project, "claims"])?;
- m.set(a);
- }
- }
- Ok(())
- }
-
- fn get_metric_type_name(&self) -> &str {
- "queue"
- }
- }
|