use std::collections::HashMap; use std::sync::Arc; use std::thread::JoinHandle; use std::time::SystemTime; use clap::Parser; use prometheus::push_metrics; use skedge::{every, Interval, Scheduler}; use tracing::{info, warn}; use rayon::prelude::*; use crate::args::Args; use crate::client_manager::ClientManager; use crate::metrics::MetricsProvider; use crate::metrics::downloader_metrics::DownloaderMetrics; use crate::metrics::project_metrics::ProjectMetrics; use crate::metrics::queue_metrics::QueueMetrics; use crate::tracker_manager::TrackerInfoManager; pub mod models; pub mod args; pub mod client_manager; pub mod tracker_manager; pub mod helpers; pub mod metrics; fn push_process_metrics_job(url: String) { let mfs = prometheus::gather(); if let Err(e) = push_metrics("tracker-redis-exporter", HashMap::from([("instance".to_string(), "process".to_string())]), &url, mfs, None) { warn!("Unable to push process metrics: {e}") } } fn metric_refresh_job(client_mgr: Arc, project_mgr: Arc, url: String, provider: Arc) { let start = SystemTime::now(); project_mgr.enabled_projects().into_par_iter().for_each({ let client_mgr = client_mgr.clone(); let provider = provider.clone(); move |(name, info)| { info!("Refresing {} metrics for {name}...", provider.get_metric_type_name()); if let Err(e) = provider.update(&client_mgr, &name, &info) { warn!("Unable to refresh {} metrics: {e}", provider.get_metric_type_name()) } } } ); let mfs = provider.get_registry().unwrap().gather(); info!("Pushing {} metric families for {}.", mfs.len(), provider.get_metric_type_name()); if let Err(e) = push_metrics("tracker-redis-exporter", HashMap::from([("instance".to_string(), provider.get_metric_type_name().to_string())]), &url, mfs, None) { warn!("Unable to push {} metrics: {e}", provider.get_metric_type_name()) } let delta = SystemTime::now().duration_since(start).unwrap().as_secs_f32(); info!("Refreshed {} in {delta:.04} seconds.", provider.get_metric_type_name()); } fn projects_refresh_job(client_mgr: Arc, project_mgr: Arc) { info!("Refreshing project list..."); if let Err(e) = project_mgr.refresh_trackers(&client_mgr) { warn!("Unable to refresh projects: {e}") } info!("Updated project list. Updating active list in background..."); rayon::spawn(move || { if let Err(e) = project_mgr.refresh_active_trackers(&client_mgr) { warn!("Unable to refresh active projects: {e}") } }); } fn perform_metrics_job(interval: usize, client_mgr: Arc, project_mgr: Arc, p_url: String, provider: Arc) { let mut schedule = Scheduler::new(); metric_refresh_job(client_mgr.clone(), project_mgr.clone(), p_url.clone(), provider.clone()); every(interval as Interval) .seconds().unwrap() .run_four_args(&mut schedule, metric_refresh_job, client_mgr, project_mgr, p_url, provider).unwrap(); loop { if let Err(e) = schedule.run_pending() { warn!("Error: {e}"); } std::thread::sleep(std::time::Duration::from_secs(1)); } } fn spawn_metrics_job(interval: usize, client_mgr: Arc, project_mgr: Arc, p_url: String, provider: Arc) -> JoinHandle<()> { std::thread::spawn(move || { perform_metrics_job(interval, client_mgr, project_mgr, p_url, provider); }) } pub fn main() -> color_eyre::Result<()> { let subscriber = tracing_subscriber::FmtSubscriber::new(); tracing::subscriber::set_global_default(subscriber)?; color_eyre::install()?; let args = Args::parse(); let client_mgr = Arc::new(ClientManager::new(args.get_master_redis_info())); let project_mgr = Arc::new(TrackerInfoManager::new(&client_mgr, args.included_projects.clone(), args.excluded_projects.clone(), args.projects_inactive_minutes)?); let project_metrics = Arc::new(ProjectMetrics::new()?); let queue_metrics = Arc::new(QueueMetrics::new()?); let downloader_metrics = Arc::new(DownloaderMetrics::new()?); let p_url = args.push_url.to_string(); let mut schedule = Scheduler::new(); every(args.projects_refresh_interval as Interval) .seconds()? .run_two_args(&mut schedule, projects_refresh_job, client_mgr.clone(), project_mgr.clone())?; every(10) .seconds()? .run_one_arg(&mut schedule, push_process_metrics_job, p_url.clone())?; spawn_metrics_job( args.project_metrics_refresh_interval, client_mgr.clone(), project_mgr.clone(), p_url.clone(), project_metrics.clone()); spawn_metrics_job( args.queue_metrics_refresh_interval, client_mgr.clone(), project_mgr.clone(), p_url.clone(), queue_metrics.clone()); spawn_metrics_job( args.downloader_metrics_refresh_interval, client_mgr.clone(), project_mgr.clone(), p_url.clone(), downloader_metrics.clone()); loop { if let Err(e) = schedule.run_pending() { warn!("Error: {e}"); } std::thread::sleep(std::time::Duration::from_secs(1)); } }