Add base::ConcurrentTimer.

Write removes from cache database once an hour.
This commit is contained in:
John Preston 2018-08-09 23:39:22 +03:00
parent 64b8adb3d0
commit b9af3c7f34
14 changed files with 552 additions and 9 deletions

View File

@ -15,6 +15,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
#include "core/crash_reports.h"
#include "messenger.h"
#include "base/timer.h"
#include "base/concurrent_timer.h"
#include "base/qthelp_url.h"
#include "base/qthelp_regex.h"
#include "core/update_checker.h"
@ -446,6 +447,7 @@ void adjustSingleTimers() {
a->adjustSingleTimers();
}
base::Timer::Adjust();
base::ConcurrentTimerEnvironment::Adjust();
}
void connect(const char *signal, QObject *object, const char *method) {

View File

@ -11,6 +11,8 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
#include <QtCore/QUrl>
#include <QtCore/QMutex>
#include <QtCore/QRegularExpression>
#include <QtCore/QThread>
#include <QtCore/QCoreApplication>
#include <crl/crl.h>
#include <rpl/rpl.h>

View File

@ -0,0 +1,367 @@
/*
This file is part of Telegram Desktop,
the official desktop application for the Telegram messaging service.
For license and copyright information please follow this link:
https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
*/
#include "base/concurrent_timer.h"
#include <QtCore/QThread>
#include <QtCore/QCoreApplication>
using namespace base::details;
namespace base {
namespace details {
namespace {
constexpr auto kCallDelayedEvent = QEvent::Type(QEvent::User + 1);
constexpr auto kCancelTimerEvent = QEvent::Type(QEvent::User + 2);
static_assert(kCancelTimerEvent < QEvent::MaxUser);
ConcurrentTimerEnvironment *Environment/* = nullptr*/;
QMutex EnvironmentMutex;
class CallDelayedEvent : public QEvent {
public:
CallDelayedEvent(
crl::time_type timeout,
Qt::TimerType type,
FnMut<void()> method);
crl::time_type timeout() const;
Qt::TimerType type() const;
FnMut<void()> takeMethod();
private:
crl::time_type _timeout = 0;
Qt::TimerType _type = Qt::PreciseTimer;
FnMut<void()> _method;
};
class CancelTimerEvent : public QEvent {
public:
CancelTimerEvent();
};
CallDelayedEvent::CallDelayedEvent(
crl::time_type timeout,
Qt::TimerType type,
FnMut<void()> method)
: QEvent(kCallDelayedEvent)
, _timeout(timeout)
, _type(type)
, _method(std::move(method)) {
Expects(_timeout >= 0 && _timeout < std::numeric_limits<int>::max());
}
crl::time_type CallDelayedEvent::timeout() const {
return _timeout;
}
Qt::TimerType CallDelayedEvent::type() const {
return _type;
}
FnMut<void()> CallDelayedEvent::takeMethod() {
return base::take(_method);
}
CancelTimerEvent::CancelTimerEvent() : QEvent(kCancelTimerEvent) {
}
} // namespace
class TimerObject : public QObject {
public:
TimerObject(
not_null<QThread*> thread,
not_null<QObject*> adjuster,
Fn<void()> adjust);
protected:
bool event(QEvent *e) override;
private:
void callDelayed(not_null<CallDelayedEvent*> e);
void callNow();
void cancel();
void adjust();
FnMut<void()> _next;
Fn<void()> _adjust;
int _timerId = 0;
};
TimerObject::TimerObject(
not_null<QThread*> thread,
not_null<QObject*> adjuster,
Fn<void()> adjust)
: _adjust(std::move(adjust)) {
moveToThread(thread);
connect(
adjuster,
&QObject::destroyed,
this,
&TimerObject::adjust,
Qt::DirectConnection);
}
bool TimerObject::event(QEvent *e) {
const auto type = e->type();
switch (type) {
case kCallDelayedEvent:
callDelayed(static_cast<CallDelayedEvent*>(e));
return true;
case kCancelTimerEvent:
cancel();
return true;
case QEvent::Timer:
callNow();
return true;
}
return QObject::event(e);
}
void TimerObject::callDelayed(not_null<CallDelayedEvent*> e) {
cancel();
const auto timeout = e->timeout();
const auto type = e->type();
_next = e->takeMethod();
if (timeout > 0) {
_timerId = startTimer(timeout, type);
} else {
base::take(_next)();
}
}
void TimerObject::cancel() {
if (const auto id = base::take(_timerId)) {
killTimer(id);
}
_next = nullptr;
}
void TimerObject::callNow() {
auto next = base::take(_next);
cancel();
next();
}
void TimerObject::adjust() {
if (_adjust) {
_adjust();
}
}
TimerObjectWrap::TimerObjectWrap(Fn<void()> adjust) {
QMutexLocker lock(&EnvironmentMutex);
if (Environment) {
_value = Environment->createTimer(std::move(adjust));
}
}
TimerObjectWrap::~TimerObjectWrap() {
if (_value) {
QMutexLocker lock(&EnvironmentMutex);
if (Environment) {
_value.release()->deleteLater();
}
}
}
void TimerObjectWrap::call(
crl::time_type timeout,
Qt::TimerType type,
FnMut<void()> method) {
sendEvent(std::make_unique<CallDelayedEvent>(
timeout,
type,
std::move(method)));
}
void TimerObjectWrap::cancel() {
sendEvent(std::make_unique<CancelTimerEvent>());
}
void TimerObjectWrap::sendEvent(std::unique_ptr<QEvent> event) {
if (!_value) {
return;
}
QCoreApplication::postEvent(
_value.get(),
event.release(),
Qt::HighEventPriority);
}
} // namespace details
ConcurrentTimerEnvironment::ConcurrentTimerEnvironment() {
_thread.start();
_adjuster.moveToThread(&_thread);
acquire();
}
ConcurrentTimerEnvironment::~ConcurrentTimerEnvironment() {
_thread.quit();
release();
_thread.wait();
QObject::disconnect(&_adjuster, &QObject::destroyed, nullptr, nullptr);
}
std::unique_ptr<TimerObject> ConcurrentTimerEnvironment::createTimer(
Fn<void()> adjust) {
return std::make_unique<TimerObject>(
&_thread,
&_adjuster,
std::move(adjust));
}
void ConcurrentTimerEnvironment::Adjust() {
QMutexLocker lock(&EnvironmentMutex);
if (Environment) {
Environment->adjustTimers();
}
}
void ConcurrentTimerEnvironment::adjustTimers() {
QObject emitter;
QObject::connect(
&emitter,
&QObject::destroyed,
&_adjuster,
&QObject::destroyed,
Qt::QueuedConnection);
}
void ConcurrentTimerEnvironment::acquire() {
Expects(Environment == nullptr);
QMutexLocker lock(&EnvironmentMutex);
Environment = this;
}
void ConcurrentTimerEnvironment::release() {
Expects(Environment == this);
QMutexLocker lock(&EnvironmentMutex);
Environment = nullptr;
}
ConcurrentTimer::ConcurrentTimer(
Fn<void(FnMut<void()>)> runner,
Fn<void()> callback)
: _runner(std::move(runner))
, _object(createAdjuster())
, _callback(std::move(callback))
, _type(Qt::PreciseTimer)
, _adjusted(false) {
setRepeat(Repeat::Interval);
}
Fn<void()> ConcurrentTimer::createAdjuster() {
auto guards = base::make_binary_guard();
_guard = std::make_shared<bool>(true);
return [=, runner = _runner, guard = std::weak_ptr<bool>(_guard)] {
runner([=] {
if (!guard.lock()) {
return;
}
adjust();
});
};
}
void ConcurrentTimer::start(
TimeMs timeout,
Qt::TimerType type,
Repeat repeat) {
_type = type;
setRepeat(repeat);
_adjusted = false;
setTimeout(timeout);
cancelAndSchedule(_timeout);
_next = crl::time() + _timeout;
}
void ConcurrentTimer::cancelAndSchedule(int timeout) {
auto guards = base::make_binary_guard();
_running = std::move(guards.first);
auto method = [
=,
runner = _runner,
guard = std::move(guards.second)
]() mutable {
if (!guard.alive()) {
return;
}
runner([=, guard = std::move(guard)] {
if (!guard.alive()) {
return;
}
timerEvent();
});
};
_object.call(timeout, _type, std::move(method));
}
void ConcurrentTimer::timerEvent() {
if (repeat() == Repeat::Interval) {
if (_adjusted) {
start(_timeout, _type, repeat());
} else {
_next = crl::time() + _timeout;
}
} else {
cancel();
}
if (_callback) {
_callback();
}
}
void ConcurrentTimer::cancel() {
_running = {};
if (isActive()) {
_running = base::binary_guard();
_object.cancel();
}
}
TimeMs ConcurrentTimer::remainingTime() const {
if (!isActive()) {
return -1;
}
const auto now = crl::time();
return (_next > now) ? (_next - now) : TimeMs(0);
}
void ConcurrentTimer::adjust() {
auto remaining = remainingTime();
if (remaining >= 0) {
cancelAndSchedule(remaining);
_adjusted = true;
}
}
void ConcurrentTimer::setTimeout(TimeMs timeout) {
Expects(timeout >= 0 && timeout <= std::numeric_limits<int>::max());
_timeout = static_cast<unsigned int>(timeout);
}
int ConcurrentTimer::timeout() const {
return _timeout;
}
} // namespace base

View File

@ -0,0 +1,147 @@
/*
This file is part of Telegram Desktop,
the official desktop application for the Telegram messaging service.
For license and copyright information please follow this link:
https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
*/
#pragma once
#include "base/binary_guard.h"
#include <crl/crl_time.h>
#include <crl/crl_object_on_queue.h>
#include <QtCore/QThread>
namespace base {
namespace details {
class TimerObject;
class TimerObjectWrap {
public:
explicit TimerObjectWrap(Fn<void()> adjust);
~TimerObjectWrap();
void call(
crl::time_type timeout,
Qt::TimerType type,
FnMut<void()> method);
void cancel();
private:
void sendEvent(std::unique_ptr<QEvent> event);
std::unique_ptr<TimerObject> _value;
};
} // namespace details
class ConcurrentTimerEnvironment {
public:
ConcurrentTimerEnvironment();
~ConcurrentTimerEnvironment();
std::unique_ptr<details::TimerObject> createTimer(Fn<void()> adjust);
static void Adjust();
private:
void acquire();
void release();
void adjustTimers();
QThread _thread;
QObject _adjuster;
};
class ConcurrentTimer {
public:
explicit ConcurrentTimer(
Fn<void(FnMut<void()>)> runner,
Fn<void()> callback = nullptr);
template <typename Object>
explicit ConcurrentTimer(
crl::weak_on_queue<Object> weak,
Fn<void()> callback = nullptr);
static Qt::TimerType DefaultType(TimeMs timeout) {
constexpr auto kThreshold = TimeMs(1000);
return (timeout > kThreshold) ? Qt::CoarseTimer : Qt::PreciseTimer;
}
void setCallback(Fn<void()> callback) {
_callback = std::move(callback);
}
void callOnce(TimeMs timeout) {
callOnce(timeout, DefaultType(timeout));
}
void callEach(TimeMs timeout) {
callEach(timeout, DefaultType(timeout));
}
void callOnce(TimeMs timeout, Qt::TimerType type) {
start(timeout, type, Repeat::SingleShot);
}
void callEach(TimeMs timeout, Qt::TimerType type) {
start(timeout, type, Repeat::Interval);
}
bool isActive() const {
return _running.alive();
}
void cancel();
TimeMs remainingTime() const;
private:
enum class Repeat : unsigned {
Interval = 0,
SingleShot = 1,
};
Fn<void()> createAdjuster();
void start(TimeMs timeout, Qt::TimerType type, Repeat repeat);
void adjust();
void cancelAndSchedule(int timeout);
void setTimeout(TimeMs timeout);
int timeout() const;
void timerEvent();
void setRepeat(Repeat repeat) {
_repeat = static_cast<unsigned>(repeat);
}
Repeat repeat() const {
return static_cast<Repeat>(_repeat);
}
Fn<void(FnMut<void()>)> _runner;
std::shared_ptr<bool> _guard; // Must be before _object.
details::TimerObjectWrap _object;
Fn<void()> _callback;
base::binary_guard _running;
TimeMs _next = 0;
int _timeout = 0;
int _timerId = 0;
Qt::TimerType _type : 2;
bool _adjusted : 1;
unsigned _repeat : 1;
};
template <typename Object>
ConcurrentTimer::ConcurrentTimer(
crl::weak_on_queue<Object> weak,
Fn<void()> callback)
: ConcurrentTimer(weak.runner(), std::move(callback)) {
}
} // namespace base

View File

@ -26,7 +26,6 @@ Timer::Timer(
moveToThread(thread);
}
Timer::Timer(Fn<void()> callback)
: QObject(nullptr)
, _callback(std::move(callback))

View File

@ -12,6 +12,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
#include "core/crash_reports.h"
#include "core/main_queue_processor.h"
#include "core/update_checker.h"
#include "base/concurrent_timer.h"
#include "application.h"
namespace Core {
@ -242,6 +243,8 @@ void Launcher::processArguments() {
int Launcher::executeApplication() {
MainQueueProcessor processor;
base::ConcurrentTimerEnvironment environment;
Application app(this, _argc, _argv);
return app.exec();
}

View File

@ -12,7 +12,6 @@ namespace Core {
class MainQueueProcessor : public QObject {
public:
MainQueueProcessor();
~MainQueueProcessor();
protected:

View File

@ -12,6 +12,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
#include "storage/storage_encrypted_file.h"
#include "base/flat_set.h"
#include "base/algorithm.h"
#include "base/concurrent_timer.h"
#include <crl/crl.h>
#include <xxhash.h>
#include <QtCore/QDir>
@ -39,6 +40,7 @@ constexpr auto kReadBlockSize = 8 * 1024 * 1024;
constexpr auto kRecordSizeUnknown = size_type(-1);
constexpr auto kRecordSizeInvalid = size_type(-2);
constexpr auto kMaxDataSize = 10 * 1024 * 1024;
constexpr auto kRemoveBundleDelay = 60 * 60 * crl::time_type(1000);
using RecordType = uint8;
using PlaceId = std::array<uint8, 7>;
@ -175,6 +177,8 @@ public:
void clear(FnMut<void(Error)> done);
~Database();
private:
struct Entry {
Entry() = default;
@ -232,6 +236,8 @@ private:
std::unordered_map<Key, Entry> _map;
std::set<Key> _removing;
base::ConcurrentTimer _writeRemoveTimer;
CleanerWrap _cleaner;
};
@ -253,7 +259,8 @@ Database::Database(
const Settings &settings)
: _weak(std::move(weak))
, _base(ComputeBasePath(path))
, _settings(settings) {
, _settings(settings)
, _writeRemoveTimer(_weak, [=] { writeMultiRemove(); }) {
}
template <typename Callback, typename ...Args>
@ -483,6 +490,9 @@ bool Database::readRecordMultiRemove(bytes::const_span data) {
}
void Database::close(FnMut<void()> done) {
if (_writeRemoveTimer.isActive()) {
writeMultiRemove();
}
_cleaner = CleanerWrap();
_binlog.close();
invokeCallback(done);
@ -588,11 +598,11 @@ void Database::remove(const Key &key, FnMut<void()> done) {
const auto i = _map.find(key);
if (i != _map.end()) {
_removing.emplace(key);
if (true || _removing.size() == kMaxBundledRecords) {
if (_removing.size() == kMaxBundledRecords) {
_writeRemoveTimer.cancel();
writeMultiRemove();
// cancel timeout?..
} else {
// timeout?..
} else if (!_writeRemoveTimer.isActive()) {
_writeRemoveTimer.callOnce(kRemoveBundleDelay);
}
const auto &entry = i->second;
@ -649,6 +659,10 @@ void Database::clear(FnMut<void(Error)> done) {
writeVersion(version) ? Error::NoError() : ioError(versionPath()));
}
Database::~Database() {
close(nullptr);
}
auto Database::findAvailableVersion() const -> Version {
const auto entries = QDir(_base).entryList(
QDir::Dirs | QDir::NoDotAndDotDot);

View File

@ -9,6 +9,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
#include "storage/cache/storage_cache_database.h"
#include "storage/storage_encryption.h"
#include "base/concurrent_timer.h"
#include <crl/crl.h>
#include <thread>

View File

@ -9,6 +9,9 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
#include "storage/storage_encrypted_file.h"
#include <QtCore/QThread>
#include <QtCore/QCoreApplication>
#ifdef Q_OS_WIN
#include "platform/win/windows_dlls.h"
#endif // Q_OS_WIN

@ -1 +1 @@
Subproject commit 2cab11076d84a9db7d86f165eb2cfb4c6ebcc8f4
Subproject commit 4291015efab76bda5886a56b5007f4531be17d46

View File

@ -51,6 +51,8 @@
'<(src_loc)/base/binary_guard.h',
'<(src_loc)/base/build_config.h',
'<(src_loc)/base/bytes.h',
'<(src_loc)/base/concurrent_timer.cpp',
'<(src_loc)/base/concurrent_timer.h',
'<(src_loc)/base/flags.h',
'<(src_loc)/base/enum_mask.h',
'<(src_loc)/base/flat_map.h',

View File

@ -34,6 +34,11 @@
],
'dependencies': [
'crl.gyp:crl',
'lib_base.gyp:lib_base',
],
'export_dependent_settings': [
'crl.gyp:crl',
'lib_base.gyp:lib_base',
],
'include_dirs': [
'<(src_loc)',

View File

@ -125,7 +125,6 @@
],
'dependencies': [
'../lib_storage.gyp:lib_storage',
'../crl.gyp:crl',
],
'sources': [
'<(src_loc)/storage/storage_encrypted_file_tests.cpp',