// YoutubeCrawler/src/de/mrbesen/youtubecrawler/Crawler.java

package de.mrbesen.youtubecrawler;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Scanner;
import org.apache.log4j.Logger;
/**
 * Core crawler: maintains the work lists, feeds {@link CrawlerThread}s with
 * video ids, checks found ids against the DB and persists its state to
 * {@code crawl.txt} so a run can be resumed.
 *
 * NOTE(review): the work lists are touched by the worker threads (via
 * {@link #request(CrawlerThread)}) and by {@link #run()} without a common
 * lock; only {@link #addtoCrawl(String)} is synchronized. Confirm the
 * intended locking scheme before relying on it.
 */
public class Crawler implements Runnable {

	private int jobspeerthread = 100; // number of jobs a thread gets per request
	private LinkedList<String> toSave = new LinkedList<>(); // found yt-ids that still need to be analysed and written to the DB
	private LinkedList<String> toCrawl = new LinkedList<>(); // videos ready to be crawled
	private LinkedList<String> toknown = new LinkedList<>(); // ids to test against the DB; unknown ones are moved to toCrawl
	private List<CrawlerThread> threads; // all crawler threads
	private List<CrawlerThread> requested = new LinkedList<>(); // threads that asked for work while toCrawl was empty

	// NOTE(review): SimpleDateFormat is not thread-safe; it is only used from
	// the run() thread here - keep it that way.
	private static DateFormat dateform = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss");

	private String currentstate = "undefined"; // human-readable state for getStats()
	private long start; // timestamp of run() start, for the v/s statistics
	private boolean crawl = true; // main-loop flag, cleared by stop()
	private int crawlcount = 0; // total number of crawled videos
	private int updateOffset = 0; // DB offset for updateDB() (currently unused)
	private DB db = new DB();
	private YoutubeAPI api = new YoutubeAPI();
	private File crawlfile = new File("crawl.txt"); // persisted crawl state between runs
	private Logger log = Logger.getLogger(this.getClass().getName());
	private int maxvideostotest = 100; // max batch size for the DB known-check
	private int startup = 2; // number of throttled iterations to keep the beginning cool

	public Crawler() {
		maxvideostotest = readIntSetting("crawler.maxvideos", 100);
		jobspeerthread = readIntSetting("crawler.jobspeerthread", 100);
	}

	/**
	 * Reads an integer property from the config.
	 *
	 * @param key config property name (also used in the warn message, fixing
	 *            the former copy-pasted "maxvideo" text for jobspeerthread)
	 * @param fallback value returned when the property is missing or not a number
	 */
	private int readIntSetting(String key, int fallback) {
		String raw = Config.prop.getProperty(key);
		try {
			return Integer.parseInt(raw);
		} catch(NumberFormatException e) {
			log.warn("could not read the number \"" + raw + "\" from the config file. " + key);
			return fallback;
		}
	}

	/** Stops the main loop and shuts down the DB connection. */
	public void stop() {
		crawl = false;
		db.stop();
	}

	/** Queues a video id for the known-check unless it is already queued. */
	public synchronized void addtoCrawl(String videoid) {
		if(! (toCrawl.contains(videoid) || toknown.contains(videoid)))
			toknown.add(videoid);
	}

	public boolean isCrawling() {
		return crawl;
	}

	/**
	 * Called by a worker thread that ran out of work. Hands out jobs
	 * immediately if available, otherwise parks the thread in the
	 * requested-list until run() refills toCrawl.
	 */
	public void request(CrawlerThread t) {
		if(!toCrawl.isEmpty()) {
			send(t);
		} else {
			requested.add(t);
		}
	}

	/** Moves up to jobspeerthread ids from toCrawl into the thread's todo list. */
	private void send(CrawlerThread t) {
		for(int i = 0; i < jobspeerthread && !toCrawl.isEmpty(); i++) {
			t.todo.add(toCrawl.removeFirst());
		}
		t.requested = false;
	}

	/**
	 * Restores toCrawl and toknown from the crawlfile.
	 * File format: one id per line; a single "-" line separates the
	 * toCrawl section (before) from the toknown section (after).
	 */
	private void loadCrawlfile() {
		log.info("Try to load crawlfile");
		if(!crawlfile.exists())
			return;
		// try-with-resources: the Scanner was previously leaked on exceptions
		try(Scanner in = new Scanner(crawlfile)) {
			boolean crawlsection = true; // renamed: used to shadow the field 'crawl'
			while(in.hasNextLine()) {
				String line = in.nextLine(); // nextLine() never returns null - dead null-check removed
				if(line.isEmpty())
					continue;
				if(line.equals("-")) { // section delimiter
					crawlsection = false;
				} else if(crawlsection) {
					toCrawl.add(line);
				} else {
					toknown.add(line);
				}
			}
		} catch(IOException e) {
			log.warn("Error while loading crawl file.");
			e.printStackTrace();
		}
	}

