@@ -0,0 +1,3 @@ | |||
/target/ | |||
/.idea/ | |||
/Cargo.lock |
@@ -0,0 +1,18 @@ | |||
--- | |||
kind: pipeline | |||
name: default | |||
steps: | |||
- name: docker | |||
image: plugins/docker | |||
settings: | |||
registry: atdr-writer.meo.ws | |||
username: | |||
from_secret: atdr_user | |||
password: | |||
from_secret: atdr_pass | |||
repo: atdr-writer.meo.ws/notarchiveteam/tracker-redis-exporter | |||
dockerfile: Dockerfile | |||
purge: true | |||
auto_tag: false | |||
tags: | |||
- latest |
@@ -0,0 +1,108 @@ | |||
# Created by https://www.toptal.com/developers/gitignore/api/rust,intellij+all | |||
# Edit at https://www.toptal.com/developers/gitignore?templates=rust,intellij+all | |||
### Intellij+all ### | |||
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider | |||
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 | |||
# User-specific stuff | |||
.idea/**/workspace.xml | |||
.idea/**/tasks.xml | |||
.idea/**/usage.statistics.xml | |||
.idea/**/dictionaries | |||
.idea/**/shelf | |||
# AWS User-specific | |||
.idea/**/aws.xml | |||
# Generated files | |||
.idea/**/contentModel.xml | |||
# Sensitive or high-churn files | |||
.idea/**/dataSources/ | |||
.idea/**/dataSources.ids | |||
.idea/**/dataSources.local.xml | |||
.idea/**/sqlDataSources.xml | |||
.idea/**/dynamic.xml | |||
.idea/**/uiDesigner.xml | |||
.idea/**/dbnavigator.xml | |||
# Gradle | |||
.idea/**/gradle.xml | |||
.idea/**/libraries | |||
# Gradle and Maven with auto-import | |||
# When using Gradle or Maven with auto-import, you should exclude module files, | |||
# since they will be recreated, and may cause churn. Uncomment if using | |||
# auto-import. | |||
# .idea/artifacts | |||
# .idea/compiler.xml | |||
# .idea/jarRepositories.xml | |||
# .idea/modules.xml | |||
# .idea/*.iml | |||
# .idea/modules | |||
# *.iml | |||
# *.ipr | |||
# CMake | |||
cmake-build-*/ | |||
# Mongo Explorer plugin | |||
.idea/**/mongoSettings.xml | |||
# File-based project format | |||
*.iws | |||
# IntelliJ | |||
out/ | |||
# mpeltonen/sbt-idea plugin | |||
.idea_modules/ | |||
# JIRA plugin | |||
atlassian-ide-plugin.xml | |||
# Cursive Clojure plugin | |||
.idea/replstate.xml | |||
# SonarLint plugin | |||
.idea/sonarlint/ | |||
# Crashlytics plugin (for Android Studio and IntelliJ) | |||
com_crashlytics_export_strings.xml | |||
crashlytics.properties | |||
crashlytics-build.properties | |||
fabric.properties | |||
# Editor-based Rest Client | |||
.idea/httpRequests | |||
# Android studio 3.1+ serialized cache file | |||
.idea/caches/build_file_checksums.ser | |||
### Intellij+all Patch ### | |||
# Ignore everything but code style settings and run configurations | |||
# that are supposed to be shared within teams. | |||
.idea/* | |||
!.idea/codeStyles | |||
!.idea/runConfigurations | |||
### Rust ### | |||
# Generated by Cargo | |||
# will have compiled files and executables | |||
debug/ | |||
target/ | |||
# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries | |||
# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html | |||
Cargo.lock | |||
# These are backup files generated by rustfmt | |||
**/*.rs.bk | |||
# MSVC Windows builds of rustc generate these, which store debugging information | |||
*.pdb | |||
# End of https://www.toptal.com/developers/gitignore/api/rust,intellij+all |
@@ -0,0 +1,23 @@ | |||
[package] | |||
name = "tracker-redis-exporter" | |||
version = "0.1.0" | |||
edition = "2021" | |||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | |||
[dependencies] | |||
clap = { version = "4.3.19", features = ["derive", "env"] } | |||
color-eyre = "0.6.2" | |||
lock_api = "0.4.10" | |||
parking_lot = "0.12.1" | |||
prometheus = { version = "0.13.3", features = ["push", "process"] } | |||
r2d2 = "0.8.10" | |||
rayon = "1.7.0" | |||
redis = { version = "0.23.1", features = ["r2d2"] } | |||
regex = "1.9.3" | |||
serde = { version = "1.0.182", features = ["derive"] } | |||
serde_json = "1.0.104" | |||
skedge = "0.1.3" | |||
tracing = "0.1.37" | |||
tracing-subscriber = "0.3.17" | |||
url = "2.4.0" |
@@ -0,0 +1,25 @@ | |||
# Build Stage | |||
FROM rust:1.71.1-bookworm AS builder | |||
RUN apt-get update \ | |||
&& apt-get install -y openssl ca-certificates tini libssl3 libssl-dev build-essential \ | |||
&& apt-get clean \ | |||
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* | |||
WORKDIR /usr/src/tracker-redis-exporter | |||
COPY Cargo.toml . | |||
COPY src src | |||
RUN cargo build --release | |||
FROM debian:bookworm | |||
WORKDIR /app | |||
RUN apt-get update \ | |||
&& apt-get install -y openssl ca-certificates tini libssl3 \ | |||
&& apt-get clean \ | |||
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* | |||
# Copy bin from builder to this new image | |||
COPY --from=builder /usr/src/tracker-redis-exporter/target/release/tracker-redis-exporter /app/tracker-redis-exporter | |||
# Default command, run app | |||
ENTRYPOINT ["/usr/bin/tini-static", "--", "/app/tracker-redis-exporter"] |
@@ -0,0 +1,37 @@ | |||
use clap::Parser; | |||
use regex::Regex; | |||
use crate::models::TrackerRedisInfo; | |||
#[derive(Parser, Debug)] | |||
#[command(author, version, about, long_about = None)] | |||
pub struct Args { | |||
#[arg(short, long, default_value = "redis://127.0.0.1:6379")] | |||
master_redis: url::Url, | |||
#[arg(short, long, default_value = "http://127.0.0.1:9091")] | |||
pub push_url: url::Url, | |||
#[arg(short, long)] | |||
pub included_projects: Vec<Regex>, | |||
#[arg(short, long)] | |||
pub excluded_projects: Vec<Regex>, | |||
#[arg(long, default_value = "60")] | |||
pub projects_refresh_interval: usize, | |||
#[arg(long, default_value = "60")] | |||
pub downloader_metrics_refresh_interval: usize, | |||
#[arg(long, default_value = "15")] | |||
pub queue_metrics_refresh_interval: usize, | |||
#[arg(long, default_value = "15")] | |||
pub project_metrics_refresh_interval: usize, | |||
} | |||
impl Args { | |||
pub fn get_master_redis_info(&self) -> TrackerRedisInfo { | |||
TrackerRedisInfo::from(self.master_redis.clone()) | |||
} | |||
} |
@@ -0,0 +1,69 @@ | |||
use std::collections::HashMap; | |||
use parking_lot::RwLock; | |||
use r2d2::{Pool, PooledConnection}; | |||
use redis::Client; | |||
use crate::models::{TrackerInfo, TrackerRedisInfo}; | |||
fn pool_from_redisinfo(info: &TrackerRedisInfo) -> color_eyre::Result<Pool<Client>> { | |||
let url = if let Some(pass) = &info.pass { | |||
format!("redis://:{}@{}:{}", pass, info.host, info.port) | |||
} else { | |||
format!("redis://{}:{}", info.host, info.port) | |||
}; | |||
let new_client = Client::open(url)?; | |||
let pool = Pool::builder() | |||
.max_size(32) | |||
.build(new_client)?; | |||
Ok(pool) | |||
} | |||
pub type RedisConnection = PooledConnection<Client>; | |||
pub struct ClientManager { | |||
client_map: RwLock<HashMap<TrackerRedisInfo, Pool<Client>>>, | |||
master_redis_info: TrackerRedisInfo, | |||
} | |||
impl ClientManager { | |||
pub fn new(master_redis_info: TrackerRedisInfo) -> Self { | |||
Self { | |||
client_map: RwLock::new(HashMap::new()), | |||
master_redis_info, | |||
} | |||
} | |||
pub fn get_connection_from_tracker(&self, info: &TrackerInfo) -> color_eyre::Result<RedisConnection> { | |||
if let Some(redis) = &info.redis { | |||
self.get_connection(redis) | |||
} else { | |||
self.get_master_connection() | |||
} | |||
} | |||
pub fn get_master_connection(&self) -> color_eyre::Result<RedisConnection> { | |||
self.get_connection(&self.master_redis_info) | |||
} | |||
pub fn get_connection(&self, info: &TrackerRedisInfo) -> color_eyre::Result<RedisConnection> { | |||
let d = self.client_map.read(); | |||
if let Some(client) = d.get(info) { | |||
let con = client.get()?; | |||
Ok(con) | |||
} else { | |||
drop(d); | |||
let mut d = self.client_map.write(); | |||
if let Some(client) = d.get(info) { | |||
let con = client.get()?; | |||
Ok(con) | |||
} else { | |||
let new_client = pool_from_redisinfo(info)?; | |||
let con = new_client.get()?; | |||
d.insert(info.clone(), new_client); | |||
Ok(con) | |||
} | |||
} | |||
} | |||
} |
@@ -0,0 +1,35 @@ | |||
use std::collections::BTreeMap; | |||
use std::time::SystemTime; | |||
use redis::Commands; | |||
use tracing::warn; | |||
use crate::client_manager::ClientManager; | |||
use crate::models::{TrackerInfo, TrackerMap}; | |||
pub fn get_trackers(client_mgr: &ClientManager) -> color_eyre::Result<TrackerMap> { | |||
let mut con = client_mgr.get_master_connection()?; | |||
let trackers: BTreeMap<String, String> = con.hgetall("trackers")?; | |||
let mut errors = vec![]; | |||
let trackers: BTreeMap<String, TrackerInfo> = trackers.into_iter() | |||
.map(|(name, info)| { | |||
let info: serde_json::Result<TrackerInfo> = serde_json::from_str(&info); | |||
(name, info) | |||
}) | |||
.filter_map(|(n,v)| { | |||
v.map_err(|e| errors.push((n.clone(), e))) | |||
.ok() | |||
.map(|v| (n, v)) | |||
}) | |||
.collect(); | |||
for (n, e) in errors { | |||
warn!("Unable to parse tracker data for {n}: {e}"); | |||
} | |||
Ok(trackers) | |||
} | |||
pub fn get_last_requests_time() -> color_eyre::Result<u64> { | |||
let now = SystemTime::now(); | |||
let unix = now.duration_since(SystemTime::UNIX_EPOCH)?.as_secs(); | |||
let unix = unix - 60; | |||
let d = unix.clone() % 60; | |||
Ok(unix - d) | |||
} |
@@ -0,0 +1,112 @@ | |||
use std::collections::HashMap; | |||
use std::sync::Arc; | |||
use std::time::SystemTime; | |||
use clap::Parser; | |||
use prometheus::push_metrics; | |||
use skedge::{every, Interval, Scheduler}; | |||
use tracing::{info, warn}; | |||
use rayon::prelude::*; | |||
use crate::args::Args; | |||
use crate::client_manager::ClientManager; | |||
use crate::metrics::MetricsProvider; | |||
use crate::metrics::downloader_metrics::DownloaderMetrics; | |||
use crate::metrics::project_metrics::ProjectMetrics; | |||
use crate::metrics::queue_metrics::QueueMetrics; | |||
use crate::tracker_manager::TrackerInfoManager; | |||
pub mod models; | |||
pub mod args; | |||
pub mod client_manager; | |||
pub mod tracker_manager; | |||
pub mod helpers; | |||
pub mod metrics; | |||
fn metric_refresh_job<T: MetricsProvider>(client_mgr: Arc<ClientManager>, project_mgr: Arc<TrackerInfoManager>, url: String, provider: Arc<T>) { | |||
let start = SystemTime::now(); | |||
project_mgr.enabled_projects().into_par_iter().for_each({ | |||
let client_mgr = client_mgr.clone(); | |||
let provider = provider.clone(); | |||
move |(name, info)| { | |||
info!("Refresing {} metrics for {name}...", provider.get_metric_type_name()); | |||
if let Err(e) = provider.update(&client_mgr, &name, &info) { | |||
warn!("Unable to refresh {} metrics: {e}", provider.get_metric_type_name()) | |||
} | |||
} | |||
} | |||
); | |||
let mfs = provider.get_registry().unwrap().gather(); | |||
info!("Pushing {} metric families for {}.", mfs.len(), provider.get_metric_type_name()); | |||
if let Err(e) = push_metrics("tracker-redis-exporter", HashMap::from([("instance".to_string(), provider.get_metric_type_name().to_string())]), &url, mfs, None) { | |||
warn!("Unable to push {} metrics: {e}", provider.get_metric_type_name()) | |||
} | |||
let delta = SystemTime::now().duration_since(start).unwrap().as_secs_f32(); | |||
info!("Refreshed {} in {delta:.04} seconds.", provider.get_metric_type_name()); | |||
} | |||
fn projects_refresh_job(client_mgr: Arc<ClientManager>, project_mgr: Arc<TrackerInfoManager>) { | |||
info!("Refreshing project list..."); | |||
if let Err(e) = project_mgr.refresh_trackers(&client_mgr) { | |||
warn!("Unable to refresh projects: {e}") | |||
} | |||
} | |||
fn register_metrics_job<T: MetricsProvider + 'static>(interval: usize, schedule: &mut Scheduler, client_mgr: Arc<ClientManager>, project_mgr: Arc<TrackerInfoManager>, p_url: String, provider: Arc<T>) -> color_eyre::Result<()> { | |||
metric_refresh_job(client_mgr.clone(), project_mgr.clone(), p_url.clone(), provider.clone()); | |||
every(interval as Interval) | |||
.seconds()? | |||
.run_four_args(schedule, metric_refresh_job, client_mgr, project_mgr, p_url, provider)?; | |||
Ok(()) | |||
} | |||
pub fn main() -> color_eyre::Result<()> { | |||
let subscriber = tracing_subscriber::FmtSubscriber::new(); | |||
tracing::subscriber::set_global_default(subscriber)?; | |||
color_eyre::install()?; | |||
let args = Args::parse(); | |||
let client_mgr = Arc::new(ClientManager::new(args.get_master_redis_info())); | |||
let project_mgr = Arc::new(TrackerInfoManager::new(&client_mgr, args.included_projects.clone(), args.excluded_projects.clone())?); | |||
let project_metrics = Arc::new(ProjectMetrics::new()?); | |||
let queue_metrics = Arc::new(QueueMetrics::new()?); | |||
let downloader_metrics = Arc::new(DownloaderMetrics::new()?); | |||
let p_url = args.push_url.to_string(); | |||
let mut schedule = Scheduler::new(); | |||
every(args.projects_refresh_interval as Interval) | |||
.seconds()? | |||
.run_two_args(&mut schedule, projects_refresh_job, client_mgr.clone(), project_mgr.clone())?; | |||
register_metrics_job( | |||
args.project_metrics_refresh_interval, | |||
&mut schedule, | |||
client_mgr.clone(), | |||
project_mgr.clone(), | |||
p_url.clone(), | |||
project_metrics.clone())?; | |||
register_metrics_job( | |||
args.queue_metrics_refresh_interval, | |||
&mut schedule, | |||
client_mgr.clone(), | |||
project_mgr.clone(), | |||
p_url.clone(), | |||
queue_metrics.clone())?; | |||
register_metrics_job( | |||
args.downloader_metrics_refresh_interval, | |||
&mut schedule, | |||
client_mgr.clone(), | |||
project_mgr.clone(), | |||
p_url.clone(), | |||
downloader_metrics.clone())?; | |||
loop { | |||
if let Err(e) = schedule.run_pending() { | |||
warn!("Error: {e}"); | |||
} | |||
std::thread::sleep(std::time::Duration::from_secs(1)); | |||
} | |||
} |
@@ -0,0 +1,63 @@ | |||
use std::collections::BTreeMap; | |||
use prometheus::{IntGaugeVec, Opts, Registry}; | |||
use redis::Commands; | |||
use crate::client_manager::{ClientManager, RedisConnection}; | |||
use crate::metrics::MetricsProvider; | |||
use crate::models::TrackerInfo; | |||
fn update_downloader_gauge_from_redis( | |||
con: &mut RedisConnection, | |||
project: &str, | |||
redis_key: &str, | |||
metric: &IntGaugeVec, | |||
) -> color_eyre::Result<()> { | |||
let k = format!("{project}:{redis_key}"); | |||
let m: BTreeMap<String, i64> = con.hgetall(k)?; | |||
for (downloader_name, value) in m { | |||
let m = metric.get_metric_with_label_values(&[project, downloader_name.as_str()])?; | |||
m.set(value); | |||
} | |||
Ok(()) | |||
} | |||
pub struct DownloaderMetrics { | |||
registry: Registry, | |||
downloader_count: IntGaugeVec, | |||
downloader_bytes: IntGaugeVec, | |||
} | |||
impl DownloaderMetrics { | |||
pub fn new() -> color_eyre::Result<Self> { | |||
let registry = Registry::new_custom(Some("at_tracker".to_string()), None)?; | |||
let s = Self { | |||
registry, | |||
downloader_count: IntGaugeVec::new(Opts::new("downloader_count", "downloader_count"), &["project", "downloader_name"])?, | |||
downloader_bytes: IntGaugeVec::new(Opts::new("downloader_bytes", "downloader_bytes"), &["project", "downloader_name"])?, | |||
}; | |||
s.registry.register(Box::new(s.downloader_count.clone()))?; | |||
s.registry.register(Box::new(s.downloader_bytes.clone()))?; | |||
Ok(s) | |||
} | |||
} | |||
impl MetricsProvider for DownloaderMetrics { | |||
fn get_registry(&self) -> color_eyre::Result<&Registry> { | |||
Ok(&self.registry) | |||
} | |||
fn update(&self, client_mgr: &ClientManager, project: &str, info: &TrackerInfo) -> color_eyre::Result<()> { | |||
let mut con = client_mgr.get_connection_from_tracker(info)?; | |||
update_downloader_gauge_from_redis(&mut con, project, "downloader_count", &self.downloader_count)?; | |||
update_downloader_gauge_from_redis(&mut con, project, "downloader_bytes", &self.downloader_bytes)?; | |||
Ok(()) | |||
} | |||
fn get_metric_type_name(&self) -> &str { | |||
"downloader" | |||
} | |||
} |
@@ -0,0 +1,13 @@ | |||
use prometheus::Registry; | |||
use crate::client_manager::ClientManager; | |||
use crate::models::TrackerInfo; | |||
pub mod project_metrics; | |||
pub mod queue_metrics; | |||
pub mod downloader_metrics; | |||
pub trait MetricsProvider: Sync + Send { | |||
fn get_registry(&self) -> color_eyre::Result<&Registry>; | |||
fn update(&self, client_mgr: &ClientManager, project: &str, info: &TrackerInfo) -> color_eyre::Result<()>; | |||
fn get_metric_type_name(&self) -> &str; | |||
} |
@@ -0,0 +1,83 @@ | |||
use prometheus::{GaugeVec, IntGaugeVec, Opts, Registry}; | |||
use prometheus::core::{Atomic, GenericGaugeVec}; | |||
use redis::{Commands, FromRedisValue}; | |||
use crate::client_manager::{ClientManager, RedisConnection}; | |||
use crate::helpers::get_last_requests_time; | |||
use crate::metrics::MetricsProvider; | |||
use crate::models::TrackerInfo; | |||
fn update_gauge_from_redis<A: Atomic<T=T>, T: FromRedisValue>( | |||
con: &mut RedisConnection, | |||
project: &str, | |||
redis_key: &str, | |||
metric: &GenericGaugeVec<A>, | |||
) -> color_eyre::Result<()> { | |||
let key = format!("{project}:{redis_key}"); | |||
let val: Option<T> = con.get(&key)?; | |||
if let Some(v) = val { | |||
let m = metric.get_metric_with_label_values(&[project])?; | |||
m.set(v); | |||
} | |||
Ok(()) | |||
} | |||
pub struct ProjectMetrics { | |||
registry: Registry, | |||
item_request_serve_rate: GaugeVec, | |||
reclaim_serve_rate: GaugeVec, | |||
reclaim_rate: GaugeVec, | |||
requests_processed: IntGaugeVec, | |||
requests_granted: IntGaugeVec, | |||
item_fail_rate: GaugeVec, | |||
} | |||
impl ProjectMetrics { | |||
pub fn new() -> color_eyre::Result<Self> { | |||
let registry = Registry::new_custom(Some("at_tracker".to_string()), None)?; | |||
let s = Self { | |||
registry, | |||
item_request_serve_rate: GaugeVec::new(Opts::new("item_request_serve_rate", "item_request_serve_rate"), &["project"])?, | |||
reclaim_serve_rate: GaugeVec::new(Opts::new("reclaim_serve_rate", "reclaim_serve_rate"), &["project"])?, | |||
reclaim_rate: GaugeVec::new(Opts::new("reclaim_rate", "reclaim_rate"), &["project"])?, | |||
requests_processed: IntGaugeVec::new(Opts::new("requests_processed", "requests_processed"), &["project"])?, | |||
requests_granted: IntGaugeVec::new(Opts::new("requests_granted", "requests_granted"), &["project"])?, | |||
item_fail_rate: GaugeVec::new(Opts::new("item_fail_rate", "item_fail_rate"), &["project"])?, | |||
}; | |||
s.registry.register(Box::new(s.item_request_serve_rate.clone()))?; | |||
s.registry.register(Box::new(s.reclaim_serve_rate.clone()))?; | |||
s.registry.register(Box::new(s.reclaim_rate.clone()))?; | |||
s.registry.register(Box::new(s.requests_processed.clone()))?; | |||
s.registry.register(Box::new(s.requests_granted.clone()))?; | |||
s.registry.register(Box::new(s.item_fail_rate.clone()))?; | |||
Ok(s) | |||
} | |||
} | |||
impl MetricsProvider for ProjectMetrics { | |||
fn get_registry(&self) -> color_eyre::Result<&Registry> { | |||
Ok(&self.registry) | |||
} | |||
fn update(&self, client_mgr: &ClientManager, project: &str, info: &TrackerInfo) -> color_eyre::Result<()> { | |||
let mut con = client_mgr.get_connection_from_tracker(info)?; | |||
update_gauge_from_redis(&mut con, project, "item_request_serve_rate", &self.item_request_serve_rate)?; | |||
update_gauge_from_redis(&mut con, project, "reclaim_serve_rate", &self.reclaim_serve_rate)?; | |||
update_gauge_from_redis(&mut con, project, "reclaim_rate", &self.reclaim_rate)?; | |||
update_gauge_from_redis(&mut con, project, "item_fail_rate", &self.item_fail_rate)?; | |||
let t = get_last_requests_time()?; | |||
let rp_key = format!("requests_processed:{t}"); | |||
update_gauge_from_redis(&mut con, project, &rp_key, &self.requests_processed)?; | |||
let rg_key = format!("requests_granted:{t}"); | |||
update_gauge_from_redis(&mut con, project, &rg_key, &self.requests_granted)?; | |||
Ok(()) | |||
} | |||
fn get_metric_type_name(&self) -> &str { | |||
"project" | |||
} | |||
} |
@@ -0,0 +1,85 @@ | |||
use std::collections::HashSet; | |||
use prometheus::{IntGaugeVec, Opts, Registry}; | |||
use redis::Commands; | |||
use crate::client_manager::ClientManager; | |||
use crate::metrics::MetricsProvider; | |||
use crate::models::TrackerInfo; | |||
pub struct QueueMetrics { | |||
registry: Registry, | |||
queue_size: IntGaugeVec, | |||
} | |||
impl QueueMetrics { | |||
pub fn new() -> color_eyre::Result<Self> { | |||
let registry = Registry::new_custom(Some("at_tracker".to_string()), None)?; | |||
let s = Self { | |||
registry, | |||
queue_size: IntGaugeVec::new(Opts::new("queue_size", "queue_size"), &["project", "queue"])?, | |||
}; | |||
s.registry.register(Box::new(s.queue_size.clone()))?; | |||
Ok(s) | |||
} | |||
} | |||
impl MetricsProvider for QueueMetrics { | |||
fn get_registry(&self) -> color_eyre::Result<&Registry> { | |||
Ok(&self.registry) | |||
} | |||
fn update(&self, client_mgr: &ClientManager, project: &str, info: &TrackerInfo) -> color_eyre::Result<()> { | |||
let mut con = client_mgr.get_connection_from_tracker(info)?; | |||
// Default queue set. | |||
let mut queues: HashSet<String> = HashSet::from([ | |||
"todo".to_string(), | |||
"todo:redo".to_string(), | |||
"todo:backfeed".to_string(), | |||
"unretrievable".to_string(), | |||
]); | |||
// Read the queues from the offloaded key. | |||
let offloaded_key = format!("{project}:offloaded"); | |||
let offloaded_keys: Vec<String> = con.hkeys(&offloaded_key)?; | |||
// Remove queues that need special handling. | |||
queues.extend(offloaded_keys.into_iter().filter(|v| (v != "done") && (v != "claims"))); | |||
for queue in queues { | |||
let k = format!("{project}:{queue}"); | |||
let c: Option<i64> = con.scard(k)?; | |||
if let Some(a) = c { | |||
let d: Option<i64> = con.hget(&offloaded_key, &queue)?; | |||
let a = a + d.unwrap_or(0); | |||
let m = self.queue_size.get_metric_with_label_values(&[project, queue.as_str()])?; | |||
m.set(a); | |||
} | |||
} | |||
// Handle done queue. | |||
{ | |||
let c: Option<i64> = con.get(format!("{project}:done_counter"))?; | |||
if let Some(a) = c { | |||
let m = self.queue_size.get_metric_with_label_values(&[project, "done"])?; | |||
m.set(a); | |||
} | |||
} | |||
// Handle claims queue. | |||
{ | |||
let c: Option<i64> = con.hlen(format!("{project}:claims"))?; | |||
if let Some(a) = c { | |||
let m = self.queue_size.get_metric_with_label_values(&[project, "claims"])?; | |||
m.set(a); | |||
} | |||
} | |||
Ok(()) | |||
} | |||
fn get_metric_type_name(&self) -> &str { | |||
"queue" | |||
} | |||
} |
@@ -0,0 +1,42 @@ | |||
use std::collections::BTreeMap; | |||
use serde::Deserialize; | |||
use url::Url; | |||
#[derive(Debug, Clone, Deserialize, Hash, Eq, PartialEq)] | |||
pub struct TrackerRedisInfo { | |||
pub host: String, | |||
pub pass: Option<String>, | |||
pub port: u16, | |||
} | |||
impl From<Url> for TrackerRedisInfo { | |||
fn from(value: Url) -> Self { | |||
let port = value.port().unwrap_or(6379); | |||
let host = value.host().map(|v| v.to_string()).unwrap_or_else(|| "127.0.0.1".to_string()); | |||
let pass = value.password().map(|v| v.to_string()); | |||
Self { | |||
host, | |||
pass, | |||
port, | |||
} | |||
} | |||
} | |||
#[derive(Debug, Clone, Deserialize)] | |||
pub struct TrackerOffloadInfo { | |||
pub high: usize, | |||
pub low: usize, | |||
pub pipelinesize: Option<usize>, | |||
} | |||
#[derive(Debug, Clone, Deserialize)] | |||
pub struct TrackerInfo { | |||
pub title: String, | |||
pub min_script_version: String, | |||
pub redis: Option<TrackerRedisInfo>, | |||
pub max_claims_hard: Option<usize>, | |||
pub max_claims_soft: Option<usize>, | |||
pub offload: Option<TrackerOffloadInfo>, | |||
} | |||
pub type TrackerMap = BTreeMap<String, TrackerInfo>; |
@@ -0,0 +1,53 @@ | |||
use parking_lot::RwLock; | |||
use regex::Regex; | |||
use crate::client_manager::ClientManager; | |||
use crate::helpers::get_trackers; | |||
use crate::models::{TrackerInfo, TrackerMap}; | |||
pub struct TrackerInfoManager { | |||
trackers: RwLock<TrackerMap>, | |||
included_projects: Vec<Regex>, | |||
excluded_projects: Vec<Regex>, | |||
} | |||
impl TrackerInfoManager { | |||
pub fn new(client_mgr: &ClientManager, included_projects: Vec<Regex>, excluded_projects: Vec<Regex>) -> color_eyre::Result<Self> { | |||
Ok(Self { | |||
trackers: RwLock::new(get_trackers(client_mgr)?), | |||
included_projects, | |||
excluded_projects, | |||
}) | |||
} | |||
pub fn refresh_trackers(&self, client_mgr: &ClientManager) -> color_eyre::Result<()> { | |||
let mut l = self.trackers.write(); | |||
*l = get_trackers(client_mgr)?; | |||
Ok(()) | |||
} | |||
pub fn get_tracker_info(&self, name: &str) -> color_eyre::Result<TrackerInfo> { | |||
let l = self.trackers.read(); | |||
if let Some(v) = l.get(&name.to_string()) { | |||
Ok(v.clone()) | |||
} else { | |||
Err(color_eyre::eyre::eyre!("Tracker {} not found!", name)) | |||
} | |||
} | |||
pub fn project_is_included(&self, name: &str) -> bool { | |||
self.included_projects.iter().all(|r| r.is_match(name)) | |||
} | |||
pub fn project_is_excluded(&self, name: &str) -> bool { | |||
self.excluded_projects.iter().any(|r| r.is_match(name)) | |||
} | |||
pub fn enabled_projects(&self) -> Vec<(String, TrackerInfo)> { | |||
self.trackers.read().iter() | |||
.filter(|(name, _)| { | |||
self.project_is_included(name) && !self.project_is_excluded(name) | |||
}) | |||
.map(|(a, b)| (a.clone(), b.clone())) | |||
.collect() | |||
} | |||
} |