diff --git a/src/main/java/de/mrbesen/telegram/AsyncHandler.java b/src/main/java/de/mrbesen/telegram/AsyncHandler.java index 3c8b36b..0cb2769 100644 --- a/src/main/java/de/mrbesen/telegram/AsyncHandler.java +++ b/src/main/java/de/mrbesen/telegram/AsyncHandler.java @@ -2,124 +2,63 @@ package de.mrbesen.telegram; import org.json.JSONObject; -import java.net.UnknownHostException; import java.util.LinkedList; import java.util.List; -public class AsyncHandler implements Runnable { +public class AsyncHandler { - private List tasks = new LinkedList<>(); - private Thread[] asynchandlerthread; - private int threadsRunning = 0; - private TelegramAPI api; + private List fasttasks = new LinkedList<>(); + private List slowtasks = new LinkedList<>(); + + private AsyncHandlerThread[] asynchandlerthread; private static final String THREADPREFIX = "AsyncTgHandler-"; - //just use 1 async thread + // just use 2 async threads public AsyncHandler(TelegramAPI api) { - this(api, 1); + this(api, 2); } //allow as many as threadCount threads to handle Async Tasks public AsyncHandler(TelegramAPI api, int threadCount) { - this.api = api; - asynchandlerthread = new Thread[threadCount]; + asynchandlerthread = new AsyncHandlerThread[threadCount]; + + asynchandlerthread[0] = new AsyncHandlerThread(fasttasks, THREADPREFIX + "Fast-0", api); + + for(int i = 1; i < threadCount; ++i) { + asynchandlerthread[i] = new AsyncHandlerThread(slowtasks, THREADPREFIX + "Slow-" + i, api); + } + } + + public void stop() { + for(AsyncHandlerThread t : asynchandlerthread) { + t.stop(); + } } public void enque(Task t) { - enque(t, false); + enque(t, false, false); } - public void enque(Task t, boolean priority) { + public void enque(Task t, boolean isSlowMethod, boolean priority) { + List tasks = isSlowMethod ? slowtasks : fasttasks; + if(priority) { synchronized (tasks) { tasks.add(0, t); + tasks.notifyAll(); } } else { synchronized (tasks) { tasks.add(t); + tasks.notifyAll(); } } - - //muss nicht im syncronized liegen, weil es nur ein read ist - ensureThreads(tasks.size()); } public void enque(String request, String parameters, long userid) { enque(new Task(request, parameters, userid)); } - //makes sure, that at least count Threads are running - private void ensureThreads(int count) { - if(count < 1) count = 1; - - synchronized (asynchandlerthread) { - if(threadsRunning >= count) return; //mus im syncronized liegen, damit es nicht zu problemen kommt, wenn diese funktion parallel aufgerufen wird - - //alle threads durchgehen und leere (null) mit neuen Threads füllen - for(int i = 0; i < asynchandlerthread.length && count > 0; i++) { - if(asynchandlerthread[i] == null) { - //spawn a thread - asynchandlerthread[i] = new Thread(this, THREADPREFIX + i); - asynchandlerthread[i].start(); - threadsRunning++; - } - count --; - } - } - } - - @Override - public void run() { - int failed = 0; - while(!tasks.isEmpty()) { - Task current; - - synchronized (tasks) { - current = tasks.remove(0); - } - - if(current == null) - continue; - - //run task - try { - try { - Object obj = api.request(current.apimethod, current.parameters, current.userid); - Callback callb = current.callback; - while(callb != null) { - obj = (Object) callb.callObj(obj); - callb = callb.next; - // throw new Exception("Callbacktype missmatch! Got " + obj.getClass().getSimpleName() + " Wanted: " + wanted.getSimpleName() ); - } - failed = 0; - } catch(UnknownHostException ex) { //host(api.telegram.org) is good -> bad inet - failed ++; - if(failed > 10) - try { - Thread.sleep(1000);//wait 1 second - } catch(InterruptedException ignored) {} - //reenque - enque(current); - } catch(Throwable t) { - if(current.exceptionhandl == null) - throw t; - System.out.println("Exception " + t.getClass().getSimpleName() + " handled by " + current.exceptionhandl.getClass().getSimpleName()); - current.exceptionhandl.call(t); - } - } catch(Throwable t) { - System.out.println("Error executing Task: "); - t.printStackTrace(); - } - } - - int threadid = Integer.parseInt(Thread.currentThread().getName().substring(THREADPREFIX.length())); - - synchronized (asynchandlerthread) { - threadsRunning --; - asynchandlerthread[threadid] = null; //delete thread - } - } - public static class Task { String apimethod; String parameters; @@ -128,9 +67,7 @@ public class AsyncHandler implements Runnable { final long userid; public Task(String apimethod, String parameters, long userid) { - this.apimethod = apimethod; - this.parameters = parameters; - this.userid = userid; + this(apimethod, parameters, userid, null); } public Task(String apimethod, String parameters, long userid, Callback callback) { diff --git a/src/main/java/de/mrbesen/telegram/AsyncHandlerThread.java b/src/main/java/de/mrbesen/telegram/AsyncHandlerThread.java new file mode 100644 index 0000000..3f7f957 --- /dev/null +++ b/src/main/java/de/mrbesen/telegram/AsyncHandlerThread.java @@ -0,0 +1,98 @@ +package de.mrbesen.telegram; + +import java.net.UnknownHostException; +import java.util.List; + +import de.mrbesen.telegram.AsyncHandler.Callback; +import de.mrbesen.telegram.AsyncHandler.Task; + +public class AsyncHandlerThread implements Runnable { + + private boolean shouldrun = false; + private TelegramAPI api; + private int failed = 0; + + private Thread thread; + + private List tasks; + + AsyncHandlerThread(List tasks, String name, TelegramAPI api) { + this.tasks = tasks; + shouldrun = true; + thread = new Thread(this, name); + thread.start(); + } + + public void stop() { + shouldrun = false; + + thread.interrupt(); + + try { + thread.join(); + } catch (InterruptedException e) { + } + } + + @Override + public void run() { + while (shouldrun) { + Task t; + + synchronized (tasks) { + while (tasks.isEmpty()) { + try { + tasks.wait(); + } catch (InterruptedException e) { + } + } + + t = tasks.remove(0); + tasks.notifyAll(); + } + + if (t == null) + continue; + + // run task + try { + processTask(t); + } catch (Throwable thro) { + api.log.log("Error executing Task: "); + thro.printStackTrace(); + } + } + } + + private void processTask(Task t) throws Throwable { + try { + Object obj = api.request(t.apimethod, t.parameters, t.userid); + Callback callb = t.callback; + while (callb != null) { + obj = (Object) callb.callObj(obj); + callb = callb.next; + // throw new Exception("Callbacktype missmatch! Got " + + // obj.getClass().getSimpleName() + " Wanted: " + wanted.getSimpleName() ); + } + failed = 0; + } catch (UnknownHostException ex) { // host(api.telegram.org) is good -> bad inet + failed++; + if (failed > 10) + try { + Thread.sleep(1000); // wait 1 second + } catch (InterruptedException ignored) { + } + + // reenque + synchronized (tasks) { + tasks.add(0, t); + } + } catch (Throwable thro) { + if (t.exceptionhandl == null) + throw thro; + api.log.log("Exception " + thro.getClass().getSimpleName() + " handled by " + + t.exceptionhandl.getClass().getSimpleName()); + t.exceptionhandl.call(thro); + } + } +} diff --git a/src/main/java/de/mrbesen/telegram/MessageBuilder.java b/src/main/java/de/mrbesen/telegram/MessageBuilder.java index bafd375..9e7f028 100644 --- a/src/main/java/de/mrbesen/telegram/MessageBuilder.java +++ b/src/main/java/de/mrbesen/telegram/MessageBuilder.java @@ -17,7 +17,7 @@ public class MessageBuilder { private boolean silent = false; private boolean no_web_view = false; private boolean allow_sending_without_reply = false; - private long reciver_id = 0; + private long receiver_id = 0; private int reply_to_message_id = 0; private TReplyMarkup markup = null; private boolean async = false; @@ -37,12 +37,12 @@ public class MessageBuilder { public MessageBuilder() { } public MessageBuilder setReciver(long id) { - reciver_id = id; + receiver_id = id; return this; } public MessageBuilder setReciver(TUser user) { - reciver_id = user.getID(); + receiver_id = user.getID(); return this; } @@ -80,7 +80,7 @@ public class MessageBuilder { } public MessageBuilder setReplyTo(TMessage msg) { - if(reciver_id == 0) + if(receiver_id == 0) setReciver(msg.getChatID()); return setReplyTo(msg.getMessageID()); } @@ -163,7 +163,7 @@ public class MessageBuilder { } public SendableMessage build() { - if(reciver_id == 0) { + if(receiver_id == 0) { throw new MissingException("Reciver"); } String optionals = ""; @@ -213,13 +213,14 @@ public class MessageBuilder { optionals += "&allow_sending_without_reply=true"; } - String q = "chat_id=" + reciver_id + text + optionals + attachment; + String q = "chat_id=" + receiver_id + text + optionals + attachment; if(async) { - AsyncSendable tmp = new AsyncSendable(cmd, q, reciver_id, callback, excpt); + AsyncSendable tmp = new AsyncSendable(cmd, q, receiver_id, callback, excpt); tmp.prio = asyncprio; + tmp.isSlow = attachmenttype != Attachment.none; return tmp; } - return new SendableMessage(cmd, q, reciver_id); + return new SendableMessage(cmd, q, receiver_id); } /** @@ -243,6 +244,7 @@ public class MessageBuilder { Callback callback; Callback excpt = null; boolean prio = false; + boolean isSlow = false; // is it expected that this message is slow to porcess? sendMedia public AsyncSendable(String cmd, String q, long userid, Callback clb, Callback excpt) { super(cmd, q, userid); diff --git a/src/main/java/de/mrbesen/telegram/TelegramAPI.java b/src/main/java/de/mrbesen/telegram/TelegramAPI.java index 6837406..a81227d 100644 --- a/src/main/java/de/mrbesen/telegram/TelegramAPI.java +++ b/src/main/java/de/mrbesen/telegram/TelegramAPI.java @@ -46,7 +46,6 @@ public class TelegramAPI implements Runnable { private int msg_offset = 0; private int updateInterval = 60; - private String apikey; private String botname; private Thread thread; private boolean run = true; @@ -97,13 +96,13 @@ public class TelegramAPI implements Runnable { } public TelegramAPI(String apiurl, String apikey, String botname) { - this.apiurl = (apiurl == null ? API_URL_DEFAULT : apiurl); + this.apiurl = (apiurl == null ? API_URL_DEFAULT : apiurl); this.botname = botname != null ? botname : ""; if (!apikey.matches(TOKENREGEX) ) { throw new IllegalArgumentException("Invalid API key: " + apikey); } - this.apiurl += this.apikey = apikey; + this.apiurl += apikey; } public void start() { @@ -243,7 +242,7 @@ public class TelegramAPI implements Runnable { adapter.next = asyncm.callback; Task t = new Task(msg.getCommand(), msg.getQ(), msg.getUserid(), adapter); t.setExceptionhandl(asyncm.excpt == null ? IOE400supressor : asyncm.excpt); - async.enque(t); + async.enque(t, asyncm.isSlow, asyncm.prio); } else { JSONObject o = request(msg.getCommand(), msg.getQ(), msg.getUserid(), true); return new TMessage(o.getJSONObject("result"), this); @@ -313,7 +312,7 @@ public class TelegramAPI implements Runnable { callback.accept(file); return null; } - })); + }), true, false); } public void sendTypedMessage(final String msg, final TUser user, final int seconds) {