AsyncHandlerThread

This commit is contained in:
Oliver 2022-07-13 19:25:03 +02:00
parent b9d0616774
commit 9440f84b41
Signed by: okaestne
GPG Key ID: 06A81B143EA9588F
4 changed files with 139 additions and 103 deletions

View File

@ -2,124 +2,63 @@ package de.mrbesen.telegram;
import org.json.JSONObject; import org.json.JSONObject;
import java.net.UnknownHostException;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
public class AsyncHandler implements Runnable { public class AsyncHandler {
private List<Task> tasks = new LinkedList<>(); private List<Task> fasttasks = new LinkedList<>();
private Thread[] asynchandlerthread; private List<Task> slowtasks = new LinkedList<>();
private int threadsRunning = 0;
private TelegramAPI api; private AsyncHandlerThread[] asynchandlerthread;
private static final String THREADPREFIX = "AsyncTgHandler-"; private static final String THREADPREFIX = "AsyncTgHandler-";
//just use 1 async thread // just use 2 async threads
public AsyncHandler(TelegramAPI api) { public AsyncHandler(TelegramAPI api) {
this(api, 1); this(api, 2);
} }
//allow as many as threadCount threads to handle Async Tasks //allow as many as threadCount threads to handle Async Tasks
public AsyncHandler(TelegramAPI api, int threadCount) { public AsyncHandler(TelegramAPI api, int threadCount) {
this.api = api; asynchandlerthread = new AsyncHandlerThread[threadCount];
asynchandlerthread = new Thread[threadCount];
asynchandlerthread[0] = new AsyncHandlerThread(fasttasks, THREADPREFIX + "Fast-0", api);
for(int i = 1; i < threadCount; ++i) {
asynchandlerthread[i] = new AsyncHandlerThread(slowtasks, THREADPREFIX + "Slow-" + i, api);
}
}
public void stop() {
for(AsyncHandlerThread t : asynchandlerthread) {
t.stop();
}
} }
public void enque(Task t) { public void enque(Task t) {
enque(t, false); enque(t, false, false);
} }
public void enque(Task t, boolean priority) { public void enque(Task t, boolean isSlowMethod, boolean priority) {
List<Task> tasks = isSlowMethod ? slowtasks : fasttasks;
if(priority) { if(priority) {
synchronized (tasks) { synchronized (tasks) {
tasks.add(0, t); tasks.add(0, t);
tasks.notifyAll();
} }
} else { } else {
synchronized (tasks) { synchronized (tasks) {
tasks.add(t); tasks.add(t);
tasks.notifyAll();
} }
} }
//muss nicht im syncronized liegen, weil es nur ein read ist
ensureThreads(tasks.size());
} }
public void enque(String request, String parameters, long userid) { public void enque(String request, String parameters, long userid) {
enque(new Task(request, parameters, userid)); enque(new Task(request, parameters, userid));
} }
//makes sure, that at least count Threads are running
private void ensureThreads(int count) {
if(count < 1) count = 1;
synchronized (asynchandlerthread) {
if(threadsRunning >= count) return; //mus im syncronized liegen, damit es nicht zu problemen kommt, wenn diese funktion parallel aufgerufen wird
//alle threads durchgehen und leere (null) mit neuen Threads füllen
for(int i = 0; i < asynchandlerthread.length && count > 0; i++) {
if(asynchandlerthread[i] == null) {
//spawn a thread
asynchandlerthread[i] = new Thread(this, THREADPREFIX + i);
asynchandlerthread[i].start();
threadsRunning++;
}
count --;
}
}
}
@Override
public void run() {
int failed = 0;
while(!tasks.isEmpty()) {
Task current;
synchronized (tasks) {
current = tasks.remove(0);
}
if(current == null)
continue;
//run task
try {
try {
Object obj = api.request(current.apimethod, current.parameters, current.userid);
Callback<?, ?> callb = current.callback;
while(callb != null) {
obj = (Object) callb.callObj(obj);
callb = callb.next;
// throw new Exception("Callbacktype missmatch! Got " + obj.getClass().getSimpleName() + " Wanted: " + wanted.getSimpleName() );
}
failed = 0;
} catch(UnknownHostException ex) { //host(api.telegram.org) is good -> bad inet
failed ++;
if(failed > 10)
try {
Thread.sleep(1000);//wait 1 second
} catch(InterruptedException ignored) {}
//reenque
enque(current);
} catch(Throwable t) {
if(current.exceptionhandl == null)
throw t;
System.out.println("Exception " + t.getClass().getSimpleName() + " handled by " + current.exceptionhandl.getClass().getSimpleName());
current.exceptionhandl.call(t);
}
} catch(Throwable t) {
System.out.println("Error executing Task: ");
t.printStackTrace();
}
}
int threadid = Integer.parseInt(Thread.currentThread().getName().substring(THREADPREFIX.length()));
synchronized (asynchandlerthread) {
threadsRunning --;
asynchandlerthread[threadid] = null; //delete thread
}
}
public static class Task { public static class Task {
String apimethod; String apimethod;
String parameters; String parameters;
@ -128,9 +67,7 @@ public class AsyncHandler implements Runnable {
final long userid; final long userid;
public Task(String apimethod, String parameters, long userid) { public Task(String apimethod, String parameters, long userid) {
this.apimethod = apimethod; this(apimethod, parameters, userid, null);
this.parameters = parameters;
this.userid = userid;
} }
public Task(String apimethod, String parameters, long userid, Callback<JSONObject, ?> callback) { public Task(String apimethod, String parameters, long userid, Callback<JSONObject, ?> callback) {

View File

@ -0,0 +1,98 @@
package de.mrbesen.telegram;
import java.net.UnknownHostException;
import java.util.List;
import de.mrbesen.telegram.AsyncHandler.Callback;
import de.mrbesen.telegram.AsyncHandler.Task;
public class AsyncHandlerThread implements Runnable {
private boolean shouldrun = false;
private TelegramAPI api;
private int failed = 0;
private Thread thread;
private List<Task> tasks;
AsyncHandlerThread(List<Task> tasks, String name, TelegramAPI api) {
this.tasks = tasks;
shouldrun = true;
thread = new Thread(this, name);
thread.start();
}
public void stop() {
shouldrun = false;
thread.interrupt();
try {
thread.join();
} catch (InterruptedException e) {
}
}
@Override
public void run() {
while (shouldrun) {
Task t;
synchronized (tasks) {
while (tasks.isEmpty()) {
try {
tasks.wait();
} catch (InterruptedException e) {
}
}
t = tasks.remove(0);
tasks.notifyAll();
}
if (t == null)
continue;
// run task
try {
processTask(t);
} catch (Throwable thro) {
api.log.log("Error executing Task: ");
thro.printStackTrace();
}
}
}
private void processTask(Task t) throws Throwable {
try {
Object obj = api.request(t.apimethod, t.parameters, t.userid);
Callback<?, ?> callb = t.callback;
while (callb != null) {
obj = (Object) callb.callObj(obj);
callb = callb.next;
// throw new Exception("Callbacktype missmatch! Got " +
// obj.getClass().getSimpleName() + " Wanted: " + wanted.getSimpleName() );
}
failed = 0;
} catch (UnknownHostException ex) { // host(api.telegram.org) is good -> bad inet
failed++;
if (failed > 10)
try {
Thread.sleep(1000); // wait 1 second
} catch (InterruptedException ignored) {
}
// reenque
synchronized (tasks) {
tasks.add(0, t);
}
} catch (Throwable thro) {
if (t.exceptionhandl == null)
throw thro;
api.log.log("Exception " + thro.getClass().getSimpleName() + " handled by "
+ t.exceptionhandl.getClass().getSimpleName());
t.exceptionhandl.call(thro);
}
}
}

View File

@ -17,7 +17,7 @@ public class MessageBuilder {
private boolean silent = false; private boolean silent = false;
private boolean no_web_view = false; private boolean no_web_view = false;
private boolean allow_sending_without_reply = false; private boolean allow_sending_without_reply = false;
private long reciver_id = 0; private long receiver_id = 0;
private int reply_to_message_id = 0; private int reply_to_message_id = 0;
private TReplyMarkup markup = null; private TReplyMarkup markup = null;
private boolean async = false; private boolean async = false;
@ -37,12 +37,12 @@ public class MessageBuilder {
public MessageBuilder() { } public MessageBuilder() { }
public MessageBuilder setReciver(long id) { public MessageBuilder setReciver(long id) {
reciver_id = id; receiver_id = id;
return this; return this;
} }
public MessageBuilder setReciver(TUser user) { public MessageBuilder setReciver(TUser user) {
reciver_id = user.getID(); receiver_id = user.getID();
return this; return this;
} }
@ -80,7 +80,7 @@ public class MessageBuilder {
} }
public MessageBuilder setReplyTo(TMessage msg) { public MessageBuilder setReplyTo(TMessage msg) {
if(reciver_id == 0) if(receiver_id == 0)
setReciver(msg.getChatID()); setReciver(msg.getChatID());
return setReplyTo(msg.getMessageID()); return setReplyTo(msg.getMessageID());
} }
@ -163,7 +163,7 @@ public class MessageBuilder {
} }
public SendableMessage build() { public SendableMessage build() {
if(reciver_id == 0) { if(receiver_id == 0) {
throw new MissingException("Reciver"); throw new MissingException("Reciver");
} }
String optionals = ""; String optionals = "";
@ -213,13 +213,14 @@ public class MessageBuilder {
optionals += "&allow_sending_without_reply=true"; optionals += "&allow_sending_without_reply=true";
} }
String q = "chat_id=" + reciver_id + text + optionals + attachment; String q = "chat_id=" + receiver_id + text + optionals + attachment;
if(async) { if(async) {
AsyncSendable tmp = new AsyncSendable(cmd, q, reciver_id, callback, excpt); AsyncSendable tmp = new AsyncSendable(cmd, q, receiver_id, callback, excpt);
tmp.prio = asyncprio; tmp.prio = asyncprio;
tmp.isSlow = attachmenttype != Attachment.none;
return tmp; return tmp;
} }
return new SendableMessage(cmd, q, reciver_id); return new SendableMessage(cmd, q, receiver_id);
} }
/** /**
@ -243,6 +244,7 @@ public class MessageBuilder {
Callback<TMessage, ?> callback; Callback<TMessage, ?> callback;
Callback<Throwable, Void> excpt = null; Callback<Throwable, Void> excpt = null;
boolean prio = false; boolean prio = false;
boolean isSlow = false; // is it expected that this message is slow to porcess? sendMedia
public AsyncSendable(String cmd, String q, long userid, Callback<TMessage, ?> clb, Callback<Throwable, Void> excpt) { public AsyncSendable(String cmd, String q, long userid, Callback<TMessage, ?> clb, Callback<Throwable, Void> excpt) {
super(cmd, q, userid); super(cmd, q, userid);

View File

@ -46,7 +46,6 @@ public class TelegramAPI implements Runnable {
private int msg_offset = 0; private int msg_offset = 0;
private int updateInterval = 60; private int updateInterval = 60;
private String apikey;
private String botname; private String botname;
private Thread thread; private Thread thread;
private boolean run = true; private boolean run = true;
@ -97,13 +96,13 @@ public class TelegramAPI implements Runnable {
} }
public TelegramAPI(String apiurl, String apikey, String botname) { public TelegramAPI(String apiurl, String apikey, String botname) {
this.apiurl = (apiurl == null ? API_URL_DEFAULT : apiurl); this.apiurl = (apiurl == null ? API_URL_DEFAULT : apiurl);
this.botname = botname != null ? botname : ""; this.botname = botname != null ? botname : "";
if (!apikey.matches(TOKENREGEX) ) { if (!apikey.matches(TOKENREGEX) ) {
throw new IllegalArgumentException("Invalid API key: " + apikey); throw new IllegalArgumentException("Invalid API key: " + apikey);
} }
this.apiurl += this.apikey = apikey; this.apiurl += apikey;
} }
public void start() { public void start() {
@ -243,7 +242,7 @@ public class TelegramAPI implements Runnable {
adapter.next = asyncm.callback; adapter.next = asyncm.callback;
Task t = new Task(msg.getCommand(), msg.getQ(), msg.getUserid(), adapter); Task t = new Task(msg.getCommand(), msg.getQ(), msg.getUserid(), adapter);
t.setExceptionhandl(asyncm.excpt == null ? IOE400supressor : asyncm.excpt); t.setExceptionhandl(asyncm.excpt == null ? IOE400supressor : asyncm.excpt);
async.enque(t); async.enque(t, asyncm.isSlow, asyncm.prio);
} else { } else {
JSONObject o = request(msg.getCommand(), msg.getQ(), msg.getUserid(), true); JSONObject o = request(msg.getCommand(), msg.getQ(), msg.getUserid(), true);
return new TMessage(o.getJSONObject("result"), this); return new TMessage(o.getJSONObject("result"), this);
@ -313,7 +312,7 @@ public class TelegramAPI implements Runnable {
callback.accept(file); callback.accept(file);
return null; return null;
} }
})); }), true, false);
} }
public void sendTypedMessage(final String msg, final TUser user, final int seconds) { public void sendTypedMessage(final String msg, final TUser user, final int seconds) {