Du kannst nicht mehr als 25 Themen auswählen Themen müssen entweder mit einem Buchstaben oder einer Ziffer beginnen. Sie können Bindestriche („-“) enthalten und bis zu 35 Zeichen lang sein.
 
 

143 Zeilen
5.3 KiB

  1. use std::collections::HashMap;
  2. use std::sync::Arc;
  3. use std::thread::JoinHandle;
  4. use std::time::SystemTime;
  5. use clap::Parser;
  6. use prometheus::push_metrics;
  7. use skedge::{every, Interval, Scheduler};
  8. use tracing::{info, warn};
  9. use rayon::prelude::*;
  10. use crate::args::Args;
  11. use crate::client_manager::ClientManager;
  12. use crate::metrics::MetricsProvider;
  13. use crate::metrics::downloader_metrics::DownloaderMetrics;
  14. use crate::metrics::project_metrics::ProjectMetrics;
  15. use crate::metrics::queue_metrics::QueueMetrics;
  16. use crate::tracker_manager::TrackerInfoManager;
  17. pub mod models;
  18. pub mod args;
  19. pub mod client_manager;
  20. pub mod tracker_manager;
  21. pub mod helpers;
  22. pub mod metrics;
  23. fn push_process_metrics_job(url: String) {
  24. let mfs = prometheus::gather();
  25. if let Err(e) = push_metrics("tracker-redis-exporter", HashMap::from([("instance".to_string(), "process".to_string())]), &url, mfs, None) {
  26. warn!("Unable to push process metrics: {e}")
  27. }
  28. }
  29. fn metric_refresh_job<T: MetricsProvider>(client_mgr: Arc<ClientManager>, project_mgr: Arc<TrackerInfoManager>, url: String, provider: Arc<T>) {
  30. let start = SystemTime::now();
  31. project_mgr.enabled_projects().into_par_iter().for_each({
  32. let client_mgr = client_mgr.clone();
  33. let provider = provider.clone();
  34. move |(name, info)| {
  35. info!("Refresing {} metrics for {name}...", provider.get_metric_type_name());
  36. if let Err(e) = provider.update(&client_mgr, &name, &info) {
  37. warn!("Unable to refresh {} metrics: {e}", provider.get_metric_type_name())
  38. }
  39. }
  40. }
  41. );
  42. let mfs = provider.get_registry().unwrap().gather();
  43. info!("Pushing {} metric families for {}.", mfs.len(), provider.get_metric_type_name());
  44. if let Err(e) = push_metrics("tracker-redis-exporter", HashMap::from([("instance".to_string(), provider.get_metric_type_name().to_string())]), &url, mfs, None) {
  45. warn!("Unable to push {} metrics: {e}", provider.get_metric_type_name())
  46. }
  47. let delta = SystemTime::now().duration_since(start).unwrap().as_secs_f32();
  48. info!("Refreshed {} in {delta:.04} seconds.", provider.get_metric_type_name());
  49. }
  50. fn projects_refresh_job(client_mgr: Arc<ClientManager>, project_mgr: Arc<TrackerInfoManager>) {
  51. info!("Refreshing project list...");
  52. if let Err(e) = project_mgr.refresh_trackers(&client_mgr) {
  53. warn!("Unable to refresh projects: {e}")
  54. }
  55. info!("Updated project list. Updating active list in background...");
  56. rayon::spawn(move || {
  57. if let Err(e) = project_mgr.refresh_active_trackers(&client_mgr) {
  58. warn!("Unable to refresh active projects: {e}")
  59. }
  60. });
  61. }
  62. fn perform_metrics_job<T: MetricsProvider + 'static>(interval: usize, client_mgr: Arc<ClientManager>, project_mgr: Arc<TrackerInfoManager>, p_url: String, provider: Arc<T>) {
  63. let mut schedule = Scheduler::new();
  64. metric_refresh_job(client_mgr.clone(), project_mgr.clone(), p_url.clone(), provider.clone());
  65. every(interval as Interval)
  66. .seconds().unwrap()
  67. .run_four_args(&mut schedule, metric_refresh_job, client_mgr, project_mgr, p_url, provider).unwrap();
  68. loop {
  69. if let Err(e) = schedule.run_pending() {
  70. warn!("Error: {e}");
  71. }
  72. std::thread::sleep(std::time::Duration::from_secs(1));
  73. }
  74. }
  75. fn spawn_metrics_job<T: MetricsProvider + 'static>(interval: usize, client_mgr: Arc<ClientManager>, project_mgr: Arc<TrackerInfoManager>, p_url: String, provider: Arc<T>) -> JoinHandle<()> {
  76. std::thread::spawn(move || {
  77. perform_metrics_job(interval, client_mgr, project_mgr, p_url, provider);
  78. })
  79. }
  80. pub fn main() -> color_eyre::Result<()> {
  81. let subscriber = tracing_subscriber::FmtSubscriber::new();
  82. tracing::subscriber::set_global_default(subscriber)?;
  83. color_eyre::install()?;
  84. let args = Args::parse();
  85. let client_mgr = Arc::new(ClientManager::new(args.get_master_redis_info()));
  86. let project_mgr = Arc::new(TrackerInfoManager::new(&client_mgr, args.included_projects.clone(), args.excluded_projects.clone(), args.projects_inactive_minutes)?);
  87. let project_metrics = Arc::new(ProjectMetrics::new()?);
  88. let queue_metrics = Arc::new(QueueMetrics::new()?);
  89. let downloader_metrics = Arc::new(DownloaderMetrics::new()?);
  90. let p_url = args.push_url.to_string();
  91. let mut schedule = Scheduler::new();
  92. every(args.projects_refresh_interval as Interval)
  93. .seconds()?
  94. .run_two_args(&mut schedule, projects_refresh_job, client_mgr.clone(), project_mgr.clone())?;
  95. every(10)
  96. .seconds()?
  97. .run_one_arg(&mut schedule, push_process_metrics_job, p_url.clone())?;
  98. spawn_metrics_job(
  99. args.project_metrics_refresh_interval,
  100. client_mgr.clone(),
  101. project_mgr.clone(),
  102. p_url.clone(),
  103. project_metrics.clone());
  104. spawn_metrics_job(
  105. args.queue_metrics_refresh_interval,
  106. client_mgr.clone(),
  107. project_mgr.clone(),
  108. p_url.clone(),
  109. queue_metrics.clone());
  110. spawn_metrics_job(
  111. args.downloader_metrics_refresh_interval,
  112. client_mgr.clone(),
  113. project_mgr.clone(),
  114. p_url.clone(),
  115. downloader_metrics.clone());
  116. loop {
  117. if let Err(e) = schedule.run_pending() {
  118. warn!("Error: {e}");
  119. }
  120. std::thread::sleep(std::time::Duration::from_secs(1));
  121. }
  122. }