Allow running a specific number of tasks at once

This commit is contained in:
HoLLy 2022-09-20 20:31:26 +02:00
Родитель 66214cf372
Коммит 1487e687ec
2 изменённых файлов: 46 добавлений и 18 удалений

Просмотреть файл

@ -1,13 +1,17 @@
use std::{
collections::HashSet,
path::{Path, PathBuf},
time::Duration,
};
use anyhow::{bail, Context, Result};
use hyper::{Client, StatusCode};
use hyper_tls::HttpsConnector;
use regex::Regex;
use tokio::sync::mpsc::{self, UnboundedSender};
use tokio::{
sync::mpsc::{self, UnboundedSender},
time::sleep,
};
use crate::git_parsing::{parse_hash, parse_head, parse_log, parse_object, GitObject};
@ -38,7 +42,7 @@ struct DownloadedFile {
pub tx: UnboundedSender<DownloadedFile>,
}
pub async fn download_all(base_url: String, base_path: PathBuf) {
pub async fn download_all(base_url: String, base_path: PathBuf, max_task_count: u16) {
let mut cache = HashSet::<String>::new();
// TODO: try out unbounded channel too
@ -59,6 +63,7 @@ pub async fn download_all(base_url: String, base_path: PathBuf) {
drop(tx);
// every time we downloaded a new file, see what other files we can derive from it
let mut threads = vec![];
while let Some(message) = rx.recv().await {
// TODO: if this file is already downloaded, continue
if cache.contains(&message.path) {
@ -69,24 +74,37 @@ pub async fn download_all(base_url: String, base_path: PathBuf) {
cache.insert(message.path.clone());
let url = format!("{}{}", &base_url, &message.path);
let file_bytes = match download(&url).await {
Ok(content) => content,
Err(e) => {
println!("Error while downloading file {url}: {}", e);
continue;
let base_path = base_path.clone();
let handle = tokio::spawn(async move {
let file_bytes = match download(&url).await {
Ok(content) => content,
Err(e) => {
println!("Error while downloading file {url}: {}", e);
return;
}
};
println!("Downloaded '{}' ({} bytes)", message.path, file_bytes.len());
// write this file to disk
if let Err(e) = write_file(&base_path, &message.path, &file_bytes) {
println!("Failed to write file {} to disk: {}", &message.path, e)
}
};
println!("Downloaded '{}' ({} bytes)", message.path, file_bytes.len());
// match on the file name and queue new messages
if let Err(e) = queue_new_references(message.path.as_str(), &file_bytes, message.tx) {
println!("Error while trying to find new references: {e}");
}
});
// write this file to disk
if let Err(e) = write_file(&base_path, &message.path, &file_bytes) {
println!("Failed to write file {} to disk: {}", &message.path, e)
}
threads.push(handle);
// match on the file name and queue new messages
if let Err(e) = queue_new_references(message.path.as_str(), &file_bytes, message.tx) {
println!("Error while trying to find new references: {e}");
while threads.len() >= (max_task_count as usize) {
// sleep
sleep(Duration::from_millis(10)).await;
// remove dead threads
threads.retain(|h| !h.is_finished());
}
}
}

Просмотреть файл

@ -1,6 +1,6 @@
use std::path::PathBuf;
use clap::{command, Arg, Command};
use clap::{command, value_parser, Arg, Command};
mod dump_git;
mod git_parsing;
@ -18,6 +18,15 @@ fn cli() -> Command<'static> {
.help("The directory to download to")
.default_value("git-dumped"),
)
.arg(
Arg::new("tasks")
.required(false)
.short('t')
.long("tasks")
.help("Sets the maximum of concurrent download tasks that can be running")
.value_parser(value_parser!(u16))
.default_value("8"),
)
}
#[tokio::main]
@ -25,13 +34,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let matches = cli().get_matches();
let url = matches.get_one::<String>("URL").unwrap();
let path = matches.get_one::<String>("PATH").unwrap();
let tasks = *matches.get_one::<u16>("tasks").unwrap();
// println!("URL: {url}");
// println!("PATH: {path}");
std::fs::create_dir_all(format!("{path}/.git/"))?;
dump_git::download_all(url.clone(), PathBuf::from(path)).await;
dump_git::download_all(url.clone(), PathBuf::from(path), tasks).await;
Ok(())
}