some more refactoring work

This commit is contained in:
Blue 2023-09-25 17:15:24 -03:00
parent 7b9112c0dd
commit cdaf94e3cd
Signed by: blue
GPG Key ID: 9B203B252A63EE38
7 changed files with 324 additions and 38 deletions

View File

@ -1 +1,104 @@
#include "collection.h"
constexpr std::string_view downloads("downloads");
Collection::Collection(
const std::filesystem::path& destination,
const std::string& target,
const std::shared_ptr<Logger>& logger,
const std::shared_ptr<TaskManager>& taskManager
):
Loggable(logger),
destination(destination),
target(target),
taskManager(taskManager),
knownSources(),
successSources(),
downloadingSources(),
readingSources(),
buildingSources()
{}
void Collection::addSource(const std::string& source) {
if (hasSource(source)) {
major("Source " + source + " is already present, skipping as duplicate");
return;
}
if (isRemote(source))
queueDownload(source);
else
queueRead(source, source);
}
void Collection::sourceDownaloded(const std::string& source, TaskManager::Error err) {
Downloads::const_iterator itr = downloadingSources.find(source);
std::optional<std::filesystem::path> location = itr->second->getLocation();
downloadingSources.erase(itr);
if (err.has_value() || !location.has_value()) {
error("Coundn't download " + source + ": " + err.value()->what());
return;
}
queueRead(source, location.value());
}
void Collection::sourceRead(const std::string& source, TaskManager::Error err) {
Components::iterator itr = readingSources.find(source);
if (err.has_value()) {
error("Coundn't read " + source + ": " + err.value()->what());
readingSources.erase(itr);
return;
}
std::pair<Components::iterator, bool> res = buildingSources.emplace(source, std::move(itr->second));
readingSources.erase(itr);
taskManager->queue(
std::bind(&Component::build, res.first->second.get(), destination, target),
std::bind(&Collection::sourceBuilt, this, source, std::placeholders::_1)
);
}
void Collection::sourceBuilt(const std::string& source, TaskManager::Error err) {
Components::iterator itr = buildingSources.find(source);
buildingSources.erase(itr);
if (err.has_value()) {
error("Coundn't read " + source + ": " + err.value()->what());
return;
}
successSources.emplace(source);
}
void Collection::queueDownload(const std::string& source) {
std::pair<Downloads::iterator, bool> res = downloadingSources.emplace(std::piecewise_construct,
std::forward_as_tuple(source),
std::forward_as_tuple(
std::make_unique<Download>(source, destination/downloads, logger)
)
);
taskManager->queue(
std::bind(&Download::proceed, res.first->second.get()),
std::bind(&Collection::sourceDownaloded, this, source, std::placeholders::_1)
);
}
void Collection::queueRead(const std::string& source, const std::filesystem::path& location) {
std::pair<Components::iterator, bool> res = readingSources.emplace(std::piecewise_construct,
std::forward_as_tuple(source),
std::forward_as_tuple(
std::make_unique<Component>(location, std::shared_ptr<Collection>(this), logger)
)
);
taskManager->queue(
std::bind(&Component::read, res.first->second.get()),
std::bind(&Collection::sourceRead, this, source, std::placeholders::_1)
);
}
bool Collection::hasSource(const std::string& source) const {
return knownSources.count(source) > 0;
}

View File

