commit 7fdcab7d6c65c0220c525d277f2d6f2d71ac2cfc Author: Roelf Wichertjes Date: Thu Dec 9 13:13:43 2021 +0100 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c13bb84 --- /dev/null +++ b/.gitignore @@ -0,0 +1,113 @@ + +# Created by https://www.toptal.com/developers/gitignore/api/rust,intellij+all +# Edit at https://www.toptal.com/developers/gitignore?templates=rust,intellij+all + +### Intellij+all ### +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/**/usage.statistics.xml +.idea/**/dictionaries +.idea/**/shelf + +# AWS User-specific +.idea/**/aws.xml + +# Generated files +.idea/**/contentModel.xml + +# Sensitive or high-churn files +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml +.idea/**/dbnavigator.xml + +# Gradle +.idea/**/gradle.xml +.idea/**/libraries + +# Gradle and Maven with auto-import +# When using Gradle or Maven with auto-import, you should exclude module files, +# since they will be recreated, and may cause churn. Uncomment if using +# auto-import. +# .idea/artifacts +# .idea/compiler.xml +# .idea/jarRepositories.xml +# .idea/modules.xml +# .idea/*.iml +# .idea/modules +# *.iml +# *.ipr + +# CMake +cmake-build-*/ + +# Mongo Explorer plugin +.idea/**/mongoSettings.xml + +# File-based project format +*.iws + +# IntelliJ +out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Cursive Clojure plugin +.idea/replstate.xml + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties + +# Editor-based Rest Client +.idea/httpRequests + +# Android studio 3.1+ serialized cache file +.idea/caches/build_file_checksums.ser + +### Intellij+all Patch ### +# Ignores the whole .idea folder and all .iml files +# See https://github.com/joeblau/gitignore.io/issues/186 and https://github.com/joeblau/gitignore.io/issues/360 + +.idea/ + +# Reason: https://github.com/joeblau/gitignore.io/issues/186#issuecomment-249601023 + +*.iml +modules.xml +.idea/misc.xml +*.ipr + +# Sonarlint plugin +.idea/sonarlint + +### Rust ### +# Generated by Cargo +# will have compiled files and executables +debug/ +target/ + +# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries +# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html +Cargo.lock + +# These are backup files generated by rustfmt +**/*.rs.bk + +# MSVC Windows builds of rustc generate these, which store debugging information +*.pdb + +# End of https://www.toptal.com/developers/gitignore/api/rust,intellij+all diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..b734d52 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "queuectl" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +tokio = { version = "1.14.0", features = ["full"] } +anyhow = "1.0.51" +tracing = "0.1" +tracing-subscriber = "0.2" +clap = { version = "3.0.0-rc.0", features = ["derive"] } +redis = { version = "0.21.4", features = ["tokio-comp"] } +futures = "0.3" +tokio-stream = { version = "0.1.8", features = ["io-util"] } +async-compression = { version = "0.3.8", features = ["tokio", "all-algorithms"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +url = "2.2.2" +reqwest = { version = "0.11.7", features = ["rustls-tls-webpki-roots", "stream"], default-features = false } +tokio-util = { version = "0.6.9", features = ["io"] } +chrono = "0.4.19" +chrono-humanize = "0.2.1" +async-read-progress = "0.2.0" +indicatif = { version = "0.16.2", features = ["improved_unicode"] } +once_cell = "1.8.0" diff --git a/src/args.rs b/src/args.rs new file mode 100644 index 0000000..b3174b7 --- /dev/null +++ b/src/args.rs @@ -0,0 +1,34 @@ +use clap::Parser; +use once_cell::sync::Lazy; + +#[derive(Parser)] +#[clap(name = "queuectl")] +#[clap(version = "1.0")] +pub struct Cli { + #[clap(short, long)] + /// Input path. Can be a file or a url that starts with http:// or https://. + pub input: String, + #[clap(short, long)] + /// Tracker slug for the project. + pub project: String, + #[clap(short, long, default_value = "todo")] + /// The queue where items will be queued into. + pub queue: String, + #[clap(long, default_value = "8192")] + /// How many items to send per SADD command. + pub chunk_size: usize, + #[clap(long, default_value = "redis://127.0.0.1/")] + /// URL of the redis to connect to. + pub redis: String, + #[clap(long, short, default_value = "auto", arg_enum)] + /// Specify the compression of the input file. By default will autodetect by file extension. + pub compression: crate::compression::CompressionMode, + #[clap(long, default_value = "32")] + /// How many commands to pipeline into redis. + pub pipeline_size: usize, + #[clap(long)] + /// Do not show a progress bar. + pub no_progressbar: bool, +} + +pub static ARGS: Lazy = Lazy::new(|| Cli::parse()); diff --git a/src/compression.rs b/src/compression.rs new file mode 100644 index 0000000..86b5d9b --- /dev/null +++ b/src/compression.rs @@ -0,0 +1,53 @@ +use async_compression::tokio::bufread::{GzipDecoder, XzDecoder, ZstdDecoder}; +use clap::ArgEnum; +use tokio::io::{AsyncRead, BufReader}; + +#[derive(Copy, Clone, PartialEq, Ord, PartialOrd, Eq, ArgEnum, Debug)] +pub enum CompressionMode { + AUTO, + NONE, + ZSTD, + GZIP, + XZ, +} + +impl CompressionMode { + pub fn parse(filename: &str) -> Self { + if filename.to_ascii_lowercase().ends_with("zst") { + return Self::ZSTD; + } + if filename.to_ascii_lowercase().ends_with("zstd") { + return Self::ZSTD; + } + if filename.to_ascii_lowercase().ends_with("xz") { + return Self::XZ; + } + if filename.to_ascii_lowercase().ends_with("gz") { + return Self::GZIP; + } + + return Self::NONE; + } +} + +pub async fn get_decompressed_reader( + compression_mode: CompressionMode, + reader: Box, +) -> anyhow::Result> { + match compression_mode { + CompressionMode::AUTO => unreachable!(), + CompressionMode::NONE => Ok(Box::new(reader)), + CompressionMode::ZSTD => { + let reader = BufReader::new(reader); + Ok(Box::new(ZstdDecoder::new(reader))) + } + CompressionMode::GZIP => { + let reader = BufReader::new(reader); + Ok(Box::new(GzipDecoder::new(reader))) + } + CompressionMode::XZ => { + let reader = BufReader::new(reader); + Ok(Box::new(XzDecoder::new(reader))) + } + } +} diff --git a/src/connector.rs b/src/connector.rs new file mode 100644 index 0000000..736f90e --- /dev/null +++ b/src/connector.rs @@ -0,0 +1,75 @@ +use anyhow::{anyhow, Context}; +use async_read_progress::*; +use futures::stream::StreamExt; +use tokio::fs::File; +use tokio::io::AsyncRead; +use tokio_util::io::StreamReader; +use tracing::info; + +use crate::args::ARGS; +use crate::compression; +use crate::pbar; + +pub async fn open_http( + url: &str, +) -> anyhow::Result<(Box, Option, String)> { + info!("Requesting url {:?}...", url); + let client = reqwest::Client::new(); + let resp = client + .get(url) + .send() + .await + .context("http client get request")?; + let filename = String::from(resp.url().path_segments().unwrap().last().unwrap()); + info!("Completed request with code {}!", resp.status()); + if !resp.status().is_success() { + return Err(anyhow!("incorrect status code attempting to retrieve file")); + } + let size = resp.content_length(); + let stream = resp.bytes_stream(); + let stream = stream.map(|v| v.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))); + Ok((Box::new(StreamReader::new(stream)), size, filename)) +} + +pub async fn open( + update_progressbar: bool, +) -> anyhow::Result<(Box, Option)> { + let (reader, total_bytes, filename): (Box, Option, String) = + if ARGS.input == "-" { + info!("Reading from stdin..."); + (Box::new(tokio::io::stdin()), None, "dummy.txt".to_string()) + } else { + if ARGS.input.starts_with("http") { + open_http(&ARGS.input).await? + } else { + info!("Opening file {}...", ARGS.input); + let file = File::open(&ARGS.input).await.context("open input file")?; + let meta = file.metadata().await.context("read input file metadata")?; + (Box::new(file), Some(meta.len()), ARGS.input.clone()) + } + }; + + //Hook into the progress bar. + let reader = { + Box::new( + reader.report_progress(std::time::Duration::from_millis(20), move |bytes_read| { + if update_progressbar { + pbar::update_progress(bytes_read as u64); + } + }), + ) + }; + + let compression_mode = if ARGS.compression == compression::CompressionMode::AUTO { + info!("Attempting to guess compression mode..."); + compression::CompressionMode::parse(&filename) + } else { + ARGS.compression + }; + info!("Using compression mode {:?}.", compression_mode); + + info!("Attempting decompression..."); + let reader = compression::get_decompressed_reader(compression_mode, reader).await?; + info!("Ok!"); + Ok((reader, total_bytes)) +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..f6917d3 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,131 @@ +use anyhow::Context; +use chrono::prelude::*; +use chrono_humanize::{Accuracy, HumanTime, Tense}; +use futures::stream::StreamExt; +use tokio::io::AsyncBufReadExt; +use tokio::io::BufReader; +use tokio_stream::wrappers::LinesStream; +use tracing::{debug, info}; + +use args::ARGS; + +mod args; +mod compression; +mod connector; +mod pbar; +mod project_config; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + pbar::setup_logging(); + + let mut con = { + let redis_url = project_config::get_redis_url() + .await + .context("getting project redis url")?; + info!("Connecting to project redis..."); + let proj_client = redis::Client::open(redis_url).context("project redis client connect")?; + let c = proj_client + .get_async_connection() + .await + .context("project redis get_async_connection")?; + info!("Connected!"); + c + }; + + let (reader, total_bytes) = connector::open(true).await.context("opening input")?; + + //Turn bytes into lines. + let reader = BufReader::new(reader); + let reader = reader.lines(); + let reader = LinesStream::new(reader); + + //Gather the lines into chunks of given size. + let reader = reader.chunks(ARGS.chunk_size); + + //Gather the chunks into the pipelines. + let mut reader = reader.chunks(ARGS.pipeline_size); + + //Keep track of some statistics. + let mut counter = 0usize; + let mut last_stamp: DateTime = Utc::now(); + let start_stamp = last_stamp.clone(); + + //Initialize the progress bar. + if !ARGS.no_progressbar { + if let Some(total_size) = total_bytes { + pbar::create_progress_bar(total_size); + } + } + + let redis_key = format!("{}:{}", ARGS.project, ARGS.queue); + info!( + "Writing into key {:?} using {} items per SADD and {} SADDS per pipeline...", + redis_key, ARGS.chunk_size, ARGS.pipeline_size + ); + + //Main processing loop + while let Some(chunks) = reader.next().await { + let mut pipeline = &mut redis::pipe(); + let mut pipeline_counter = 0usize; + + //Create the pipeline commands. + for chunk in chunks.into_iter() { + let items: Vec = chunk + .into_iter() + .collect::>>() + .context("reading items")?; + + debug!("Queueing chunk of {} items into pipeline...", items.len()); + pipeline_counter += items.len(); + pipeline = pipeline.sadd(&redis_key, items).ignore(); + } + + //Submit pipeline + debug!("Submitting pipeline with {} items...", pipeline_counter); + pipeline + .query_async(&mut con) + .await + .context("performing pipeline query")?; + counter += pipeline_counter; + + //Compute statistics. + let curr_stamp: DateTime = Utc::now(); + let elapsed = curr_stamp.signed_duration_since(last_stamp); + let elapsed_millis = elapsed.num_milliseconds(); + let elapsed_secs = (elapsed_millis as f64) / 1000.0f64; + last_stamp = curr_stamp; + let ips = (pipeline_counter as f64) / elapsed_secs; + + //Update the progressbar if it exists or print a message instead. + { + let pb = pbar::PROGRESS_BAR.lock().unwrap(); + if let Some(pb) = pb.as_ref() { + pb.set_message(format!("{} items ({:.02} items/s)", counter, ips)); + } else { + let ht = HumanTime::from(curr_stamp.signed_duration_since(start_stamp)); + info!("Items queued! Inserted {} items so far in {}. Inserted {} items this round in {:.03} seconds. ({:.03} items/s)", + counter, + ht.to_text_en(Accuracy::Rough, Tense::Present), + pipeline_counter, + elapsed_secs, + ips + ); + } + } + } + + //We're done, close off the progress bar. + pbar::finish_progress_bar(); + + //Print some last information. + { + let end_stamp: DateTime = Utc::now(); + let ht = HumanTime::from(end_stamp.signed_duration_since(start_stamp)); + info!( + "Finished queueing in {}.", + ht.to_text_en(Accuracy::Precise, Tense::Present) + ); + } + Ok(()) +} diff --git a/src/pbar.rs b/src/pbar.rs new file mode 100644 index 0000000..48fcc47 --- /dev/null +++ b/src/pbar.rs @@ -0,0 +1,67 @@ +use indicatif::{ProgressBar, ProgressStyle}; +use once_cell::sync::Lazy; +use std::io::LineWriter; +use std::sync::Mutex; + +pub static PROGRESS_BAR: Lazy>> = Lazy::new(|| Mutex::new(None)); + +pub fn update_progress(position: u64) { + let pb = PROGRESS_BAR.lock().unwrap(); + if let Some(pb) = pb.as_ref() { + pb.set_position(position); + } +} + +pub fn create_progress_bar(total_size: u64) { + let pb = ProgressBar::new(total_size); + pb.enable_steady_tick(100); + pb.set_style(ProgressStyle::default_bar() + .template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} {msg} ({eta} remaining)") + .progress_chars("=>-")); + *PROGRESS_BAR.lock().unwrap() = Some(pb); +} + +pub fn finish_progress_bar() { + let mut pb = PROGRESS_BAR.lock().unwrap(); + if let Some(pb) = pb.as_ref() { + pb.finish(); + } + *pb = None; +} + +pub struct PBWriter {} + +impl PBWriter { + pub fn new() -> Self { + PBWriter {} + } +} + +impl std::io::Write for PBWriter { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + let pb = PROGRESS_BAR.lock().unwrap(); + if let Some(pb) = pb.as_ref() { + pb.println(std::str::from_utf8(buf).unwrap()); + Ok(buf.len()) + } else { + std::io::stderr().write(buf) + } + } + + fn flush(&mut self) -> std::io::Result<()> { + std::io::stderr().flush() + } +} + +pub fn setup_logging() { + std::env::set_var( + "RUST_LOG", + std::env::var("RUST_LOG").unwrap_or("info".to_string()), + ); + + tracing_subscriber::fmt::fmt() + .with_writer(move || -> Box { + Box::new(LineWriter::new(PBWriter::new())) + }) + .init(); +} diff --git a/src/project_config/config_json.rs b/src/project_config/config_json.rs new file mode 100644 index 0000000..6e0cde2 --- /dev/null +++ b/src/project_config/config_json.rs @@ -0,0 +1,23 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct Root { + pub max_claims_soft: Option, + pub max_claims_hard: Option, + pub moving_average_interval: Option, + pub min_script_version: Option, + pub title: String, + pub ignore_global_blocked: bool, + pub item_type: String, + pub history_length: i64, + pub domains: Option, + pub valid_item_regexp: String, + pub redis: Option, +} + +#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct Redis { + pub host: String, + pub port: u16, + pub pass: String, +} diff --git a/src/project_config/mod.rs b/src/project_config/mod.rs new file mode 100644 index 0000000..46c4de7 --- /dev/null +++ b/src/project_config/mod.rs @@ -0,0 +1,47 @@ +use crate::args::ARGS; +use anyhow::Context; +use redis::AsyncCommands; +use tracing::info; +use url::Url; + +pub mod config_json; + +pub async fn get_redis_url() -> anyhow::Result { + let base_redis_url = Url::parse(&ARGS.redis).context("parsing redis url")?; + info!( + "Connecting to redis {} to find project config...", + base_redis_url + ); + let client = redis::Client::open(base_redis_url.clone()).context("connecting to redis")?; + let mut base_con = client + .get_async_connection() + .await + .context("get_async_connection")?; + info!("Connected!"); + + info!("Attempting to retrieve project configuration..."); + let config: Option = base_con + .hget("trackers", &ARGS.project) + .await + .context("hget trackers")?; + if config.is_none() { + panic!("Unable to get project config!"); + } + + let config: config_json::Root = + serde_json::from_str(&config.unwrap()).context("parsing project config")?; + info!("Read config:\n{:#?}", config); + + if let Some(r) = config.redis { + let mut u = Url::parse("redis://127.0.0.1/")?; + u.set_host(Some(&r.host)).unwrap(); + u.set_port(Some(r.port)).unwrap(); + u.set_username("default").unwrap(); + u.set_password(Some(&r.pass)).unwrap(); + info!("Found project redis server at {}!", u); + Ok(u) + } else { + info!("No project-specific redis config found; staying on this redis!"); + Ok(base_redis_url) + } +}