|
- use std::collections::HashMap;
- use std::sync::Arc;
- use std::thread::JoinHandle;
- use std::time::SystemTime;
- use clap::Parser;
- use prometheus::push_metrics;
- use skedge::{every, Interval, Scheduler};
- use tracing::{info, warn};
- use rayon::prelude::*;
- use crate::args::Args;
- use crate::client_manager::ClientManager;
- use crate::metrics::MetricsProvider;
- use crate::metrics::downloader_metrics::DownloaderMetrics;
- use crate::metrics::project_metrics::ProjectMetrics;
- use crate::metrics::queue_metrics::QueueMetrics;
- use crate::tracker_manager::TrackerInfoManager;
-
- pub mod models;
- pub mod args;
- pub mod client_manager;
- pub mod tracker_manager;
- pub mod helpers;
- pub mod metrics;
-
- fn push_process_metrics_job(url: String) {
- let mfs = prometheus::gather();
- if let Err(e) = push_metrics("tracker-redis-exporter", HashMap::from([("instance".to_string(), "process".to_string())]), &url, mfs, None) {
- warn!("Unable to push process metrics: {e}")
- }
- }
-
- fn metric_refresh_job<T: MetricsProvider>(client_mgr: Arc<ClientManager>, project_mgr: Arc<TrackerInfoManager>, url: String, provider: Arc<T>) {
- let start = SystemTime::now();
- project_mgr.enabled_projects().into_par_iter().for_each({
- let client_mgr = client_mgr.clone();
- let provider = provider.clone();
-
- move |(name, info)| {
- info!("Refresing {} metrics for {name}...", provider.get_metric_type_name());
- if let Err(e) = provider.update(&client_mgr, &name, &info) {
- warn!("Unable to refresh {} metrics: {e}", provider.get_metric_type_name())
- }
- }
- }
- );
- let mfs = provider.get_registry().unwrap().gather();
- info!("Pushing {} metric families for {}.", mfs.len(), provider.get_metric_type_name());
- if let Err(e) = push_metrics("tracker-redis-exporter", HashMap::from([("instance".to_string(), provider.get_metric_type_name().to_string())]), &url, mfs, None) {
- warn!("Unable to push {} metrics: {e}", provider.get_metric_type_name())
- }
- let delta = SystemTime::now().duration_since(start).unwrap().as_secs_f32();
- info!("Refreshed {} in {delta:.04} seconds.", provider.get_metric_type_name());
- }
-
- fn projects_refresh_job(client_mgr: Arc<ClientManager>, project_mgr: Arc<TrackerInfoManager>) {
- info!("Refreshing project list...");
- if let Err(e) = project_mgr.refresh_trackers(&client_mgr) {
- warn!("Unable to refresh projects: {e}")
- }
- info!("Updated project list. Updating active list in background...");
- rayon::spawn(move || {
- if let Err(e) = project_mgr.refresh_active_trackers(&client_mgr) {
- warn!("Unable to refresh active projects: {e}")
- }
- });
- }
-
- fn perform_metrics_job<T: MetricsProvider + 'static>(interval: usize, client_mgr: Arc<ClientManager>, project_mgr: Arc<TrackerInfoManager>, p_url: String, provider: Arc<T>) {
- let mut schedule = Scheduler::new();
-
- metric_refresh_job(client_mgr.clone(), project_mgr.clone(), p_url.clone(), provider.clone());
- every(interval as Interval)
- .seconds().unwrap()
- .run_four_args(&mut schedule, metric_refresh_job, client_mgr, project_mgr, p_url, provider).unwrap();
-
- loop {
- if let Err(e) = schedule.run_pending() {
- warn!("Error: {e}");
- }
- std::thread::sleep(std::time::Duration::from_secs(1));
- }
- }
-
- fn spawn_metrics_job<T: MetricsProvider + 'static>(interval: usize, client_mgr: Arc<ClientManager>, project_mgr: Arc<TrackerInfoManager>, p_url: String, provider: Arc<T>) -> JoinHandle<()> {
- std::thread::spawn(move || {
- perform_metrics_job(interval, client_mgr, project_mgr, p_url, provider);
- })
- }
-
-
- pub fn main() -> color_eyre::Result<()> {
- let subscriber = tracing_subscriber::FmtSubscriber::new();
- tracing::subscriber::set_global_default(subscriber)?;
- color_eyre::install()?;
-
- let args = Args::parse();
-
- let client_mgr = Arc::new(ClientManager::new(args.get_master_redis_info()));
- let project_mgr = Arc::new(TrackerInfoManager::new(&client_mgr, args.included_projects.clone(), args.excluded_projects.clone(), args.projects_inactive_minutes)?);
- let project_metrics = Arc::new(ProjectMetrics::new()?);
- let queue_metrics = Arc::new(QueueMetrics::new()?);
- let downloader_metrics = Arc::new(DownloaderMetrics::new()?);
- let p_url = args.push_url.to_string();
-
- let mut schedule = Scheduler::new();
-
- every(args.projects_refresh_interval as Interval)
- .seconds()?
- .run_two_args(&mut schedule, projects_refresh_job, client_mgr.clone(), project_mgr.clone())?;
-
- every(10)
- .seconds()?
- .run_one_arg(&mut schedule, push_process_metrics_job, p_url.clone())?;
-
- spawn_metrics_job(
- args.project_metrics_refresh_interval,
- client_mgr.clone(),
- project_mgr.clone(),
- p_url.clone(),
- project_metrics.clone());
-
- spawn_metrics_job(
- args.queue_metrics_refresh_interval,
- client_mgr.clone(),
- project_mgr.clone(),
- p_url.clone(),
- queue_metrics.clone());
-
- spawn_metrics_job(
- args.downloader_metrics_refresh_interval,
- client_mgr.clone(),
- project_mgr.clone(),
- p_url.clone(),
- downloader_metrics.clone());
-
- loop {
- if let Err(e) = schedule.run_pending() {
- warn!("Error: {e}");
- }
- std::thread::sleep(std::time::Duration::from_secs(1));
- }
- }
|