290 lines
7.3 KiB
TypeScript
290 lines
7.3 KiB
TypeScript
/*
|
|
* https://github.com/morethanwords/tweb
|
|
* Copyright (C) 2019-2021 Eduard Kuzmenko
|
|
* https://github.com/morethanwords/tweb/blob/master/LICENSE
|
|
*/
|
|
|
|
import {logger} from '../lib/logger';
|
|
import insertInDescendSortedArray from './array/insertInDescendSortedArray';
|
|
import {getMiddleware, MiddlewareHelper} from './middleware';
|
|
import middlewarePromise from './middlewarePromise';
|
|
import safeAssign from './object/safeAssign';
|
|
import pause from './schedulers/pause';
|
|
|
|
/**
 * Minimal shape every element of a sorted list must have.
 */
export type SortedElementBase<T = any> = {
  /** Key under which the element is stored. */
  id: T;
  /** Sort key of the element. */
  index: number;
};
|
|
|
|
// Module-level counter, pre-incremented by each BatchProcessor constructor to build its log prefix.
let id = 0;
|
|
|
|
/**
 * Collects promised items and hands them to `process` in batches.
 *
 * Items pushed through `addToQueue` accumulate in `queue`. Once `pause(0)` resolves,
 * every item queued so far is awaited together and the resolved values are passed to
 * `process` as a single array. Items that arrive while a batch is running are handled
 * by a follow-up batch inside the same queue promise.
 */
export class BatchProcessor<Item extends any = any> {
  /** Promised items waiting for the next batch. */
  protected queue: Promise<Item>[];
  /** Promise of the scheduled/running queue; `undefined` while idle or after `clear()`. */
  protected promise: Promise<void>;

  protected middlewareHelper: MiddlewareHelper;
  protected log: ReturnType<typeof logger>;

  /** Callback that receives the resolved items of one batch. */
  protected process: (batch: Item[], m: ReturnType<typeof middlewarePromise>, log: BatchProcessor['log']) => Promise<any>;
  /** Value forwarded to `middlewarePromise`; errors strictly equal to it are not logged. */
  protected possibleError: any;

  /**
   * @param options.log optional logger; it gets re-bound with this instance's prefix
   * @param options.process batch handler
   * @param options.possibleError expected error value that should not be reported
   */
  constructor(options: {
    log?: BatchProcessor['log'],
    // middleware: MiddlewareHelper,
    process: BatchProcessor<Item>['process'],
    possibleError?: BatchProcessor['possibleError']
  }) {
    safeAssign(this, options);

    this.queue = [];
    this.middlewareHelper ??= getMiddleware();

    const prefix = 'BATCH-PROCESSOR-' + ++id;
    if(this.log) {
      this.log = this.log.bindPrefix(prefix);
    } else {
      this.log = logger(prefix);
    }
  }

  /** Promise of the queue currently scheduled or running, if any. */
  public get queuePromise() {
    return this.promise;
  }

  /** Drops all queued items, forgets the current queue promise and cleans the middleware. */
  public clear() {
    this.log('clear');
    this.queue.length = 0;
    this.promise = undefined;
    this.middlewareHelper.clean();
  }

  /**
   * Queues a promised item and makes sure a batch run is scheduled.
   *
   * @param item promise resolving to the item to process
   * @returns the promise of the queue run that will handle the item
   */
  public addToQueue(item: BatchProcessor<Item>['queue'][0]) {
    this.queue.push(item);
    return this.setQueue();
  }

