non-lock multithreading, cache DB, TelegramAdmins

This commit is contained in:
MrBesen 2018-07-23 12:27:51 +02:00
parent 0a7dc697a0
commit 4917369b34
5 changed files with 247 additions and 106 deletions

View File

@ -11,42 +11,50 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Scanner;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.log4j.Logger;
public class Crawler implements Runnable {
private static int jobspeerthread = 100; //the amount of jobs a thread get peer request
private int jobspeerthread = 100; //the amount of jobs a thread get peer request
private ReentrantReadWriteLock listlock = new ReentrantReadWriteLock(true);//only writelock is used, this lock should lock the list toCrawl and toknown because they may be accsessed by other threads
private LinkedList<String> toSave = new LinkedList<>();//all found ytids, witch need to be analysed
private LinkedList<String> toCrawl = new LinkedList<>();//all videos tu crawl
private LinkedList<String> toknown = new LinkedList<>();//list with all videos, to test if they are allready known, if not they are moved to tocrawle
private List<CrawlerThread> threads;//list of all threads
private List<CrawlerThread> requested = new LinkedList<>();
private static DateFormat dateform = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss");
private String currentstate = "undefined";
private long start;
private boolean crawl = true;
private int crawlcount = 0;
private int updateOffset = 0;
private DB db = new DB();
private YoutubeAPI api = new YoutubeAPI();
private File crawlfile = new File("crawl.txt");
private Logger log = Logger.getLogger(Crawler.class.getName());
private Logger log = Logger.getLogger(this.getClass().getName());
private int maxvideostotest;
private int startup = 10;//to keep the beginning cool
private int maxvideostotest = 100;
private int startup = 2;//to keep the beginning cool
public Crawler() {
try {
maxvideostotest = Integer.parseInt(Config.prop.getProperty("crawler.maxvideos"));
} catch(NumberFormatException e) {
log.warn("could not read the number \"" + Config.prop.getProperty("") + "\" from the config file. maxvideo");
log.warn("could not read the number \"" + Config.prop.getProperty("crawler.maxvideos") + "\" from the config file. maxvideo");
maxvideostotest = 100;
}
try {
jobspeerthread = Integer.parseInt(Config.prop.getProperty("crawler.jobspeerthread"));
} catch(NumberFormatException e) {
log.warn("could not read the number \"" + Config.prop.getProperty("crawler.jobspeerthread") + "\" from the config file. maxvideo");
jobspeerthread = 100;
}
}
public void stop() {
@ -54,10 +62,8 @@ public class Crawler implements Runnable {
}
/**
 * Queues a video id for the "already known?" check.
 * The id is appended to toknown unless it is already contained in toCrawl or toknown.
 *
 * @param videoid the video id to queue
 */
public synchronized void addtoCrawl(String videoid) {
listlock.writeLock().lock();
// skip ids that are already waiting in one of the two lists
if(! (toCrawl.contains(videoid) || toknown.contains(videoid)))
toknown.add(videoid);
listlock.writeLock().unlock();
}
public boolean isCrawling() {
@ -73,20 +79,20 @@ public class Crawler implements Runnable {
}
/**
 * Hands work to a crawler thread: moves ids from the front of toCrawl into the
 * thread's todo list and then clears the thread's requested flag.
 *
 * @param t the crawler thread that receives the ids
 */
private void send(CrawlerThread t) {
listlock.writeLock().lock();
// listlock.writeLock().lock();
// move at most jobspeerthread ids, stopping early when toCrawl runs empty
for(int i = 0; i < jobspeerthread && !toCrawl.isEmpty(); i++) {
t.todo.add(toCrawl.removeFirst());
}
listlock.writeLock().unlock();
// listlock.writeLock().unlock();
t.requested = false;
}
@Override
public void run() {
currentstate = "loading crawlfile";
start = System.currentTimeMillis();
log.info("Try to load crawlfile");
if(crawlfile.exists()) {
listlock.writeLock().lock();
try {
Scanner in = new Scanner(crawlfile);
boolean crawl = true;//section of file
@ -112,8 +118,6 @@ public class Crawler implements Runnable {
} catch(IOException e) {
log.warn("Error while loading crawl file.");
e.printStackTrace();
} finally {
listlock.writeLock().unlock();
}
}
@ -131,40 +135,27 @@ public class Crawler implements Runnable {
new Thread(thr, "Crawler #" + i).start();
threads.add(thr);
}
int updateOffset = 0;
while(crawl) {
log.info("to Crawl: " + toCrawl.size() + " known: " + toknown.size() + " Time: " + dateform.format(new Date()));
//fullfill request
while(!requested.isEmpty() && !toCrawl.isEmpty() && crawl) {
log.info("fullfill request");
currentstate = "fullfill requests";
send(requested.remove(0));
}
//kindof idle
while(toCrawl.size() > (jobspeerthread * threads.size()) && crawl && requested.isEmpty()) {
startup = 0;//stop startup count
currentstate = "idle";
Thread.yield();
try {
Thread.sleep(5000);
Thread.sleep(100);
} catch(InterruptedException ignored) {
break;
}
log.info("updating DB Offset= " + updateOffset);
LinkedList<String> vids = db.getUncompleted(50, updateOffset);
LinkedList<Video>[] infos = api.getInfos(vids);
if(infos != null) {
int size = infos[0].size() + infos[1].size();
if(size < 50) {
updateOffset += ((50-size)/2)+1;
}
if(infos[1].size() > 0) {
log.info("delete " + infos[1].size() + " livestreams");
db.removeVideos(infos[1]);
}
db.updateVideos(infos[0]);
log.info("Updated " + infos[0].size() + " Videos.");
}
// updateDB();
}
//nothing left?
if(toknown.isEmpty() && toCrawl.isEmpty() && requested.size() == threads.size()) {//very uncommon
@ -176,7 +167,8 @@ public class Crawler implements Runnable {
if(!toknown.isEmpty()) {
//check in db for known videos
log.info("Checking the DB");
listlock.writeLock().lock();
currentstate = "get new tocrawl";
// listlock.writeLock().lock();
while(toCrawl.size() < jobspeerthread * threads.size() * 2 && crawl && !toknown.isEmpty()) {
LinkedList<String> tocheck = new LinkedList<>();
for(int i = 0; i < toknown.size() && i < maxvideostotest; i++) {
@ -184,12 +176,19 @@ public class Crawler implements Runnable {
}
toCrawl.addAll(db.checkvideos(tocheck));
}
listlock.writeLock().unlock();
// listlock.writeLock().unlock();
}
if(toknown.size() < threadcount * jobspeerthread * 20 && crawl) {
currentstate = "restoretemp";
log.info("restoreTemp");
LinkedList<String> rest = db.restoreTemp();
toknown.addAll(rest);
}
//writing crawlfile
log.info("Writing Crawlfile");
listlock.writeLock().lock();
currentstate = "writing crawlfile";
// listlock.writeLock().lock();
try {
PrintWriter p = new PrintWriter(new BufferedWriter(new FileWriter(crawlfile)));
for(String t : toCrawl) {
@ -202,19 +201,35 @@ public class Crawler implements Runnable {
p.close();
} catch (IOException e) {
log.error("Error writing crawlfile.", e);
} finally {
listlock.writeLock().unlock();
}
//get reports
currentstate = "get report";
log.info("get report");
int count = 0;
for (CrawlerThread crawlerThread : threads) {
LinkedList<String> report = crawlerThread.report();
crawlcount+= report.size();
toSave.addAll(report);
crawlerThread.list.clear();
LinkedList<String>[] report = crawlerThread.report();
crawlcount+= report[0].size();
toSave.addAll(report[0]);
crawlerThread.crawled.clear();
while(report[1].size() > 0) {
LinkedList<String> store = new LinkedList<>();
try {
while(!report[1].isEmpty() && store.size() < 50) {
store.add(report[1].removeFirst());
count++;
}
} catch(NoSuchElementException ignored) {//concurrentmodification fuckery
}
db.storeTemp(store);
}
log.info(count + " videos added.");
crawlerThread.found.clear();
}
//save to db
currentstate = "save to DB";
log.info("save " + toSave.size() + " videos to DB.");
while(!toSave.isEmpty()) {
LinkedList<String> videoids = new LinkedList<>();
@ -230,8 +245,10 @@ public class Crawler implements Runnable {
//at the beginning there is maybe just one video to crawl, so keep it calm.
if(startup > 0) {
startup --;
currentstate = "startup sleep";
log.info("startup sleep");
try {
Thread.sleep(20000);
Thread.sleep(2000);
} catch(InterruptedException e) {}
}
}
@ -253,20 +270,46 @@ public class Crawler implements Runnable {
return new Video();
}
public void printStats() {
public String printStats() {
long runtimes = (System.currentTimeMillis() - start) / 1000;
if(runtimes < 0)
runtimes = 1;
int runtimem = (int) (runtimes / 60);
float vidps = (crawlcount / (float) runtimes);//videos per second
log.info("ToCrawl:" + toCrawl.size());
log.info("Toknown:" + toknown.size());
log.info("ToSave:" + toSave.size());
log.info("Runtime: " + runtimem + "min and " + crawlcount + " videos crawled. ( " + vidps + " v/s )");
log.info("Thread Nr, todo size, requested, listsize");
String out = "";
out += "ToCrawl: " + toCrawl.size();
out += "\nToknown: " + toknown.size();
out += "\nToSave: " + toSave.size();
out += "\nrequested: " + requested.size();
out += "\nRuntime: " + runtimem + "min and " + crawlcount + " videos crawled. ( " + vidps + " v/s )";
out += "\nState: " + currentstate;
out += "\nThread Nr, todo size, requested, crawledsize, foundsize";
for (int i = 0; i < threads.size(); i++) {
CrawlerThread thre = threads.get(i);
log.info(" " + i + " " + thre.todo.size() + " " + thre.requested + " " + thre.list.size());
out += "\n " + i + " " + thre.todo.size() + " " + thre.requested + " " + thre.crawled.size() + " " + thre.found.size();
}
return out;
}
/**
 * Refreshes database entries that are still incomplete. Currently unused.
 *
 * Fetches a batch of ids from the DB (starting at updateOffset), asks the API for
 * their details, removes the videos returned in the second result list (logged as
 * livestreams) and updates the videos of the first result list. If fewer results
 * than requested come back, updateOffset is advanced.
 */
private void updateDB() {
	final int batchSize = 50;
	log.info("updating DB Offset= " + updateOffset);
	LinkedList<Video>[] infos = api.getInfos(db.getUncompleted(batchSize, updateOffset));
	if(infos == null) {
		// nothing came back from the API, leave everything untouched
		return;
	}
	LinkedList<Video> updatable = infos[0];
	LinkedList<Video> livestreams = infos[1];
	int received = updatable.size() + livestreams.size();
	if(received < batchSize) {
		updateOffset += ((batchSize - received) / 2) + 1;
	}
	if(!livestreams.isEmpty()) {
		log.info("delete " + livestreams.size() + " livestreams");
		db.removeVideos(livestreams);
	}
	db.updateVideos(updatable);
	log.info("Updated " + updatable.size() + " Videos.");
}

View File

@ -15,7 +15,8 @@ public class CrawlerThread implements Runnable {
private Crawler parent;
LinkedList<String> todo = new LinkedList<>();//videos, this thread should crawl
LinkedList<String> list = new LinkedList<>();//videos this thread had crawled
LinkedList<String> crawled = new LinkedList<>();//videos this thread had crawled
LinkedList<String> found = new LinkedList<>();//videos this thread had found
boolean requested = true;//is a request pending?
@ -53,13 +54,13 @@ public class CrawlerThread implements Runnable {
* returns a linkedlist of all crawled videos
* @return
*/
LinkedList<String> report() {
return list;
LinkedList<String>[] report() {
return new LinkedList[] {crawled, found};
}
private void crawl(String videoid) {
try {
list.add(videoid);
crawled.add(videoid);
// log.info("crawling: " + videoid);
HTTPS con = new HTTPS("https://youtube.com/watch?v=" + videoid);
@ -74,7 +75,7 @@ public class CrawlerThread implements Runnable {
}
String ytid = s.substring(beginytid, endxtid);
if(ytid.length() > 9 && ytid.length() <= 12) {
parent.addtoCrawl(ytid);
found.add(ytid);
} else {
// log.warn("youtube id has wrong length: \"" + ytid + "\"");
}

View File

@ -42,9 +42,10 @@ public class DB {
update("CREATE DATABASE `" + db + "` /*!40100 DEFAULT CHARACTER SET latin1*/;");
con.setCatalog(db);
update("CREATE TABLE `videos` (`id` varchar(13) NOT NULL,`length` int(11) NOT NULL,`created` int(11) NOT NULL,`langcode` varchar(3) NOT NULL DEFAULT 'en',`category` int(11) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `id_UNIQUE` (`id`)) ENGINE=InnoDB DEFAULT CHARSET=latin1;");
update("CREATE TABLE `temp` ( `ytid` varchar(13) NOT NULL COMMENT 'a Table to store Video ids, when they are found to process them later', PRIMARY KEY (`ytid`), UNIQUE KEY `ytid_UNIQUE` (`ytid`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;");
log.info("Database is set up!");
}
}
} catch (SQLException e) {
log.error("Error while connecting to the database! ", e);
}
@ -104,7 +105,7 @@ public class DB {
}
}
}
public void updateVideos(List<Video> input) {
log.info("Updateing " + input.size() + " videos.");
for(Video v : input) {
@ -112,16 +113,16 @@ public class DB {
updateVideo(v);
}
}
/**
 * Writes the detail data of one video into the `videos` table.
 *
 * All values are escaped before they are placed into the statement, so titles,
 * channel names or tags containing quotes or backslashes neither break the query
 * nor allow SQL injection. Videos without tags are skipped, as before.
 *
 * @param v the video to store; ignored when it is null or has no tags
 */
private void updateVideo(Video v) {
	if(v == null || v.tags == null) {
		// previously hidden by an empty catch(NullPointerException) block
		log.debug("Skipping video update because the video or its tags are missing.");
		return;
	}
	// truncate before escaping so an escape sequence is never cut in half
	String tags = v.tags.substring(0, Math.min(40, v.tags.length()));
	String qu = "UPDATE `ytcrawler`.`videos` SET `length` = '" + escapeSqlLiteral(v.length)
			+ "', `created` = '" + escapeSqlLiteral(v.created)
			+ "', `langcode` = SUBSTR('" + escapeSqlLiteral(v.languageCode) + "', 1, 3) "
			+ ",`category` = '" + escapeSqlLiteral(v.categorie)
			+ "',`videotitle` = SUBSTR('" + escapeSqlLiteral(v.title) + "',1,100)"
			+ ",`channel` = SUBSTR('" + escapeSqlLiteral(v.channel) + "',1,20)"
			+ ",`tags` = '" + escapeSqlLiteral(tags)
			+ "' WHERE `id` = '" + escapeSqlLiteral(v.id) + "';";
	update(qu);
}

/**
 * Escapes a value for use inside a single-quoted SQL string literal:
 * backslashes are doubled and single quotes are written as two quotes.
 *
 * @param value the value to embed; null becomes the text "null" like in plain string concatenation
 * @return the escaped text
 */
private static String escapeSqlLiteral(Object value) {
	return String.valueOf(value).replace("\\", "\\\\").replace("'", "''");
}
public LinkedList<String> getUncompleted(int limit, int offset) {
LinkedList<String> out = new LinkedList<>();
String sql = "SELECT `id` FROM `videos` WHERE `channel` IS NULL LIMIT " + offset + "," + limit + ";";
@ -135,7 +136,7 @@ public class DB {
}
return out;
}
public void removeVideos(LinkedList<Video> vids) {
log.info("Delete " + vids.size() + " videos.");
for(Video s : vids) {
@ -148,8 +149,8 @@ public class DB {
* @param q
* @return Das resultSet der Query
*/
public ResultSet query(String q) {
public ResultSet query(String q) {
try {
if(con.isClosed()) {
connect(true);
@ -192,4 +193,27 @@ public ResultSet query(String q) {
}
return null;
}
/**
 * Takes up to 500 video ids out of the `temp` table and returns them.
 *
 * Exactly the ids that were read are deleted afterwards. The former
 * "DELETE ... LIMIT 0,500" is not valid for a MySQL DELETE (only a row count is
 * allowed there), so the rows were never removed.
 *
 * @return the restored ids; empty when nothing could be read
 */
public LinkedList<String> restoreTemp() {
	LinkedList<String> out = new LinkedList<>();
	log.info("RestoreTemp");
	// try-with-resources accepts a null resource, so the null check can live inside
	try (ResultSet res = query("SELECT * FROM `ytcrawler`.`temp` LIMIT 0,500;")) {
		if(res == null) {
			log.warn("RestoreTemp: the query returned no result.");
			return out;
		}
		while(res.next()) {
			out.add(res.getString(1));
		}
	} catch (SQLException e) {
		log.error("Error while reading the temp table.", e);
	}
	if(out.isEmpty()) {
		return out;
	}
	// remove only what is handed out, even if reading stopped early
	StringBuilder ids = new StringBuilder();
	for(String id : out) {
		if(id == null) {
			continue;
		}
		if(ids.length() > 0) {
			ids.append(", ");
		}
		ids.append('\'').append(id.replace("\\", "\\\\").replace("'", "''")).append('\'');
	}
	if(ids.length() > 0) {
		update("DELETE FROM `ytcrawler`.`temp` WHERE `ytid` IN (" + ids + ");");
	}
	return out;
}
/**
 * Stores found video ids in the `temp` table so they can be processed later.
 * Ids that already exist are ignored by the database (INSERT IGNORE).
 *
 * The ids come from scraped pages, so each one is escaped before it is placed
 * into the statement.
 *
 * @param strings the ids to store; null or empty lists are ignored
 */
public void storeTemp(LinkedList<String> strings) {
	if(strings == null || strings.isEmpty()) {
		return;
	}
	StringBuilder values = new StringBuilder();
	for(String s : strings) {
		if(s == null) {
			continue;
		}
		if(values.length() > 0) {
			values.append(", ");
		}
		values.append("('").append(s.replace("\\", "\\\\").replace("'", "''")).append("')");
	}
	if(values.length() > 0) {
		update("INSERT IGNORE INTO `ytcrawler`.`temp` (`ytid`) VALUES " + values + ";");
	}
}
}

View File

@ -1,63 +1,89 @@
package de.mrbesen.youtubecrawler;
import java.io.File;
import java.util.ArrayList;
import java.util.Random;
import java.util.Scanner;
import org.apache.log4j.Logger;
import de.mrbesen.telegram.TelegramAPI;
import de.mrbesen.telegram.commands.CommandHandler;
import de.mrbesen.telegram.event.EventHandler;
import de.mrbesen.telegram.event.EventListener;
import de.mrbesen.telegram.event.events.UserSendMessageEvent;
import de.mrbesen.telegram.objects.TUser;
public class Main implements CommandHandler{
public class Main implements CommandHandler, EventListener{
private ArrayList<TUser> admins = new ArrayList<>();
private String adminstr = null;
private long setadminstr = -1;
private static String abc = "abcdefghijklmnopqrstuvwxyz";
private Logger log = Logger.getLogger(this.getClass().getName());
private TelegramAPI tapi;
private Thread mainthread;
/**
 * Program entry point: creates the application object and runs it.
 *
 * @param args command line arguments (not used)
 */
public static void main(String[] args) {
	Main app = new Main();
	app.run();
}
Crawler cra;
private Crawler cra;
private void run() {
mainthread = Thread.currentThread();
//init Logger
new Log(); // init logging, set format etc
Logger log = Logger.getLogger(Main.class.getName());
//loading config
new Config(new File("crawl.conf"));
//starting crawler
cra = new Crawler();
Thread t = new Thread(cra, "Crawler");
t.start();
//starting BOT API
TelegramAPI tapi = new TelegramAPI(Config.prop.getProperty("telegramapi.key"));
tapi.getCommandManager().registerCommand("random", this);
tapi.setHelpText("Send the command /random to get a random video.");
tapi.start();
//CLI
Scanner s = new Scanner(System.in);
String in;
while((in= s.nextLine()) != null && t.isAlive()) {
if(in.equalsIgnoreCase("stop")) {
tapi.stop();
cra.stop();
break;
} else if(in.equalsIgnoreCase("add")) {
log.info("please enter ytid:");
String id = s.nextLine().trim();
if(id.length() > 9 && id.length() < 13) {
cra.addtoCrawl(id);
log.info("added.");
}
} else if(in.equalsIgnoreCase("stats")) {
log.info("Getting Stats");
cra.printStats();
}
new Log(); // init logging, set format etc
Logger log = Logger.getLogger(Main.class.getName());
//loading config
new Config(new File("crawl.conf"));
//starting crawler
cra = new Crawler();
Thread t = new Thread(cra, "Crawler");
t.start();
//starting BOT API
tapi = new TelegramAPI(Config.prop.getProperty("telegramapi.key"));
tapi.getCommandManager().registerCommand("random", this);
tapi.getCommandManager().registerCommand("admin", this);
tapi.getCommandManager().registerCommand("stats", this);
tapi.getCommandManager().registerCommand("stop", this);
tapi.getEventManager().registerEvent(this);
tapi.setHelpText("Send the command /random to get a random video.");
tapi.start();
//CLI
Scanner s = new Scanner(System.in);
String in;
while((in= s.nextLine()) != null && t.isAlive()) {
if(in.equalsIgnoreCase("stop")) {
stop();
break;
} else if(in.equalsIgnoreCase("add")) {
log.info("please enter ytid:");
String id = s.nextLine().trim();
if(id.length() > 9 && id.length() < 13) {
cra.addtoCrawl(id);
log.info("added.");
}
s.close();
log.info("Terminated.");
} else if(in.equalsIgnoreCase("stats")) {
log.info("Getting Stats");
for(String line : cra.printStats().split("\n")) {
log.info(line);
}
}
}
s.close();
log.info("Terminated.");
}
/**
 * Shuts the application down: stops the Telegram API, stops the crawler and
 * interrupts the thread stored in mainthread.
 */
private void stop() {
tapi.stop();
cra.stop();
mainthread.interrupt();
}
@Override
@ -66,7 +92,54 @@ public class Main implements CommandHandler{
String ytid = cra.getDB().getRandom();
sender.sendMessage("https://youtube.com/watch?v=" + ytid);
return true;
} else if(cmd.equals("admin")) {
if(admins.contains(sender)) {
sender.sendMessage("You are admin.");
return true;
} else {
adminstr = getRandomStr(8);
setadminstr = System.currentTimeMillis();
log.info("Adminstr: " + adminstr);
}
} else if(cmd.equals("stats")) {
if(admins.contains(sender)) {
sender.sendMessage(cra.printStats());
return true;
}
} else if(cmd.equals("stop")) {
if(admins.contains(sender)) {
stop();
sender.sendMessage("Stop.");
log.info("Stopped via Telegram by " + sender.getFirstName());
return true;
}
}
return false;
}
/**
 * Creates a random string from the characters in abc.
 *
 * The result is used as the one-time admin string, so it is drawn from a
 * cryptographically strong generator instead of java.util.Random.
 *
 * @param length number of characters; values below 1 give an empty string
 * @return the random string
 */
private String getRandomStr(int length) {
	java.security.SecureRandom rand = new java.security.SecureRandom();
	StringBuilder out = new StringBuilder(Math.max(length, 0));
	for(int i = 0; i < length; i++) {
		out.append(abc.charAt(rand.nextInt(abc.length())));
	}
	return out.toString();
}
/**
 * Makes the sender of a message an admin when the message text equals the
 * pending admin string and that string is less than 60 seconds old.
 * After a successful match the pending admin string is cleared.
 *
 * @param e the received message event
 */
@EventHandler
public void onAdmin(UserSendMessageEvent e) {
	// no admin string pending
	if(adminstr == null || setadminstr <= 0) {
		return;
	}
	if(e.getMessage() == null) {
		return;
	}
	long ageSeconds = (System.currentTimeMillis() - setadminstr) / 1000;
	if(ageSeconds >= 60) {
		return;
	}
	if(e.getMessage().getText() == null) {
		return;
	}
	if(!e.getMessage().getText().equals(adminstr)) {
		return;
	}
	admins.add(e.getUser());
	e.getMessage().reply("You are now Admin!");
	// the string is single-use
	adminstr = null;
	setadminstr = -1;
	log.info(e.getUser().getFirstName() + " is now Admin!");
}
}

View File

@ -106,7 +106,7 @@ public class YoutubeAPI {
}
//Seconds
v.length += Integer.parseInt(timeparts[timeparts.length-1]);
} catch(NumberFormatException e) {//failed: P6DT17H59M53S and P15W3DT4H1M11S and P1W2DT20H47M55S video id: 1NPyC0psMaI
} catch(NumberFormatException e) {//failed: P6DT17H59M53S and P15W3DT4H1M11S and P1W2DT20H47M55S video id: 1NPyC0psMaI and P2W2DT23H58M58S video id: Jd9KjbRxhN4 For input string: "W2DT23"
log.warn("Error saving the time string: " + removeunwanted(split[1]) + " video id: " + v.id, e);
}
} else if(split[0].equals("publishedAt")) {