@@ -17,9 +17,12 @@ pub struct Args { | |||
#[arg(short, long)] | |||
pub excluded_projects: Vec<Regex>, | |||
#[arg(long, default_value = "60")] | |||
#[arg(long, default_value = "600")] | |||
pub projects_refresh_interval: usize, | |||
#[arg(long, default_value = "5")] | |||
pub projects_inactive_minutes: u64, | |||
#[arg(long, default_value = "60")] | |||
pub downloader_metrics_refresh_interval: usize, | |||
@@ -1,7 +1,8 @@ | |||
use std::collections::BTreeMap; | |||
use std::collections::{BTreeMap, BTreeSet}; | |||
use std::time::SystemTime; | |||
use redis::Commands; | |||
use tracing::warn; | |||
use tracing::{info, warn}; | |||
use rayon::prelude::*; | |||
use crate::client_manager::ClientManager; | |||
use crate::models::{TrackerInfo, TrackerMap}; | |||
@@ -26,6 +27,28 @@ pub fn get_trackers(client_mgr: &ClientManager) -> color_eyre::Result<TrackerMap | |||
Ok(trackers) | |||
} | |||
pub fn determine_active_trackers(client_mgr: &ClientManager, trackers: &TrackerMap, inactive_minutes: u64) -> color_eyre::Result<BTreeSet<String>> { | |||
let start = SystemTime::now(); | |||
let t = get_last_requests_time()?; | |||
let res: color_eyre::Result<Vec<Option<String>>> = trackers.par_iter().map(|(name, info)| { | |||
let mut con = client_mgr.get_connection_from_tracker(info)?; | |||
for i in 0..inactive_minutes { | |||
let tim = t.clone() - i * 60u64; | |||
let k = format!("{name}:requests_processed:{tim}"); | |||
let ex: bool = con.exists(k)?; | |||
if ex { | |||
return Ok(Some(name.clone())) | |||
} | |||
} | |||
Ok(None) | |||
}) | |||
.collect(); | |||
let res = res?; | |||
let dur = SystemTime::now().duration_since(start).unwrap().as_secs_f32(); | |||
info!("Determining active projects took {dur:.04} seconds."); | |||
Ok(res.into_iter().filter_map(|v| v).collect()) | |||
} | |||
pub fn get_last_requests_time() -> color_eyre::Result<u64> { | |||
let now = SystemTime::now(); | |||
let unix = now.duration_since(SystemTime::UNIX_EPOCH)?.as_secs(); | |||
@@ -49,6 +49,12 @@ fn projects_refresh_job(client_mgr: Arc<ClientManager>, project_mgr: Arc<Tracker | |||
if let Err(e) = project_mgr.refresh_trackers(&client_mgr) { | |||
warn!("Unable to refresh projects: {e}") | |||
} | |||
info!("Updated project list. Updating active list in background..."); | |||
rayon::spawn(move || { | |||
if let Err(e) = project_mgr.refresh_active_trackers(&client_mgr) { | |||
warn!("Unable to refresh active projects: {e}") | |||
} | |||
}); | |||
} | |||
fn register_metrics_job<T: MetricsProvider + 'static>(interval: usize, schedule: &mut Scheduler, client_mgr: Arc<ClientManager>, project_mgr: Arc<TrackerInfoManager>, p_url: String, provider: Arc<T>) -> color_eyre::Result<()> { | |||
@@ -67,7 +73,7 @@ pub fn main() -> color_eyre::Result<()> { | |||
let args = Args::parse(); | |||
let client_mgr = Arc::new(ClientManager::new(args.get_master_redis_info())); | |||
let project_mgr = Arc::new(TrackerInfoManager::new(&client_mgr, args.included_projects.clone(), args.excluded_projects.clone())?); | |||
let project_mgr = Arc::new(TrackerInfoManager::new(&client_mgr, args.included_projects.clone(), args.excluded_projects.clone(), args.projects_inactive_minutes)?); | |||
let project_metrics = Arc::new(ProjectMetrics::new()?); | |||
let queue_metrics = Arc::new(QueueMetrics::new()?); | |||
let downloader_metrics = Arc::new(DownloaderMetrics::new()?); | |||
@@ -1,21 +1,32 @@ | |||
use std::collections::BTreeSet; | |||
use parking_lot::RwLock; | |||
use regex::Regex; | |||
use tracing::info; | |||
use crate::client_manager::ClientManager; | |||
use crate::helpers::get_trackers; | |||
use crate::helpers::{determine_active_trackers, get_trackers}; | |||
use crate::models::{TrackerInfo, TrackerMap}; | |||
pub struct TrackerInfoManager { | |||
trackers: RwLock<TrackerMap>, | |||
active_trackers: RwLock<BTreeSet<String>>, | |||
included_projects: Vec<Regex>, | |||
excluded_projects: Vec<Regex>, | |||
inactive_minutes: u64, | |||
} | |||
impl TrackerInfoManager { | |||
pub fn new(client_mgr: &ClientManager, included_projects: Vec<Regex>, excluded_projects: Vec<Regex>) -> color_eyre::Result<Self> { | |||
pub fn new(client_mgr: &ClientManager, included_projects: Vec<Regex>, excluded_projects: Vec<Regex>, inactive_minutes: u64) -> color_eyre::Result<Self> { | |||
info!("Fetching projects..."); | |||
let t = get_trackers(client_mgr)?; | |||
info!("Found {} projects.", t.len()); | |||
let a = determine_active_trackers(client_mgr, &t, inactive_minutes.clone())?; | |||
info!("Found {} projects are active.", a.len()); | |||
Ok(Self { | |||
trackers: RwLock::new(get_trackers(client_mgr)?), | |||
trackers: RwLock::new(t), | |||
active_trackers: RwLock::new(a), | |||
included_projects, | |||
excluded_projects, | |||
inactive_minutes, | |||
}) | |||
} | |||
@@ -24,6 +35,13 @@ impl TrackerInfoManager { | |||
*l = get_trackers(client_mgr)?; | |||
Ok(()) | |||
} | |||
pub fn refresh_active_trackers(&self, client_mgr: &ClientManager) -> color_eyre::Result<()> { | |||
let r = self.trackers.read(); | |||
let new = determine_active_trackers(client_mgr, &r, self.inactive_minutes.clone())?; | |||
let mut w = self.active_trackers.write(); | |||
*w = new; | |||
Ok(()) | |||
} | |||
pub fn get_tracker_info(&self, name: &str) -> color_eyre::Result<TrackerInfo> { | |||
let l = self.trackers.read(); | |||
@@ -41,13 +59,17 @@ impl TrackerInfoManager { | |||
pub fn project_is_excluded(&self, name: &str) -> bool { | |||
self.excluded_projects.iter().any(|r| r.is_match(name)) | |||
} | |||
pub fn project_is_active(&self, name: &str) -> bool { | |||
(self.inactive_minutes == 0) || self.active_trackers.read().contains(&name.to_string()) | |||
} | |||
pub fn enabled_projects(&self) -> Vec<(String, TrackerInfo)> { | |||
self.trackers.read().iter() | |||
.filter(|(name, _)| { | |||
self.project_is_included(name) && !self.project_is_excluded(name) | |||
self.project_is_included(name) && !self.project_is_excluded(name) && self.project_is_active(name) | |||
}) | |||
.map(|(a, b)| (a.clone(), b.clone())) | |||
.collect() | |||
} | |||
} |