|
- use anyhow::Context;
- use chrono::prelude::*;
- use chrono_humanize::{Accuracy, HumanTime, Tense};
- use futures::stream::StreamExt;
- use tokio::io::AsyncBufReadExt;
- use tokio::io::BufReader;
- use tokio_stream::wrappers::LinesStream;
- use tracing::{debug, info};
-
- use args::ARGS;
-
- mod args;
- mod compression;
- mod connector;
- mod pbar;
- mod project_config;
-
- #[tokio::main]
- async fn main() -> anyhow::Result<()> {
- pbar::setup_logging();
-
- let mut con = {
- let redis_url = project_config::get_redis_url()
- .await
- .context("getting project redis url")?;
- info!("Connecting to project redis...");
- let proj_client = redis::Client::open(redis_url).context("project redis client connect")?;
- let c = proj_client
- .get_async_connection()
- .await
- .context("project redis get_async_connection")?;
- info!("Connected!");
- c
- };
-
- let (reader, total_bytes) = connector::open(true).await.context("opening input")?;
-
- //Turn bytes into lines.
- let reader = BufReader::new(reader);
- let reader = reader.lines();
- let reader = LinesStream::new(reader);
-
- //Gather the lines into chunks of given size.
- let reader = reader.chunks(ARGS.chunk_size);
-
- //Gather the chunks into the pipelines.
- let mut reader = reader.chunks(ARGS.pipeline_size);
-
- //Keep track of some statistics.
- let mut counter = 0usize;
- let mut last_stamp: DateTime<Utc> = Utc::now();
- let start_stamp = last_stamp.clone();
-
- //Initialize the progress bar.
- if !ARGS.no_progressbar {
- if let Some(total_size) = total_bytes {
- pbar::create_progress_bar(total_size);
- }
- }
-
- let redis_key = format!("{}:{}", ARGS.project, ARGS.queue);
- info!(
- "Writing into key {:?} using {} items per SADD and {} SADDS per pipeline...",
- redis_key, ARGS.chunk_size, ARGS.pipeline_size
- );
-
- //Main processing loop
- while let Some(chunks) = reader.next().await {
- let mut pipeline = &mut redis::pipe();
- let mut pipeline_counter = 0usize;
-
- //Create the pipeline commands.
- for chunk in chunks.into_iter() {
- let items: Vec<String> = chunk
- .into_iter()
- .collect::<std::io::Result<Vec<String>>>()
- .context("reading items")?;
-
- debug!("Queueing chunk of {} items into pipeline...", items.len());
- pipeline_counter += items.len();
- pipeline = pipeline.sadd(&redis_key, items).ignore();
- }
-
- //Submit pipeline
- debug!("Submitting pipeline with {} items...", pipeline_counter);
- pipeline
- .query_async(&mut con)
- .await
- .context("performing pipeline query")?;
- counter += pipeline_counter;
-
- //Compute statistics.
- let curr_stamp: DateTime<Utc> = Utc::now();
- let elapsed = curr_stamp.signed_duration_since(last_stamp);
- let elapsed_millis = elapsed.num_milliseconds();
- let elapsed_secs = (elapsed_millis as f64) / 1000.0f64;
- last_stamp = curr_stamp;
- let ips = (pipeline_counter as f64) / elapsed_secs;
-
- //Update the progressbar if it exists or print a message instead.
- {
- let pb = pbar::PROGRESS_BAR.lock().unwrap();
- if let Some(pb) = pb.as_ref() {
- pb.set_message(format!("{} items ({:.02} items/s)", counter, ips));
- } else {
- let ht = HumanTime::from(curr_stamp.signed_duration_since(start_stamp));
- info!("Items queued! Inserted {} items so far in {}. Inserted {} items this round in {:.03} seconds. ({:.03} items/s)",
- counter,
- ht.to_text_en(Accuracy::Rough, Tense::Present),
- pipeline_counter,
- elapsed_secs,
- ips
- );
- }
- }
- }
-
- //We're done, close off the progress bar.
- pbar::finish_progress_bar();
-
- //Print some last information.
- {
- let end_stamp: DateTime<Utc> = Utc::now();
- let ht = HumanTime::from(end_stamp.signed_duration_since(start_stamp));
- info!(
- "Finished queueing in {}.",
- ht.to_text_en(Accuracy::Precise, Tense::Present)
- );
- }
- Ok(())
- }
|