You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

132 lines
4.2 KiB

  1. use anyhow::Context;
  2. use chrono::prelude::*;
  3. use chrono_humanize::{Accuracy, HumanTime, Tense};
  4. use futures::stream::StreamExt;
  5. use tokio::io::AsyncBufReadExt;
  6. use tokio::io::BufReader;
  7. use tokio_stream::wrappers::LinesStream;
  8. use tracing::{debug, info};
  9. use args::ARGS;
  10. mod args;
  11. mod compression;
  12. mod connector;
  13. mod pbar;
  14. mod project_config;
  15. #[tokio::main]
  16. async fn main() -> anyhow::Result<()> {
  17. pbar::setup_logging();
  18. let mut con = {
  19. let redis_url = project_config::get_redis_url()
  20. .await
  21. .context("getting project redis url")?;
  22. info!("Connecting to project redis...");
  23. let proj_client = redis::Client::open(redis_url).context("project redis client connect")?;
  24. let c = proj_client
  25. .get_async_connection()
  26. .await
  27. .context("project redis get_async_connection")?;
  28. info!("Connected!");
  29. c
  30. };
  31. let (reader, total_bytes) = connector::open(true).await.context("opening input")?;
  32. //Turn bytes into lines.
  33. let reader = BufReader::new(reader);
  34. let reader = reader.lines();
  35. let reader = LinesStream::new(reader);
  36. //Gather the lines into chunks of given size.
  37. let reader = reader.chunks(ARGS.chunk_size);
  38. //Gather the chunks into the pipelines.
  39. let mut reader = reader.chunks(ARGS.pipeline_size);
  40. //Keep track of some statistics.
  41. let mut counter = 0usize;
  42. let mut last_stamp: DateTime<Utc> = Utc::now();
  43. let start_stamp = last_stamp.clone();
  44. //Initialize the progress bar.
  45. if !ARGS.no_progressbar {
  46. if let Some(total_size) = total_bytes {
  47. pbar::create_progress_bar(total_size);
  48. }
  49. }
  50. let redis_key = format!("{}:{}", ARGS.project, ARGS.queue);
  51. info!(
  52. "Writing into key {:?} using {} items per SADD and {} SADDS per pipeline...",
  53. redis_key, ARGS.chunk_size, ARGS.pipeline_size
  54. );
  55. //Main processing loop
  56. while let Some(chunks) = reader.next().await {
  57. let mut pipeline = &mut redis::pipe();
  58. let mut pipeline_counter = 0usize;
  59. //Create the pipeline commands.
  60. for chunk in chunks.into_iter() {
  61. let items: Vec<String> = chunk
  62. .into_iter()
  63. .collect::<std::io::Result<Vec<String>>>()
  64. .context("reading items")?;
  65. debug!("Queueing chunk of {} items into pipeline...", items.len());
  66. pipeline_counter += items.len();
  67. pipeline = pipeline.sadd(&redis_key, items).ignore();
  68. }
  69. //Submit pipeline
  70. debug!("Submitting pipeline with {} items...", pipeline_counter);
  71. pipeline
  72. .query_async(&mut con)
  73. .await
  74. .context("performing pipeline query")?;
  75. counter += pipeline_counter;
  76. //Compute statistics.
  77. let curr_stamp: DateTime<Utc> = Utc::now();
  78. let elapsed = curr_stamp.signed_duration_since(last_stamp);
  79. let elapsed_millis = elapsed.num_milliseconds();
  80. let elapsed_secs = (elapsed_millis as f64) / 1000.0f64;
  81. last_stamp = curr_stamp;
  82. let ips = (pipeline_counter as f64) / elapsed_secs;
  83. //Update the progressbar if it exists or print a message instead.
  84. {
  85. let pb = pbar::PROGRESS_BAR.lock().unwrap();
  86. if let Some(pb) = pb.as_ref() {
  87. pb.set_message(format!("{} items ({:.02} items/s)", counter, ips));
  88. } else {
  89. let ht = HumanTime::from(curr_stamp.signed_duration_since(start_stamp));
  90. info!("Items queued! Inserted {} items so far in {}. Inserted {} items this round in {:.03} seconds. ({:.03} items/s)",
  91. counter,
  92. ht.to_text_en(Accuracy::Rough, Tense::Present),
  93. pipeline_counter,
  94. elapsed_secs,
  95. ips
  96. );
  97. }
  98. }
  99. }
  100. //We're done, close off the progress bar.
  101. pbar::finish_progress_bar();
  102. //Print some last information.
  103. {
  104. let end_stamp: DateTime<Utc> = Utc::now();
  105. let ht = HumanTime::from(end_stamp.signed_duration_since(start_stamp));
  106. info!(
  107. "Finished queueing in {}.",
  108. ht.to_text_en(Accuracy::Precise, Tense::Present)
  109. );
  110. }
  111. Ok(())
  112. }