TelegramAPI/src/main/java/de/mrbesen/telegram/AsyncHandler.java

158 lines
4.2 KiB
Java

package de.mrbesen.telegram;
import org.json.JSONObject;
import java.net.UnknownHostException;
import java.util.LinkedList;
import java.util.List;
/**
 * Executes queued {@link Task}s through {@code TelegramAPI.request} on a small set of worker
 * threads that are started on demand.
 *
 * <p>Tasks wait in a list; normal tasks are appended, priority tasks are inserted at the head.
 * A worker thread terminates as soon as it finds the list empty, so an idle handler keeps no
 * threads alive. New workers are spawned by {@link #enque(Task, boolean)} when needed.
 *
 * <p>Locking: the task list is guarded by its own monitor; the worker bookkeeping (the thread
 * array and the running counter) is guarded by the monitor of the thread array. Where both are
 * needed, the task-list monitor is always acquired first.
 */
public class AsyncHandler implements Runnable {

    private static final String THREADPREFIX = "AsyncTgHandler-";

    /** Pending tasks; every access must hold the monitor of this list. */
    private final List<Task> tasks = new LinkedList<>();

    /** One slot per possible worker; {@code null} marks a free slot. Guarded by its own monitor. */
    private final Thread[] asynchandlerthread;

    /** Number of occupied slots in {@link #asynchandlerthread}; guarded by that array's monitor. */
    private int threadsRunning = 0;

    private final TelegramAPI api;

    /**
     * Creates a handler that uses a single worker thread.
     *
     * @param api the API instance the tasks are executed against
     */
    public AsyncHandler(TelegramAPI api) {
        this(api, 1);
    }

    /**
     * Creates a handler that uses at most {@code threadCount} worker threads.
     *
     * <p>Note: with a {@code threadCount} of 0 there is no slot to start a worker in, so queued
     * tasks are never executed.
     *
     * @param api the API instance the tasks are executed against
     * @param threadCount upper bound for concurrently running worker threads
     */
    public AsyncHandler(TelegramAPI api, int threadCount) {
        this.api = api;
        asynchandlerthread = new Thread[threadCount];
    }

    /**
     * Appends a task to the end of the queue.
     *
     * @param t the task to execute
     */
    public void enque(Task t) {
        enque(t, false);
    }

    /**
     * Queues a task and makes sure a worker is available to pick it up.
     *
     * @param t the task to execute
     * @param priority {@code true} to insert the task at the head of the queue instead of the end
     */
    public void enque(Task t, boolean priority) {
        int pending;
        synchronized (tasks) {
            if (priority) {
                tasks.add(0, t);
            } else {
                tasks.add(t);
            }
            // Read the size under the same lock: the list is not safe for unsynchronized reads
            // while workers remove elements from it.
            pending = tasks.size();
        }
        ensureThreads(pending);
    }

    /**
     * Builds a callback-less {@link Task} from the given values and appends it to the queue.
     *
     * @param request the API method name handed to the task
     * @param parameters the request parameters handed to the task
     * @param userid the user id handed to the task
     */
    public void enque(String request, String parameters, long userid) {
        enque(new Task(request, parameters, userid));
    }

    /**
     * Fills the first {@code count} worker slots (bounded by the slot array length) with running
     * threads, unless at least {@code count} workers are already running.
     *
     * @param count desired number of workers; values below 1 are treated as 1
     */
    private void ensureThreads(int count) {
        int wanted = Math.max(count, 1);
        synchronized (asynchandlerthread) {
            // The check has to happen under the lock so that parallel callers do not both decide
            // to spawn threads for the same slots.
            if (threadsRunning >= wanted) {
                return;
            }
            int limit = Math.min(wanted, asynchandlerthread.length);
            for (int slot = 0; slot < limit; slot++) {
                if (asynchandlerthread[slot] != null) {
                    continue;
                }
                Thread worker = new Thread(this, THREADPREFIX + slot);
                asynchandlerthread[slot] = worker;
                worker.start();
                threadsRunning++;
            }
        }
    }

    /**
     * Worker loop: takes tasks from the head of the queue and executes them until the queue is
     * empty, then frees this thread's slot and returns.
     */
    @Override
    public void run() {
        int failed = 0;
        while (true) {
            Task current;
            synchronized (tasks) {
                if (tasks.isEmpty()) {
                    // Deregister while still holding the queue lock. A concurrent enque() either
                    // added its task before this check (then we would not be here) or will see the
                    // decremented counter afterwards and start a new worker, so no task is left
                    // behind without a thread.
                    releaseWorkerSlot();
                    return;
                }
                current = tasks.remove(0);
            }
            if (current == null) {
                continue;
            }
            failed = runTask(current, failed);
        }
    }

