use anyhow::Context; use chrono::prelude::*; use chrono_humanize::{Accuracy, HumanTime, Tense}; use futures::stream::StreamExt; use tokio::io::AsyncBufReadExt; use tokio::io::BufReader; use tokio_stream::wrappers::LinesStream; use tracing::{debug, info}; use args::ARGS; mod args; mod compression; mod connector; mod pbar; mod project_config; #[tokio::main] async fn main() -> anyhow::Result<()> { pbar::setup_logging(); let mut con = { let redis_url = project_config::get_redis_url() .await .context("getting project redis url")?; info!("Connecting to project redis..."); let proj_client = redis::Client::open(redis_url).context("project redis client connect")?; let c = proj_client .get_async_connection() .await .context("project redis get_async_connection")?; info!("Connected!"); c }; let (reader, total_bytes) = connector::open(true).await.context("opening input")?; //Turn bytes into lines. let reader = BufReader::new(reader); let reader = reader.lines(); let reader = LinesStream::new(reader); //Gather the lines into chunks of given size. let reader = reader.chunks(ARGS.chunk_size); //Gather the chunks into the pipelines. let mut reader = reader.chunks(ARGS.pipeline_size); //Keep track of some statistics. let mut counter = 0usize; let mut last_stamp: DateTime = Utc::now(); let start_stamp = last_stamp.clone(); //Initialize the progress bar. if !ARGS.no_progressbar { if let Some(total_size) = total_bytes { pbar::create_progress_bar(total_size); } } let redis_key = format!("{}:{}", ARGS.project, ARGS.queue); info!( "Writing into key {:?} using {} items per SADD and {} SADDS per pipeline...", redis_key, ARGS.chunk_size, ARGS.pipeline_size ); //Main processing loop while let Some(chunks) = reader.next().await { let mut pipeline = &mut redis::pipe(); let mut pipeline_counter = 0usize; //Create the pipeline commands. for chunk in chunks.into_iter() { let items: Vec = chunk .into_iter() .collect::>>() .context("reading items")?; debug!("Queueing chunk of {} items into pipeline...", items.len()); pipeline_counter += items.len(); pipeline = pipeline.sadd(&redis_key, items).ignore(); } //Submit pipeline debug!("Submitting pipeline with {} items...", pipeline_counter); pipeline .query_async(&mut con) .await .context("performing pipeline query")?; counter += pipeline_counter; //Compute statistics. let curr_stamp: DateTime = Utc::now(); let elapsed = curr_stamp.signed_duration_since(last_stamp); let elapsed_millis = elapsed.num_milliseconds(); let elapsed_secs = (elapsed_millis as f64) / 1000.0f64; last_stamp = curr_stamp; let ips = (pipeline_counter as f64) / elapsed_secs; //Update the progressbar if it exists or print a message instead. { let pb = pbar::PROGRESS_BAR.lock().unwrap(); if let Some(pb) = pb.as_ref() { pb.set_message(format!("{} items ({:.02} items/s)", counter, ips)); } else { let ht = HumanTime::from(curr_stamp.signed_duration_since(start_stamp)); info!("Items queued! Inserted {} items so far in {}. Inserted {} items this round in {:.03} seconds. ({:.03} items/s)", counter, ht.to_text_en(Accuracy::Rough, Tense::Present), pipeline_counter, elapsed_secs, ips ); } } } //We're done, close off the progress bar. pbar::finish_progress_bar(); //Print some last information. { let end_stamp: DateTime = Utc::now(); let ht = HumanTime::from(end_stamp.signed_duration_since(start_stamp)); info!( "Finished queueing in {}.", ht.to_text_en(Accuracy::Precise, Tense::Present) ); } Ok(()) }