	/** Persists toCrawl and toknown to the crawlfile ("-" separates the sections). */
	private void writeCrawlfile() {
		log.info("Writing Crawlfile");
		currentstate = "writing crawlfile";
		// try-with-resources: the writer was previously leaked on exceptions
		try(PrintWriter p = new PrintWriter(new BufferedWriter(new FileWriter(crawlfile)))) {
			for(String t : toCrawl) {
				p.println(t);
			}
			p.println("-");
			for(String t : toknown) {
				p.println(t);
			}
		} catch (IOException e) {
			log.error("Error writing crawlfile.", e);
		}
	}

	/**
	 * Main crawler loop: loads the persisted state, spawns the worker threads,
	 * then repeatedly fulfills work requests, refills toCrawl via the DB
	 * known-check, persists state, collects worker reports and stores video
	 * metadata until {@link #stop()} is called or nothing is left to crawl.
	 */
	@Override
	public void run() {
		currentstate = "loading crawlfile";
		start = System.currentTimeMillis();
		loadCrawlfile();

		//populate threads
		int threadcount = 4;
		try {
			threadcount = Integer.parseInt(Config.prop.getProperty("crawler.threadcount"));
		} catch(NumberFormatException e) {
			log.warn("Could not read the Number \"" + Config.prop.getProperty("crawler.threadcount") + "\" from the Config.");
		}
		threads = new ArrayList<>(threadcount);
		for(int i = 0; i < threadcount; i++) {
			CrawlerThread thr = new CrawlerThread(this);
			thr.setThread(new Thread(thr, "Crawler #" + i));
			threads.add(thr);
			thr.thread.start();
		}

		long lastdoubledelete = System.currentTimeMillis();
		db.deleteDouble();

		while(crawl) {
			log.info("to Crawl: " + toCrawl.size() + " known: " + toknown.size() + " Time: " + dateform.format(new Date()));
			try {
				//fullfill requests of threads that ran dry while we were busy
				while(!requested.isEmpty() && !toCrawl.isEmpty() && crawl) {
					log.info("fullfill request");
					currentstate = "fullfill requests";
					send(requested.remove(0));
				}

				//kind of idle: enough work queued, nobody waiting
				while(toCrawl.size() > (jobspeerthread * threads.size()) && crawl && requested.isEmpty()) {
					startup = 0;//stop startup count
					currentstate = "idle";
					if((System.currentTimeMillis() - lastdoubledelete) / 1000 > 1800) { // dedupe the DB every 30min
						db.deleteDouble();
						lastdoubledelete = System.currentTimeMillis();
					} else {
						Thread.yield();
						try {
							Thread.sleep(100);
						} catch(InterruptedException ignored) {
							break; // interrupt is used as a wake-up: leave idle and re-check state
						}
					}
				}

				//nothing left? every thread is waiting and both lists are empty
				if(toknown.isEmpty() && toCrawl.isEmpty() && requested.size() == threads.size()) {//very uncommon
					log.warn("nothing left to crawl");
					crawl = false;
				}

				//refill the toCrawl list from toknown via the DB known-check
				if(!toknown.isEmpty()) {
					log.info("Checking the DB");
					currentstate = "get new tocrawl";
					while(toCrawl.size() < jobspeerthread * threads.size() * 2 && crawl && !toknown.isEmpty()) {
						// snapshot the batch size: the old loop compared the index
						// against the shrinking list and only took half the batch
						int batch = Math.min(toknown.size(), maxvideostotest);
						LinkedList<String> tocheck = new LinkedList<>();
						for(int i = 0; i < batch; i++) {
							tocheck.add(toknown.removeFirst());
						}
						toCrawl.addAll(db.checkvideos(tocheck));
					}
				}

				//running low on candidates: pull ids back from the temp store
				if(toknown.size() < threadcount * jobspeerthread * 20 && crawl) {
					currentstate = "restoretemp";
					log.info("restoreTemp");
					toknown.addAll(db.restoreTemp());
				}

				writeCrawlfile();

				//get reports: report[0] = crawled ids, report[1] = newly found ids
				currentstate = "get report";
				log.info("get report");
				int count = 0;
				for (CrawlerThread crawlerThread : threads) {
					LinkedList<String>[] report = crawlerThread.report();
					crawlcount += report[0].size();
					toSave.addAll(report[0]);
					crawlerThread.crawled.clear();
					while(report[1].size() > 1) {//2 videos werden ggf. geloescht ohne gesehen zu werden. (up to 2 ids may be dropped unseen)
						LinkedList<String> store = new LinkedList<>();
						try {
							while(!report[1].isEmpty() && store.size() < 50) { // store in chunks of 50
								store.add(report[1].removeFirst());
								count++;
							}
						} catch(NoSuchElementException ignored) {//the worker may still mutate the list concurrently
							log.info("no suchelement bla");
						}
						db.storeTemp(store);
					}
					log.info(count + " videos added.");
					crawlerThread.found.clear();
					crawlerThread.thread.interrupt();//free from lock
				}

				long runtimes = (System.currentTimeMillis() - start) / 1000;
				if(runtimes < 1) // was '< 0': elapsed time is never negative, but 0 gave Infinity below
					runtimes = 1;
				float vidps = (crawlcount / (float) runtimes);//videos per second
				Main.getMain().broadcastAdmin(vidps + "v/s " + crawlcount + " total V");

				//save to db in chunks of 50 (API limit per request)
				currentstate = "save to DB";
				log.info("save " + toSave.size() + " videos to DB.");
				while(!toSave.isEmpty()) {
					LinkedList<String> videoids = new LinkedList<>();
					for(int i = 0; i < 50 && !toSave.isEmpty(); i++) {
						videoids.add(toSave.remove(0));
					}
					if(!videoids.isEmpty()) {
						List<Video> videos = api.getInfos(videoids)[0];
						db.addVideos(videos);
					}
				}

				//at the beginning there is maybe just one video to crawl, so keep it calm.
				if(startup > 0) {
					startup--;
					currentstate = "startup sleep";
					log.info("startup sleep");
					try {
						Thread.sleep(2000);
					} catch(InterruptedException ignored) {
						// wake-up only; the loop condition is re-checked anyway
					}
				}
			} catch(Throwable t) {
				log.warn("exception in Crawler!", t);
				StringBuilder sb = new StringBuilder();
				for(StackTraceElement elem : t.getStackTrace()) {
					// chained appends: the old code concatenated inside append()
					sb.append(elem.getFileName()).append('(').append(elem.getMethodName()).append(':').append(elem.getLineNumber()).append(")\n");
				}
				Main.getMain().broadcastAdmin("Exception in crawler: " + t.toString() + "\n" + sb.toString());
				crawl = false;
				Main.getMain().stop();
			}
		}
		db.deleteDouble();

		//end statistics
		long runtimes = (System.currentTimeMillis() - start) / 1000;
		if(runtimes < 1) // was '< 0': guard against division by zero, not negative time
			runtimes = 1;
		int runtimem = (int) (runtimes / 60);
		float vidps = (crawlcount / (float) runtimes);//videos per second
		log.info("Crawling Stopped. Runtime: " + runtimem + "min and " + crawlcount + " videos crawled. ( " + vidps + " v/s )");
	}