    /**
     * Executes one task: performs the API request and passes the result through the task's
     * callback chain.
     *
     * <p>An {@link UnknownHostException} causes the task to be queued again; after more than 10
     * such failures in a row the worker pauses one second before re-queueing. Any other throwable
     * is passed to the task's exception handler if one is set, otherwise it is printed.
     *
     * @param current the task to execute
     * @param failed number of consecutive {@link UnknownHostException} failures seen so far
     * @return the updated consecutive-failure count (0 after a successful run)
     */
    private int runTask(Task current, int failed) {
        try {
            try {
                Object result = api.request(current.apimethod, current.parameters, current.userid);
                // Each callback receives the previous callback's return value.
                Callback<?, ?> step = current.callback;
                while (step != null) {
                    result = step.callObj(result);
                    step = step.next;
                }
                return 0;
            } catch (UnknownHostException ex) {
                // The host name could not be resolved; retry the task later.
                failed++;
                if (failed > 10) {
                    try {
                        Thread.sleep(1000); // wait 1 second
                    } catch (InterruptedException ignored) {
                        // An interrupt only shortens the pause; the task is re-queued either way.
                    }
                }
                enque(current);
            } catch (Throwable t) {
                if (current.exceptionhandl == null) {
                    throw t;
                }
                System.out.println("Exception " + t.getClass().getSimpleName() + " handled by " + current.exceptionhandl.getClass().getSimpleName());
                current.exceptionhandl.call(t);
            }
        } catch (Throwable t) {
            // Last line of defence: a failing task (or failing exception handler) must not kill
            // the worker thread.
            System.out.println("Error executing Task: ");
            t.printStackTrace();
        }
        return failed;
    }

    /**
     * Frees the slot occupied by the calling thread and decrements the running counter. Does
     * nothing if the calling thread does not occupy a slot (e.g. {@link #run()} was invoked
     * directly instead of through a worker thread).
     */
    private void releaseWorkerSlot() {
        Thread self = Thread.currentThread();
        synchronized (asynchandlerthread) {
            for (int slot = 0; slot < asynchandlerthread.length; slot++) {
                if (asynchandlerthread[slot] == self) {
                    asynchandlerthread[slot] = null;
                    threadsRunning--;
                    return;
                }
            }
        }
    }

    /**
     * One queued API call together with an optional result callback chain and an optional
     * exception handler.
     */
    public static class Task {
        String apimethod;
        String parameters;
        /** First callback of the chain that receives the request result; may be {@code null}. */
        Callback<JSONObject, ?> callback = null;
        /** Receives any throwable raised while the task runs; may be {@code null}. */
        Callback<Throwable, ?> exceptionhandl = null;
        final long userid;

        /**
         * Creates a task without a result callback.
         *
         * @param apimethod the API method name passed to the request
         * @param parameters the parameters passed to the request
         * @param userid the user id passed to the request
         */
        public Task(String apimethod, String parameters, long userid) {
            this.apimethod = apimethod;
            this.parameters = parameters;
            this.userid = userid;
        }

        /**
         * Creates a task whose result is handed to {@code callback}.
         *
         * @param apimethod the API method name passed to the request
         * @param parameters the parameters passed to the request
         * @param userid the user id passed to the request
         * @param callback first callback of the chain that processes the result
         */
        public Task(String apimethod, String parameters, long userid, Callback<JSONObject, ?> callback) {
            this.apimethod = apimethod;
            this.parameters = parameters;
            this.userid = userid;
            this.callback = callback;
        }

        /**
         * Sets the handler that receives throwables raised while this task runs.
         *
         * @param exceptionhandl the handler, or {@code null} for none
         * @return this task, to allow chaining
         */
        public Task setExceptionhandl(Callback<Throwable, ?> exceptionhandl) {
            this.exceptionhandl = exceptionhandl;
            return this;
        }
    }

    /**
     * One step of a callback chain: consumes a value of type {@code T} and produces a value of
     * type {@code K}, which is handed to {@link #next} if that is set.
     *
     * @param <T> type of the value this callback consumes
     * @param <K> type of the value this callback produces
     */
    public static abstract class Callback<T, K> {
        /** The callback that receives this callback's result; {@code null} ends the chain. */
        public Callback<K, ?> next = null;

        /**
         * Processes one value.
         *
         * @param j the input value
         * @return the value for the next callback in the chain
         * @throws Throwable if processing fails
         */
        public abstract K call(T j) throws Throwable;

        /**
         * Untyped entry point used by the worker loop: casts {@code j} to {@code T} and delegates
         * to {@link #call(Object)}. The cast is unchecked, so a value of the wrong type surfaces
         * as a {@link ClassCastException} at the point where it is actually used.
         *
         * @param j the input value
         * @return the result of {@link #call(Object)}
         * @throws Throwable if processing fails
         */
        @SuppressWarnings("unchecked")
        public K callObj(Object j) throws Throwable {
            return call((T) j);
        }
    }
}