add hashing threads

This commit is contained in:
mrbesen 2022-10-23 15:43:26 +02:00
parent f9df723386
commit 73bba13272
Signed by: MrBesen
GPG Key ID: 596B2350DCD67504
7 changed files with 162 additions and 9 deletions

View File

@ -21,7 +21,7 @@ LOGO = $(LOGF)Log.o
export LOG_USEMUTEX = 0
INCLUDES = -I$(LOGF) $(addprefix -I, $(INCFS))
LDFLAGS = -lssl -lcrypto
LDFLAGS = -lssl -lcrypto -pthread
SRCFILES = $(shell find $(SRCF) -name "*.cpp")
OBJFILES = $(patsubst $(SRCF)%, $(BUILDDIR)%, $(patsubst %.cpp, %.o, $(SRCFILES))) $(LOGO)

View File

@ -27,6 +27,8 @@ private:
void resolveDuplicates(std::map<uint64_t, Files>& hashed);
bool relinkFile(const std::string& linkbase, const std::string& replacedfile);
static const uint_fast8_t HASHTHREADCOUNT;
std::vector<SearchFolder> folders;
FileIndexer indexer;

34
inc/hasherthread.h Normal file
View File

@ -0,0 +1,34 @@
#pragma once
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include "file.h"
#include "hash.h"
class HasherThread {
public:
using getNext_f = std::function<std::shared_ptr<File>()>;
using doneCallback_f = std::function<void(Hash&& h, std::shared_ptr<File>)>;
HasherThread(std::mutex& mutex, getNext_f, doneCallback_f);
~HasherThread();
void start();
void stop();
void waitfor();
private:
void threadLoop();
std::thread thread;
std::mutex& mutex;
getNext_f getNext;
doneCallback_f doneCallback;
bool shouldrun = false;
};

View File

@ -3,6 +3,8 @@
#include <chrono>
#include <cstring>
#include <iostream>
#include <iterator>
#include <mutex>
#include <numeric>
#include <sys/stat.h>
#include <thread>
@ -11,6 +13,9 @@
#include <Log.h>
#include "progressbar.h"
#include "hasherthread.h"
const uint_fast8_t Dedup::HASHTHREADCOUNT = 4;
Dedup::Dedup() {
}
@ -78,20 +83,70 @@ void Dedup::removeUninterestingFiles(std::multimap<uint64_t, std::shared_ptr<Fil
std::map<uint64_t, Files> Dedup::hash(std::multimap<uint64_t, std::shared_ptr<File>>& foundfiles, uint64_t bytesToHash) {
std::map<uint64_t, Files> filesizes;
ProgressBar progressBar(foundfiles.size(), bytesToHash);
for(auto it = foundfiles.begin(); it != foundfiles.end(); ++it) {
Hash hash = it->second->createHash();
progressBar.update(1, it->second->filesize);
uint64_t stepsize = foundfiles.size() / HASHTHREADCOUNT;
Files& files = getFiles(it->first, filesizes);
files.addNewFile(it->second, std::move(hash));
ProgressBar* bpptr = &progressBar;
std::map<uint64_t, Files>* filesizesptr = &filesizes;
std::cout << progressBar;
std::mutex globalData;
std::vector<HasherThread*> threads;
std::vector<std::multimap<uint64_t, std::shared_ptr<File>>::iterator> threadit;
threads.reserve(HASHTHREADCOUNT);
threadit.reserve(HASHTHREADCOUNT + 1);
// create threads
Log::info << "spawning " << (int) HASHTHREADCOUNT << " hashing threads";
threadit.push_back(foundfiles.begin());
for(uint_fast8_t i = 0; i < HASHTHREADCOUNT; ++i) {
// make a copy!
std::multimap<uint64_t, std::shared_ptr<File>>::iterator it = threadit.at(i);
std::advance(it, stepsize);
threadit.push_back(it);
}
// make sure the last one is realy the last
threadit.at(HASHTHREADCOUNT) = foundfiles.end();
for(uint_fast8_t i = 0; i < HASHTHREADCOUNT; ++i) {
std::multimap<uint64_t, std::shared_ptr<File>>::iterator* itptr = &(threadit.at(i));
std::multimap<uint64_t, std::shared_ptr<File>>::iterator* endptr = &(threadit.at(i+1));
threads.push_back(new HasherThread(
globalData,
[itptr, endptr]() -> std::shared_ptr<File> {
if(*itptr != *endptr) {
auto copy = *itptr;
++(*itptr);
return copy->second;
}
return nullptr;
},
[this, bpptr, filesizesptr](Hash&& h, std::shared_ptr<File> f) -> void {
bpptr->update(1, f->filesize);
Files& files = getFiles(f->filesize, *filesizesptr);
files.addNewFile(f, std::move(h));
std::cout << *bpptr;
}
));
}
// start all threads
std::for_each(threads.begin(), threads.end(), [](HasherThread* ht){
ht->start();
});
// wait for all threads
std::for_each(threads.begin(), threads.end(), [](HasherThread* ht){
ht->waitfor();
});
std::cout << "\n";
Log::info << "Hashing done after: " << (int) (progressBar.getDuration().count() / 1000) << "s ";
Log::info << filesizes.size() << " Hashed files";
return filesizes;
}

View File

@ -1,5 +1,7 @@
#include "file.h"
#include <iomanip>
FileSize::FileSize(uint64_t fs) : fs(fs) {}
File::File(uint64_t fs, uint64_t inode, uint64_t linkcount, const std::string& path) :
@ -23,7 +25,7 @@ std::ostream& operator<<(std::ostream& str, const FileSize& fs) {
++prefix;
}
str << (cpy / (float) COMMA);
str << std::setprecision(2) << std::fixed << (cpy / (float) COMMA);
if(prefix > 0)
str << PREFIX[prefix];
return str << "B";

56
src/hasherthread.cpp Normal file
View File

@ -0,0 +1,56 @@
#include "hasherthread.h"
#include <cassert>
#include <Log.h>
HasherThread::HasherThread(std::mutex& mutex, getNext_f next, doneCallback_f done) : mutex(mutex), getNext(next), doneCallback(done) {
assert(getNext);
assert(doneCallback);
}
HasherThread::~HasherThread() {
stop();
waitfor();
}
void HasherThread::start() {
shouldrun = true;
thread = std::thread(&HasherThread::threadLoop, this);
}
void HasherThread::stop() {
shouldrun = false;
}
void HasherThread::waitfor() {
if(thread.joinable()) {
thread.join();
}
}
void HasherThread::threadLoop() {
while(shouldrun) {
// get task
std::shared_ptr<File> task;
{
std::lock_guard lock(mutex);
task = getNext();
}
// no more tasks
if(!task) {
break;
}
// do the task
Hash hash = task->createHash();
// submit
{
std::lock_guard lock(mutex);
doneCallback(std::move(hash), task);
}
}
Log::info << "no more tasks, terminateing Hasherthread";
}

View File

@ -21,6 +21,10 @@ std::chrono::duration<uint64_t, std::milli> ProgressBar::getDuration() const {
std::ostream& operator<<(std::ostream& str, const ProgressBar& pb) {
double progress = (pb.currentBytes / (double) pb.maxBytes);
// not the optimal way, but the easyiest
static double lastProgress = 0;
if(progress - lastProgress < 0.0001) return str;
lastProgress = progress;
auto usedtime = pb.getDuration();
std::chrono::duration<uint64_t, std::ratio<60>> eta = std::chrono::duration_cast<std::chrono::minutes>((usedtime / (double) pb.currentBytes) * (pb.maxBytes - pb.currentBytes) );