  /**
   * Schedules a queue run unless one is already pending.
   *
   * @returns the pending queue promise, or a resolved promise when there is nothing queued
   */
  protected setQueue() {
    if(!this.queue.length) {
      return Promise.resolve();
    }

    if(this.promise) {
      return this.promise;
    }

    const middleware = this.middlewareHelper.get();
    const log = this.log.bindPrefix('queue');
    // NOTE(review): `m` presumably rejects with `possibleError` once the middleware is cleaned - confirm in middlewarePromise
    const m = middlewarePromise(middleware, this.possibleError);

    const processQueue = async(): Promise<void> => {
      log('start', this.queue.length);

      // take everything queued so far; later additions go to the next iteration
      const queue = this.queue.splice(0, this.queue.length);

      const perf = performance.now();
      const promises = queue.map((promise) => {
        promise.then((details) => {
          log('render item time', performance.now() - perf, details);
        }, () => {
          // This branch exists only for timing logs. The rejection itself is
          // propagated through Promise.all below, so swallow it here to avoid
          // a second, unhandled rejection on the derived promise.
        });

        return promise;
      });

      const renderedQueue = await m(Promise.all(promises));
      await m(this.process(renderedQueue, m, log));

      log('queue rendered');

      if(this.queue.length) {
        log('have new items to render');
        return processQueue();
      } else {
        log('end');
      }
    };

    log('setting pause');
    const promise = this.promise = m(pause(0))
    .then(
      () => processQueue().catch((err: ApiError) => {
        // only unexpected errors are reported; the error is rethrown either way
        if(err !== this.possibleError) {
          log.error('process queue error', err);
        }

        throw err;
      }),
      (err) => {
        log('pause has been cleared');
        throw err;
      }
    )
    .finally(() => {
      // do not reset a newer promise that was set after clear()
      if(this.promise === promise) {
        this.promise = undefined;
      }
    });

    return promise;
  }
}
|
|
|
|
/**
 * Collection of elements addressable by id and kept in an array ordered by their `index`.
 *
 * Elements are created through `onElementCreate`, batched by a `BatchProcessor` and
 * then placed into `sorted` via `insertInDescendSortedArray` (presumably descending by
 * `index`, judging by the helper's name). All `on*` callbacks except `onElementCreate`
 * are optional.
 */
export default class SortedList<SortedElement extends SortedElementBase, SortedElementId = SortedElement['id']> {
  /** All known elements by id, including ones whose creation is still pending. */
  protected elements: Map<SortedElementId, SortedElement>;
  /** Elements that have been positioned by `update`. */
  protected sorted: Array<SortedElement>;

  protected getIndex: (element: SortedElement) => PromiseLike<number> | number;
  protected onDelete: (element: SortedElement) => void;
  protected onUpdate: (element: SortedElement) => void;
  protected onSort: (element: SortedElement, idx: number) => void;
  protected onElementCreate: (base: SortedElementBase) => PromiseLike<SortedElement> | SortedElement;

  /** Scheduling hook for element-level callbacks; by default runs the callback immediately. */
  protected updateElementWith = (callback: () => void) => callback();
  /** Scheduling hook for whole-list updates; by default runs the callback immediately with `true`. */
  protected updateListWith = (callback: (canUpdate: boolean | undefined) => void) => callback(true);

  protected middleware: MiddlewareHelper;

  protected batchProcessor: BatchProcessor<SortedElement>;

  protected log: ReturnType<typeof logger>;

  /**
   * @param options.getIndex computes the sort index of an element (sync or async)
   * @param options.onDelete called when an element is removed
   * @param options.onUpdate called after an element's index was recomputed
   * @param options.onSort called with the element and the position returned by the insert helper
   * @param options.onElementCreate builds the full element from its `{id, index}` base
   * @param options.updateElementWith overrides the element-level scheduling hook
   * @param options.updateListWith overrides the list-level scheduling hook
   * @param options.log logger forwarded to the batch processor
   */
  constructor(options: {
    getIndex: SortedList<SortedElement>['getIndex'],
    onDelete?: SortedList<SortedElement>['onDelete'],
    onUpdate?: SortedList<SortedElement>['onUpdate'],
    onSort?: SortedList<SortedElement>['onSort'],
    onElementCreate: SortedList<SortedElement>['onElementCreate'],

    updateElementWith?: SortedList<SortedElement>['updateElementWith'],
    updateListWith?: SortedList<SortedElement>['updateListWith'],

    log?: SortedList<SortedElement>['log']
  }) {
    safeAssign(this, options);

    this.elements = new Map();
    this.sorted = [];
    this.middleware = getMiddleware();

    this.batchProcessor = new BatchProcessor<SortedElement>({
      log: this.log,
      process: async(batch, m, log) => {
        // const elements = await Promise.all(batch.map((element) => this.onElementCreate(element)));
        const elements = batch;
        const promises = elements.map((element) => this.update(element.id, element));
        await m(Promise.all(promises));
      }
    });
  }