@ -5,18 +5,25 @@
#include <map>
#include <set>
#include <exception>
#include <string_view>
#include "loggable.h"
#include "component.h"
#include "download.h"
#include "taskmanager.h"
class Collection : protected Loggable {
using Sources = std::set<std::string>;
using Downloads = std::map<std::string, std::unique_ptr<Download>>;
using Components = std::map<std::string, std::unique_ptr<Component>>;
public:
class UnknownSource;
class DuplicateSource;
class DuplicatePath;
Collection(
const std::filesystem::path& destination,
const std::string& target,
const std::shared_ptr<Logger>& logger,
const std::shared_ptr<TaskManager>& taskManager
);
@ -30,19 +37,24 @@ public:
void addSource(const std::string& source);
private:
void sourceDownloadError(const std::string& source);
void sourceBuildError(const std::string& source);
void sourceDownaloded(const std::string& source);
void sourceBuilt(const std::string& source);
void sourceDownaloded(const std::string& source, TaskManager::Error err);
void sourceRead(const std::string& source, TaskManager::Error err);
void sourceBuilt(const std::string& source, TaskManager::Error err);
bool isRemote(const std::string& source);
void queueDownload(const std::string& source);
void queueRead(const std::string& source, const std::filesystem::path& location);
private:
std::filesystem::path destination;
std::string target;
std::shared_ptr<TaskManager> taskManager;
std::set<std::string> errorSources;
std::set<std::string> pendingSources;
std::map<std::string, std::unique_ptr<Component>> failedSources;
std::map<std::string, std::unique_ptr<Component>> downloadedSources;
std::map<std::string, std::unique_ptr<Component>> buildingSources;
std::map<std::string, std::unique_ptr<Component>> successSources;
Sources knownSources;
Sources successSources;
Downloads downloadingSources;
Components readingSources;
Components buildingSources;
};
class Collection::UnknownSource : public std::runtime_error {

View File

@ -2,9 +2,7 @@
#include <regex>
#include "collection.h"
constexpr std::string_view downloads("downloads");
constexpr std::string_view archived("archived");
constexpr std::string_view headerAcceptJson("accept: application/json");
static const std::regex repo("(^https?):\\/\\/([\\w\\d\\.\\-\\_]+)\\/([\\w\\d\\-\\_]+)\\/([\\w\\d\\-\\_]+)");
@ -20,18 +18,16 @@ void Download::FileDeleter::operator()(FILE* file) const {
fclose(file);
}
Download::Download(
const std::string& url,
const std::filesystem::path& destination,
const std::shared_ptr<Collection>& collection,
const std::shared_ptr<Logger>& logger
) :
Loggable(logger),
curl(),
collection(collection),
url(url),
destination(destination)
destination(destination),
location(std::nullopt)
{
std::lock_guard lock(amx);
if (instances == 0) {
@ -39,7 +35,6 @@ Download::Download(
if (res != CURLE_OK)
throw CurlError(std::string("Error initializing curl global ") + curl_easy_strerror(res));
}
++instances;
createCurl();
@ -52,13 +47,26 @@ Download::~Download() {
curl_global_cleanup();
}
std::optional<std::filesystem::path> Download::getLocation() const {
return location;
}
void Download::proceed() {
bool success = false;
std::optional<RepoInfo> repo = testRepo();
if (repo.has_value()) {
std::optional<std::filesystem::path> path = downloadAsRepo(repo.value());
std::optional<std::filesystem::path> path;
if (repo.has_value())
path = downloadAsRepo(repo.value());
if (path.has_value()) {
info(url + " has been successfully donwloaded as a repository, extracting");
success = extract(path.value(), destination/repo.value().name);
if (success)
location = destination/repo.value().name;
}
if (success)
info("Successfully extracted " + url);
}
std::optional<nlohmann::json> Download::repoInfoGiteaApi1(const RepoInfo& repo) {
@ -86,7 +94,7 @@ std::optional<std::filesystem::path> Download::downloadAsRepo(const RepoInfo& re
info("Gitea v1 API seem to have worked");
info("Default branch is " + branchName);
std::optional<std::filesystem::path> path = downloadRepoGiteaApi1(repo, branchName);
return downloadRepoGiteaApi1(repo, branchName);
}
}
@ -94,15 +102,13 @@ std::optional<std::filesystem::path> Download::downloadAsRepo(const RepoInfo& re
}
std::optional<std::filesystem::path> Download::downloadRepoGiteaApi1(const RepoInfo& repo, const std::string& branch) {
std::string fileName = branch + ".tar.gz";
std::string url = repo.origin() + "/api/v1/repos/" + repo.project() + "/archive/" + fileName;
std::filesystem::path path = destination/downloads/fileName;
std::filesystem::path path = destination/archived/fileName;
CURLcode code = httpDownload(url, path);
if (code == CURLE_OK)
return path;
warn("Couldn't download file " + url + ": " + curl_easy_strerror(code));
minor("Removing " + path.string());
if (std::filesystem::exists(path))
std::filesystem::remove_all(path);
@ -126,6 +132,7 @@ CURLcode Download::httpGet(
}
CURLcode Download::httpDownload(const std::string& url, const std::filesystem::path& path, const std::vector<std::string_view>& headers) {
info("Starting to download " + url);
std::filesystem::create_directories(path.parent_path());
if (std::filesystem::exists(path)) {
minor("File " + path.string() + " already exists, will be overwritten");
@ -140,10 +147,16 @@ CURLcode Download::httpDownload(const std::string& url, const std::filesystem::p
setHeaders(headers);
curl_easy_setopt(curl.get(), CURLOPT_URL, url.c_str());
curl_easy_setopt(curl.get(), CURLOPT_WRITEFUNCTION, writeFile);
curl_easy_setopt(curl.get(), CURLOPT_WRITEFUNCTION, fwrite);
curl_easy_setopt(curl.get(), CURLOPT_WRITEDATA, filePtr.get());
return curl_easy_perform(curl.get());
CURLcode code = curl_easy_perform(curl.get());
if (code == CURLE_OK)
info("Successfully downloaded " + url);
else
warn("Couldn't download file " + url + ": " + curl_easy_strerror(code));
return code;
// return res;
// if (res != CURLE_OK) {
@ -199,6 +212,129 @@ std::optional<Download::RepoInfo> Download::testRepo() const {
return std::nullopt;
}
bool Download::extract(const std::filesystem::path& source, const std::filesystem::path& destination) const {
int flags = ARCHIVE_EXTRACT_TIME;
flags |= ARCHIVE_EXTRACT_PERM;
flags |= ARCHIVE_EXTRACT_ACL;
flags |= ARCHIVE_EXTRACT_FFLAGS;
struct archive* a = archive_read_new();
struct archive* ext = archive_write_disk_new();
struct archive_entry *entry;
archive_read_support_format_all(a);
archive_read_support_filter_all(a);
archive_write_disk_set_options(ext, flags);
archive_write_disk_set_standard_lookup(ext);
bool result = true;
bool readOpen = false;
bool writeOpen = false;
int r = archive_read_open_filename(a, source.c_str(), 10240);
if (r) {
major("Couldn't open file " + source.string());
result = false;
} else {
readOpen = true;
}
while (result) {
r = archive_read_next_header(a, &entry);
if (r == ARCHIVE_EOF)
break;
if (r < ARCHIVE_OK)
major(archive_error_string(a));
if (r < ARCHIVE_WARN)
break;
std::string fileName(archive_entry_pathname(entry));
std::filesystem::path filePath = destination/fileName;
debug("Extracting " + filePath.string());
if (std::filesystem::exists(filePath))
minor(filePath.string() + " exists, overwriting");
archive_entry_set_pathname_utf8(entry, filePath.c_str());
r = archive_write_header(ext, entry);
if (r < ARCHIVE_OK) {
major(archive_error_string(ext));
} else if (archive_entry_size(entry) > 0) {
writeOpen = true;
r = copy(a, ext);
if (r < ARCHIVE_OK)
major(archive_error_string(ext));
if (r < ARCHIVE_WARN)
break;
}
r = archive_write_finish_entry(ext);
if (r < ARCHIVE_OK)
major(archive_error_string(ext));
if (r < ARCHIVE_WARN)
break;
}
if (readOpen)
archive_read_close(a);
if (writeOpen)
archive_write_close(ext);
archive_read_free(a);
archive_write_free(ext);
return result;
}
int Download::copy(struct archive* ar, struct archive* aw) const {
int r;
const void *buff;
size_t size;
la_int64_t offset;
while (true) {
r = archive_read_data_block(ar, &buff, &size, &offset);
if (r == ARCHIVE_EOF)
return ARCHIVE_OK;
if (r < ARCHIVE_OK)
return r;
r = archive_write_data_block(aw, buff, size, offset);
if (r < ARCHIVE_OK) {
major(archive_error_string(aw));
return r;
}
}
}
int Download::trace(CURL* handle, curl_infotype type, char* data, size_t size, void* clientp) {
(void)(handle);
switch (type) {
case CURLINFO_TEXT: {
std::string message(data, size);
if (message[size - 1] == '\n')
message = message.substr(0, size - 1);
static_cast<Download*>(clientp)->debug(message);
} break;
default:
break;
}
return 0;
}
size_t Download::writeString(void* data, size_t size, size_t nmemb, void* mem) {
size_t finalSize = size * nmemb;
std::string* string = static_cast<std::string*>(mem);
string->append(static_cast<char*>(data), finalSize);
return finalSize;
}
Download::CurlError::CurlError(const std::string& message) :
std::runtime_error(message) {}

View File

@ -11,6 +11,8 @@
#include <curl/curl.h>
#include <nlohmann/json.hpp>
#include <archive.h>
#include <archive_entry.h>
#include "atomicmutex.h"
#include "loggable.h"
@ -32,12 +34,12 @@ public:
explicit Download(
const std::string& url,
const std::filesystem::path& destination,
const std::shared_ptr<Collection>& collection,
const std::shared_ptr<Logger>& logger
);
~Download();
void proceed();
std::optional<std::filesystem::path> getLocation() const;
private:
std::optional<RepoInfo> testRepo() const;
@ -47,12 +49,14 @@ private:
std::optional<nlohmann::json> repoInfoGiteaApi1(const RepoInfo& repo);
std::optional<std::filesystem::path> downloadRepoGiteaApi1(const RepoInfo& repo, const std::string& branch);
bool extract(const std::filesystem::path& source, const std::filesystem::path& destination) const;
int copy(struct archive *ar, struct archive *aw) const;
CURLcode httpGet(const std::string& url, std::string& result, const std::vector<std::string_view>& headers = {});
CURLcode httpDownload(const std::string& url, const std::filesystem::path& path, const std::vector<std::string_view>& headers = {});
void setHeaders (const std::vector<std::string_view>& headers);
static size_t writeString(void* data, size_t size, size_t nmemb, void* mem);
static size_t writeFile(void* data, size_t size, size_t nmemb, void* file);
static int trace(CURL *handle, curl_infotype type, char *data, size_t size, void *clientp);
void createCurl();
@ -60,9 +64,9 @@ private:
static AtomicMutex amx;
static unsigned int instances;
std::unique_ptr<CURL, CurlDeleter> curl;
std::shared_ptr<Collection> collection;
std::string url;
std::filesystem::path destination;
std::optional<std::filesystem::path> location;
};
struct Download::RepoInfo {

View File

@ -20,6 +20,6 @@ public:
private:
void log(Logger::Severity severity, const std::string& message) const;
private:
protected:
std::shared_ptr<Logger> logger;
};

View File

@ -5,7 +5,7 @@ TaskManager::TaskManager() :
stopping(false),
maxThreads(std::thread::hardware_concurrency()),
activeThreads(0),
jobs(),
tasks(),
mutex(),
loopConditional(),
waitConditional(),
@ -20,11 +20,19 @@ TaskManager::~TaskManager() {
void TaskManager::queue(const Job& job) {
std::unique_lock lock(mutex);
jobs.emplace(job);
tasks.emplace(job, std::nullopt);
lock.unlock();
loopConditional.notify_one();
}
void TaskManager::queue(const Job& job, const Result& result) {
std::unique_lock lock(mutex);
tasks.emplace(job, result);
lock.unlock();
loopConditional.notify_one();
}
void TaskManager::start() {
std::lock_guard lock(mutex);
if (running)
@ -53,20 +61,21 @@ void TaskManager::stop() {
void TaskManager::loop() {
while (true) {
Job job;
Task task;
std::unique_lock lock(mutex);
while (!stopping && jobs.empty())
while (!stopping && tasks.empty())
loopConditional.wait(lock);
if (stopping)
return;
++activeThreads;
job = jobs.front();
jobs.pop();
task = tasks.front();
tasks.pop();
lock.unlock();
job();
executeTask(task);
lock.lock();
--activeThreads;
lock.unlock();
@ -75,6 +84,21 @@ void TaskManager::loop() {
}
void TaskManager::executeTask(const Task& task) const {
try {
task.first();
} catch (const std::exception& e) {
if (task.second.has_value())
task.second.value()(&e);
}
try {
if (task.second.has_value())
task.second.value()(std::nullopt);
} catch (...) {}
}
bool TaskManager::busy() const {
std::lock_guard lock(mutex);
return activeThreads == 0;

View File

@ -7,10 +7,15 @@
#include <mutex>
#include <condition_variable>
#include <functional>
#include <optional>
#include <exception>
class TaskManager {
public:
using Error = std::optional<const std::exception*>;
using Job = std::function<void ()>;
using Result = std::function<void (Error)>;
using Task = std::pair<Job, std::optional<Result>>;
TaskManager();
~TaskManager();
@ -18,17 +23,19 @@ public:
void stop();
void wait() const;
void queue(const Job& job);
void queue(const Job& job, const Result& result);
bool busy() const;
private:
void loop();
void executeTask(const Task& task) const;
private:
bool running;
bool stopping;
uint32_t maxThreads;
uint32_t activeThreads;
std::queue<Job> jobs;
std::queue<Task> tasks;
mutable std::mutex mutex;
mutable std::condition_variable loopConditional;
mutable std::condition_variable waitConditional;