diff --git a/Makefile b/Makefile index 6422343..c1a5870 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ LOGO = $(LOGF)Log.o export LOG_USEMUTEX = 0 INCLUDES = -I$(LOGF) $(addprefix -I, $(INCFS)) -LDFLAGS = -lssl -lcrypto +LDFLAGS = -lssl -lcrypto -pthread SRCFILES = $(shell find $(SRCF) -name "*.cpp") OBJFILES = $(patsubst $(SRCF)%, $(BUILDDIR)%, $(patsubst %.cpp, %.o, $(SRCFILES))) $(LOGO) diff --git a/inc/dedup.h b/inc/dedup.h index 01c0b6b..184a831 100644 --- a/inc/dedup.h +++ b/inc/dedup.h @@ -27,6 +27,8 @@ private: void resolveDuplicates(std::map& hashed); bool relinkFile(const std::string& linkbase, const std::string& replacedfile); + static const uint_fast8_t HASHTHREADCOUNT; + std::vector folders; FileIndexer indexer; diff --git a/inc/hasherthread.h b/inc/hasherthread.h new file mode 100644 index 0000000..5aaf137 --- /dev/null +++ b/inc/hasherthread.h @@ -0,0 +1,34 @@ +#pragma once + +#include +#include +#include +#include + +#include "file.h" +#include "hash.h" + +class HasherThread { +public: + using getNext_f = std::function()>; + using doneCallback_f = std::function)>; + + HasherThread(std::mutex& mutex, getNext_f, doneCallback_f); + ~HasherThread(); + + void start(); + void stop(); + + void waitfor(); + +private: + void threadLoop(); + + std::thread thread; + std::mutex& mutex; + + getNext_f getNext; + doneCallback_f doneCallback; + + bool shouldrun = false; +}; diff --git a/src/dedup.cpp b/src/dedup.cpp index 7c429d1..1abad9c 100644 --- a/src/dedup.cpp +++ b/src/dedup.cpp @@ -3,6 +3,8 @@ #include #include #include +#include +#include #include #include #include @@ -11,6 +13,9 @@ #include #include "progressbar.h" +#include "hasherthread.h" + +const uint_fast8_t Dedup::HASHTHREADCOUNT = 4; Dedup::Dedup() { } @@ -78,20 +83,70 @@ void Dedup::removeUninterestingFiles(std::multimap Dedup::hash(std::multimap>& foundfiles, uint64_t bytesToHash) { std::map filesizes; - ProgressBar progressBar(foundfiles.size(), bytesToHash); - for(auto it = 
foundfiles.begin(); it != foundfiles.end(); ++it) { - Hash hash = it->second->createHash(); - progressBar.update(1, it->second->filesize); + uint64_t stepsize = foundfiles.size() / HASHTHREADCOUNT; - Files& files = getFiles(it->first, filesizes); - files.addNewFile(it->second, std::move(hash)); + ProgressBar* bpptr = &progressBar; + std::map* filesizesptr = &filesizes; - std::cout << progressBar; + std::mutex globalData; + + std::vector threads; + std::vector>::iterator> threadit; + threads.reserve(HASHTHREADCOUNT); + threadit.reserve(HASHTHREADCOUNT + 1); + + // create threads + Log::info << "spawning " << (int) HASHTHREADCOUNT << " hashing threads"; + + threadit.push_back(foundfiles.begin()); + for(uint_fast8_t i = 0; i < HASHTHREADCOUNT; ++i) { + // make a copy! + std::multimap>::iterator it = threadit.at(i); + std::advance(it, stepsize); + threadit.push_back(it); } + // make sure the last one is really the last + threadit.at(HASHTHREADCOUNT) = foundfiles.end(); + + for(uint_fast8_t i = 0; i < HASHTHREADCOUNT; ++i) { + std::multimap>::iterator* itptr = &(threadit.at(i)); + std::multimap>::iterator* endptr = &(threadit.at(i+1)); + + threads.push_back(new HasherThread( + globalData, + [itptr, endptr]() -> std::shared_ptr { + if(*itptr != *endptr) { + auto copy = *itptr; + ++(*itptr); + return copy->second; + } + return nullptr; + }, + [this, bpptr, filesizesptr](Hash&& h, std::shared_ptr f) -> void { + bpptr->update(1, f->filesize); + Files& files = getFiles(f->filesize, *filesizesptr); + files.addNewFile(f, std::move(h)); + + std::cout << *bpptr; + } + )); + } + + // start all threads + std::for_each(threads.begin(), threads.end(), [](HasherThread* ht){ + ht->start(); + }); + + // wait for all threads + std::for_each(threads.begin(), threads.end(), [](HasherThread* ht){ + ht->waitfor(); + }); + std::cout << "\n"; Log::info << "Hashing done after: " << (int) (progressBar.getDuration().count() / 1000) << "s "; + Log::info << filesizes.size() << " Hashed files"; 
return filesizes; } diff --git a/src/file.cpp b/src/file.cpp index 3f71512..ecfd4d2 100644 --- a/src/file.cpp +++ b/src/file.cpp @@ -1,5 +1,7 @@ #include "file.h" +#include + FileSize::FileSize(uint64_t fs) : fs(fs) {} File::File(uint64_t fs, uint64_t inode, uint64_t linkcount, const std::string& path) : @@ -23,7 +25,7 @@ std::ostream& operator<<(std::ostream& str, const FileSize& fs) { ++prefix; } - str << (cpy / (float) COMMA); + str << std::setprecision(2) << std::fixed << (cpy / (float) COMMA); if(prefix > 0) str << PREFIX[prefix]; return str << "B"; diff --git a/src/hasherthread.cpp b/src/hasherthread.cpp new file mode 100644 index 0000000..572754e --- /dev/null +++ b/src/hasherthread.cpp @@ -0,0 +1,56 @@ +#include "hasherthread.h" + +#include +#include + +HasherThread::HasherThread(std::mutex& mutex, getNext_f next, doneCallback_f done) : mutex(mutex), getNext(next), doneCallback(done) { + assert(getNext); + assert(doneCallback); +} + +HasherThread::~HasherThread() { + stop(); + waitfor(); +} + +void HasherThread::start() { + shouldrun = true; + thread = std::thread(&HasherThread::threadLoop, this); +} + +void HasherThread::stop() { + shouldrun = false; +} + +void HasherThread::waitfor() { + if(thread.joinable()) { + thread.join(); + } +} + +void HasherThread::threadLoop() { + while(shouldrun) { + // get task + std::shared_ptr task; + { + std::lock_guard lock(mutex); + task = getNext(); + } + + // no more tasks + if(!task) { + break; + } + + // do the task + Hash hash = task->createHash(); + + // submit + { + std::lock_guard lock(mutex); + doneCallback(std::move(hash), task); + } + } + + Log::info << "no more tasks, terminateing Hasherthread"; +} diff --git a/src/progressbar.cpp b/src/progressbar.cpp index 85a7df9..598f471 100644 --- a/src/progressbar.cpp +++ b/src/progressbar.cpp @@ -21,6 +21,10 @@ std::chrono::duration ProgressBar::getDuration() const { std::ostream& operator<<(std::ostream& str, const ProgressBar& pb) { double progress = (pb.currentBytes / 
(double) pb.maxBytes); + // not the optimal way, but the easiest + static double lastProgress = 0; + if(progress - lastProgress < 0.0001) return str; + lastProgress = progress; auto usedtime = pb.getDuration(); std::chrono::duration> eta = std::chrono::duration_cast((usedtime / (double) pb.currentBytes) * (pb.maxBytes - pb.currentBytes) );