  /** Cancels pending work and removes every element. */
  public clear() {
    this.batchProcessor.clear();
    this.middleware.clean();
    this.elements.clear();
    this.sorted.length = 0;
  }

  /** Starts `update` for every element, then reports current positions through `onSort`. */
  protected _updateList() {
    this.elements.forEach((element) => {
      // not awaited: with an async `getIndex` the positions reported below precede the re-sort
      this.update(element.id);
    });

    if(this.onSort) {
      this.sorted.forEach((element, idx) => {
        this.onSort(element, idx);
      });
    }
  }

  /**
   * Re-sorts the whole list through the `updateListWith` hook.
   *
   * @param callback receives `true` when the list was updated, `false` when the update was
   * skipped (the list was cleared meanwhile or the hook passed a non-`undefined` falsy `canUpdate`)
   */
  public updateList(callback?: (updated: boolean) => void) {
    const middleware = this.middleware.get();
    this.updateListWith((canUpdate) => {
      if(!middleware() || (canUpdate !== undefined && !canUpdate)) {
        callback?.(false);
        return;
      }

      this._updateList();

      callback?.(true);
    });
  }

  /** Whether an element with this id is known. */
  public has(id: SortedElementId) {
    return this.elements.has(id);
  }

  /** Returns the element stored under this id, if any. */
  public get(id: SortedElementId) {
    return this.elements.get(id);
  }

  /** Returns the underlying id-to-element map (not a copy). */
  public getAll() {
    return this.elements;
  }

  /**
   * Registers a new element and queues its creation.
   *
   * Does nothing when the id is already present.
   *
   * @returns the batch processor's queue promise for the run that handles the new element
   */
  public async add(id: SortedElementId) {
    const element = this.get(id);
    if(element) {
      return;
      // return element;
    }

    const base: SortedElementBase = {
      id,
      index: 0
    };

    // the base object is stored right away so that repeated add() calls are ignored
    this.elements.set(id, base as SortedElement);
    const createPromise = Promise.resolve(this.onElementCreate(base));
    return this.batchProcessor.addToQueue(createPromise);

    // return element;
  }

  /**
   * Removes an element.
   *
   * @param id id of the element to remove
   * @param noScheduler when truthy, `onDelete` is called directly instead of through `updateElementWith`
   * @returns `false` when the id is unknown, `true` otherwise
   */
  public delete(id: SortedElementId, noScheduler?: boolean) {
    const element = this.elements.get(id);
    if(!element) {
      return false;
    }

    this.elements.delete(id);

    const idx = this.sorted.indexOf(element);
    if(idx !== -1) {
      this.sorted.splice(idx, 1);
    }

    if(this.onDelete) {
      if(noScheduler) {
        this.onDelete(element);
      } else {
        const middleware = this.middleware.get();
        this.updateElementWith(() => {
          // skip the callback if the list was cleared before the hook fired
          if(!middleware()) {
            return;
          }

          this.onDelete(element);
        });
      }
    }

    return true;
  }

  /**
   * Recomputes the element's index and repositions it in `sorted`.
   *
   * @param id id of the element
   * @param element element to update; defaults to the one stored under `id`
   */
  public async update(id: SortedElementId, element = this.get(id)) {
    if(!element) {
      return;
    }

    element.index = await this.getIndex(element);
    // the element may have been deleted or replaced while the index was being computed
    if(this.get(id) !== element) {
      return;
    }

    this.onUpdate?.(element);

    const idx = insertInDescendSortedArray(this.sorted, element, 'index');
    // `onSort` is optional in the constructor options, so guard the call like `onUpdate` above
    this.onSort?.(element, idx);
  }
}
|