	public DB getDB() {
		return db;
	}

	/** Static factory for an empty {@link Video} value object. */
	public static Video getVideo() {
		return new Video();
	}

	/** Builds a multi-line, human-readable status report for the admin interface. */
	public String getStats() {
		long runtimes = (System.currentTimeMillis() - start) / 1000;
		if(runtimes < 1) // was '< 0': avoid Infinity in the v/s figure during the first second
			runtimes = 1;
		float vidps = (crawlcount / (float) runtimes);//videos per second
		int runtimem = (int) (runtimes / 60);
		StringBuilder out = new StringBuilder();
		out.append("ToCrawl: ").append(toCrawl.size());
		out.append("\nToknown: ").append(toknown.size());
		out.append("\nToSave: ").append(toSave.size());
		out.append("\nrequested: ").append(requested.size());
		out.append("\nRuntime: ").append(runtimem).append("min and ").append(crawlcount).append(" videos crawled. ( ").append(vidps).append(" v/s )");
		out.append("\nState: ").append(currentstate);
		out.append("\nThread Nr, todo size, requested, crawledsize, foundsize");
		for (int i = 0; i < threads.size(); i++) {
			CrawlerThread thre = threads.get(i);
			out.append("\n ").append(i).append(" ").append(thre.todo.size()).append(" ").append(thre.requested).append(" ").append(thre.crawled.size()).append(" ").append(thre.found.size());
		}
		return out.toString();
	}

	/**
	 * Updates old entries of the DB in chunks of 50, removing livestreams.
	 * Currently unused (the call site in run() is commented out).
	 */
	private void updateDB() {
		log.info("updating DB Offset= " + updateOffset);
		LinkedList<String> vids = db.getUncompleted(50, updateOffset);
		LinkedList<Video>[] infos = api.getInfos(vids);
		if(infos != null) {
			int size = infos[0].size() + infos[1].size();
			if(size < 50) {
				// some ids yielded no result: advance the offset past the gap
				updateOffset += ((50 - size) / 2) + 1;
			}
			if(infos[1].size() > 0) {
				log.info("delete " + infos[1].size() + " livestreams");
				db.removeVideos(infos[1]);
			}
			db.updateVideos(infos[0]);
			log.info("Updated " + infos[0].size() + " Videos.");
		}
	}

	/** Plain value object describing one YouTube video. */
	public static class Video {
		String id = "";
		String title = "";
		String channel = "";
		String tags = "";
		int length = 0;//the length of the video in seconds
		String languageCode = "";
		byte categorie = 0;
		long created = 0;//upload timestamp
		boolean live = false;//true for livestreams (these get removed from the DB)
	}
}