You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

132 lines
4.9 KiB

  1. use std::collections::HashMap;
  2. use std::sync::Arc;
  3. use std::thread::JoinHandle;
  4. use std::time::SystemTime;
  5. use clap::Parser;
  6. use prometheus::push_metrics;
  7. use skedge::{every, Interval, Scheduler};
  8. use tracing::{info, warn};
  9. use rayon::prelude::*;
  10. use crate::args::Args;
  11. use crate::client_manager::ClientManager;
  12. use crate::metrics::MetricsProvider;
  13. use crate::metrics::downloader_metrics::DownloaderMetrics;
  14. use crate::metrics::project_metrics::ProjectMetrics;
  15. use crate::metrics::queue_metrics::QueueMetrics;
  16. use crate::tracker_manager::TrackerInfoManager;
  17. pub mod models;
  18. pub mod args;
  19. pub mod client_manager;
  20. pub mod tracker_manager;
  21. pub mod helpers;
  22. pub mod metrics;
  23. fn metric_refresh_job<T: MetricsProvider>(client_mgr: Arc<ClientManager>, project_mgr: Arc<TrackerInfoManager>, url: String, provider: Arc<T>) {
  24. let start = SystemTime::now();
  25. project_mgr.enabled_projects().into_par_iter().for_each({
  26. let client_mgr = client_mgr.clone();
  27. let provider = provider.clone();
  28. move |(name, info)| {
  29. info!("Refresing {} metrics for {name}...", provider.get_metric_type_name());
  30. if let Err(e) = provider.update(&client_mgr, &name, &info) {
  31. warn!("Unable to refresh {} metrics: {e}", provider.get_metric_type_name())
  32. }
  33. }
  34. }
  35. );
  36. let mfs = provider.get_registry().unwrap().gather();
  37. info!("Pushing {} metric families for {}.", mfs.len(), provider.get_metric_type_name());
  38. if let Err(e) = push_metrics("tracker-redis-exporter", HashMap::from([("instance".to_string(), provider.get_metric_type_name().to_string())]), &url, mfs, None) {
  39. warn!("Unable to push {} metrics: {e}", provider.get_metric_type_name())
  40. }
  41. let delta = SystemTime::now().duration_since(start).unwrap().as_secs_f32();
  42. info!("Refreshed {} in {delta:.04} seconds.", provider.get_metric_type_name());
  43. }
  44. fn projects_refresh_job(client_mgr: Arc<ClientManager>, project_mgr: Arc<TrackerInfoManager>) {
  45. info!("Refreshing project list...");
  46. if let Err(e) = project_mgr.refresh_trackers(&client_mgr) {
  47. warn!("Unable to refresh projects: {e}")
  48. }
  49. info!("Updated project list. Updating active list in background...");
  50. rayon::spawn(move || {
  51. if let Err(e) = project_mgr.refresh_active_trackers(&client_mgr) {
  52. warn!("Unable to refresh active projects: {e}")
  53. }
  54. });
  55. }
  56. fn perform_metrics_job<T: MetricsProvider + 'static>(interval: usize, client_mgr: Arc<ClientManager>, project_mgr: Arc<TrackerInfoManager>, p_url: String, provider: Arc<T>) {
  57. let mut schedule = Scheduler::new();
  58. metric_refresh_job(client_mgr.clone(), project_mgr.clone(), p_url.clone(), provider.clone());
  59. every(interval as Interval)
  60. .seconds().unwrap()
  61. .run_four_args(&mut schedule, metric_refresh_job, client_mgr, project_mgr, p_url, provider).unwrap();
  62. loop {
  63. if let Err(e) = schedule.run_pending() {
  64. warn!("Error: {e}");
  65. }
  66. std::thread::sleep(std::time::Duration::from_secs(1));
  67. }
  68. }
  69. fn spawn_metrics_job<T: MetricsProvider + 'static>(interval: usize, client_mgr: Arc<ClientManager>, project_mgr: Arc<TrackerInfoManager>, p_url: String, provider: Arc<T>) -> JoinHandle<()> {
  70. std::thread::spawn(move || {
  71. perform_metrics_job(interval, client_mgr, project_mgr, p_url, provider);
  72. })
  73. }
  74. pub fn main() -> color_eyre::Result<()> {
  75. let subscriber = tracing_subscriber::FmtSubscriber::new();
  76. tracing::subscriber::set_global_default(subscriber)?;
  77. color_eyre::install()?;
  78. let args = Args::parse();
  79. let client_mgr = Arc::new(ClientManager::new(args.get_master_redis_info()));
  80. let project_mgr = Arc::new(TrackerInfoManager::new(&client_mgr, args.included_projects.clone(), args.excluded_projects.clone(), args.projects_inactive_minutes)?);
  81. let project_metrics = Arc::new(ProjectMetrics::new()?);
  82. let queue_metrics = Arc::new(QueueMetrics::new()?);
  83. let downloader_metrics = Arc::new(DownloaderMetrics::new()?);
  84. let p_url = args.push_url.to_string();
  85. let mut schedule = Scheduler::new();
  86. every(args.projects_refresh_interval as Interval)
  87. .seconds()?
  88. .run_two_args(&mut schedule, projects_refresh_job, client_mgr.clone(), project_mgr.clone())?;
  89. spawn_metrics_job(
  90. args.project_metrics_refresh_interval,
  91. client_mgr.clone(),
  92. project_mgr.clone(),
  93. p_url.clone(),
  94. project_metrics.clone());
  95. spawn_metrics_job(
  96. args.queue_metrics_refresh_interval,
  97. client_mgr.clone(),
  98. project_mgr.clone(),
  99. p_url.clone(),
  100. queue_metrics.clone());
  101. spawn_metrics_job(
  102. args.downloader_metrics_refresh_interval,
  103. client_mgr.clone(),
  104. project_mgr.clone(),
  105. p_url.clone(),
  106. downloader_metrics.clone());
  107. loop {
  108. if let Err(e) = schedule.run_pending() {
  109. warn!("Error: {e}");
  110. }
  111. std::thread::sleep(std::time::Duration::from_secs(1));
  112. }
  113. }