#!/usr/bin/env python3
"""Interactive command-line tool that computes statistics over Telegram chats.

The script logs in through Telethon, lists the user's dialogs, lets the user
pick one or more of them plus an output backend (from the local ``out``
module), feeds every message of each selected chat through a collection of
``Stat`` objects and finally hands the statistics to the output backend.
"""
import configparser
import datetime
import math
import re
from collections import Counter
from timeit import default_timer as timer

from colorama import Fore, Style
# NOTE(review): ``sync`` is never referenced directly; presumably it is
# imported for its side effect of enabling the blocking client API - confirm.
from telethon import TelegramClient, sync
from telethon.tl.types import User

import out


class Stat:
    """Base class of all statistics.

    Subclasses consume messages one by one through ``addMsg`` and expose
    their result through ``getAll``.
    """

    name = None

    def __init__(self, name):
        self.name = name

    def addMsg(self, msgnum, msg, chat):
        """Consume one message; the base implementation does nothing."""
        pass

    def getValue(self, count):
        """Return the result of ``getAll(count)`` rendered as one string.

        An empty (or missing) result yields ``''``, a one-element list yields
        that element unchanged, longer lists are joined with ``", "`` and
        mappings are rendered as ``"key: value, key: value"``.
        """
        values = self.getAll(count)
        if not values:
            return ''
        if isinstance(values, list):
            if len(values) == 1:
                return values[0]
            return ", ".join(str(item) for item in values)
        # Keys are converted explicitly so non-string keys cannot break the
        # concatenation.
        return ", ".join(
            "{}: {}".format(key, val) for key, val in values.items())

    def getAll(self, msgcount):
        """Return the collected result; the base implementation has none."""
        pass

    def parse(self, val):
        """Convert a raw value into its display representation."""
        return str(val)


class Countable(Stat):
    """Statistic over one number per message (min, max, sum and average).

    Subclasses implement ``count`` to extract that number; a return value of
    ``None`` means "no sample for this message" and is skipped.
    """

    acc = 0
    # mult = 0   (accumulator for a geometric average, currently disabled)
    min = 1 << 31
    max = 0

    def __init__(self, name):
        self.acc = 0
        # self.mult = 0
        self.min = 1 << 31
        self.max = 0
        super().__init__(name)

    def getMin(self):
        """Return the smallest sample, or 0 when nothing was recorded."""
        # After at least one sample ``min <= max`` always holds, so
        # ``min > max`` identifies the untouched initial sentinel.
        if self.min > self.max:
            return self.parse(0)
        return self.parse(self.min)

    def getMax(self):
        """Return the largest sample in display form."""
        return self.parse(self.max)

    def getAcc(self):
        """Return the sum of all samples in display form."""
        return self.parse(self.acc)

    def getAvg(self, count):
        """Return the sum divided by ``count``, truncated to an integer.

        A ``count`` below 1 (empty chat) yields the display form of 0 instead
        of raising ``ZeroDivisionError``.
        """
        if count < 1:
            return self.parse(0)
        return self.parse(int(self.acc / count))

    # def getGAvg(self):
    #     return self.parse(math.sqrt(self.mult))

    def getAll(self, msgcount):
        """Return min, max, sum ('count') and average as a dict."""
        return {
            'min': self.getMin(),
            'max': self.getMax(),
            'count': self.getAcc(),
            'avg': self.getAvg(msgcount),
        }  # , 'gavg': self.getGAvg()}

    def addMsg(self, msgnum, msg, chat):
        """Record the sample that ``count`` extracts from ``msg``."""
        count = self.count(msgnum, msg, chat)
        if count is None:
            # No sample for this message.
            return
        if count > self.max:
            self.max = count
        if count < self.min:
            self.min = count
        self.acc = self.acc + count
        # self.mult = self.mult + (count * count)

    def count(self, msgnum, msg, chat):
        """Return the sample for ``msg``; the base implementation has none."""
        pass


class CharCount(Countable):
    """Number of characters of the message text."""

    def __init__(self, name: str = "CharCount"):
        super().__init__(name)

    def count(self, msgnum, msg, chat):
        """Return the text length, or 0 for messages without text."""
        if msg.message is not None:
            return len(msg.message)
        return 0


class Dist(Countable):
    """Time distance in seconds between consecutive messages."""

    prev = 0

    def __init__(self, name: str = "Distanz"):
        self.prev = 0
        super().__init__(name)

    def count(self, msgnum, msg, chat):
        """Return the seconds between the previously seen message and ``msg``.

        Returns ``None`` (no sample) for the first message and for messages
        without a date, so that ``min`` is not polluted by an artificial 0.
        """
        if msg.date is None:
            return None
        previous = self.prev
        self.prev = msg.date
        if previous == 0:
            # First message: there is no distance yet.
            return None
        return int((previous - msg.date).total_seconds())

    def parse(self, val):
        """Render a number of seconds as a ``datetime.timedelta`` string."""
        return str(datetime.timedelta(seconds=val))

    def getAvg(self, count):
        """Return the average distance, or -1 for fewer than two messages."""
        if count < 2:
            return -1
        # Distances between messages are measured, so there is one fewer
        # sample than there are messages.
        return self.parse(int(self.acc / (count - 1)))


class DiscreteCount(Stat):
    """Statistic that counts occurrences per key."""

    list = {}

    def __init__(self, name):
        self.list = {}
        super().__init__(name)

    def getAll(self, count):
        """Return the key -> occurrences mapping."""
        return self.list

    def inc(self, key):
        """Increase the counter of ``key`` by one."""
        self.list[key] = self.list.get(key, 0) + 1


class UserCount(DiscreteCount):
    """Number of messages per user."""

    def __init__(self, name: str = "Msg/User"):
        super().__init__(name)

    def addMsg(self, msgnum, msg, chat):
        """Count ``msg`` for its sender."""
        # NOTE(review): newer Telethon releases reportedly expose ``from_id``
        # as a Peer object instead of an int - confirm whether ``sender_id``
        # should be used here.
        self.inc(msg.from_id)

    def getAll(self, msgcount):
        """Return the counters keyed by user name instead of user id.

        Ids that resolve to the same name (for example several "Unknown"
        users) are summed up instead of overwriting each other.
        """
        named = {}
        for uid, count in self.list.items():
            name = getUsernamebyID(uid)
            named[name] = named.get(name, 0) + count
        return named


# Percentage based discrete metric.
class UserProp(UserCount):
    """Per-user ratio of counted messages to all messages of that user."""

    def __init__(self, name: str = "%/User"):
        super().__init__(name)
        # Counts every message per user; serves as the denominator.
        self.counter = UserCount("")

    def addMsg(self, msgnum, msg, chat):
        """Record ``msg`` in the per-user total."""
        self.counter.addMsg(msgnum, msg, chat)

    def getAll(self, msgcount):
        """Return ``counted / total`` per user name.

        Every user who wrote at least one message is reported; users without
        counted messages get 0. Ids that resolve to the same name are merged.
        """
        hits = {}
        totals = {}
        for uid, total in self.counter.list.items():
            name = getUsernamebyID(uid)
            hits[name] = hits.get(name, 0) + self.list.get(uid, 0)
            totals[name] = totals.get(name, 0) + total
        result = {}
        for name, total in totals.items():
            result[name] = hits[name] / total if hits[name] else 0
        return result


# Users who rarely write several messages in a row.
# 1 = the user never writes two messages in a row, 0 = the user always writes
# all of their messages in a row without anybody else chiming in.
class Interactivity(UserProp):
    """Share of a user's messages that are followed by another user's."""

    def __init__(self, name: str = "User Interactivity"):
        super().__init__(name)
        self.lastmsg = None

    # Messages are walked from the newest to the oldest.
    def addMsg(self, msgnum, msg, chat):
        """Count ``msg`` when the previously seen message has another sender."""
        super().addMsg(msgnum, msg, chat)
        fromid = msg.from_id
        if self.lastmsg is not None and self.lastmsg != fromid:
            self.inc(fromid)
        self.lastmsg = fromid


class UserEdits(UserCount):
    """Number of edited messages per user."""

    def __init__(self):
        super().__init__("Edits/User")

    def addMsg(self, msgnum, msg, chat):
        """Count ``msg`` for its sender when it has an edit date."""
        if msg.edit_date:
            self.inc(msg.from_id)


class UserAwnsers(UserCount):
    """Number of replies per user."""

    def __init__(self):
        super().__init__("AwnsersFromUser")

    def addMsg(self, msgnum, msg, chat):
        """Count ``msg`` for its sender when it is a reply."""
        if msg.is_reply:
            self.inc(msg.from_id)


class MediaCounter(UserCount):
    """Number of messages carrying media per user."""

    def __init__(self):
        super().__init__("Media/User")

    def addMsg(self, msgnum, msg, chat):
        """Count ``msg`` for its sender when it carries media."""
        if msg.media:
            self.inc(msg.from_id)


# How many messages from a user are scheduled?
# class ScheduledCounter(UserCount):
#     def __init__(self):
#         super().__init__("Schedules/User")
#
#     def addMsg(self, msgnum, msg, chat):
#         if msg.from_scheduled:
#             self.inc(msg.from_id)


class MaxCounter(DiscreteCount):
    """Discrete counter that only reports its most frequent keys."""

    count = 0

    def __init__(self, name, c: int = 5):
        super().__init__(name)
        self.count = c

    def getAll(self, msgcount):
        """Return the ``self.count`` most common keys with their counts."""
        return dict(Counter(self.list).most_common(self.count))


class WordCounter(MaxCounter):
    """Most frequent words (case-insensitive)."""

    def __init__(self, name: str = "WordCounts", c: int = 5):
        super().__init__(name, c)

    def addMsg(self, msgnum, msg, chat):
        """Count every lower-cased ``\\w+`` run of the message text."""
        text = msg.message
        if text is None:
            return
        for word in re.findall(r'\w+', text.lower()):
            self.inc(word)


class MentionCounter(MaxCounter):
    """Most frequent ``@mentions`` (case-insensitive)."""

    def __init__(self, name: str = "MentionCounter", c: int = 5):
        super().__init__(name, c)

    def addMsg(self, msgnum, msg, chat):
        """Count every ``@name`` (5+ word characters) in the message text."""
        text = msg.message
        if text is None:
            return
        # The lookbehind accepts a mention after whitespace as well as at the
        # very beginning of the text.
        for mention in re.findall(r'(?<!\S)@\w{5,}', text.lower()):
            self.inc(mention)


class MaxCharCounter(MaxCounter):
    """Most frequent characters, filtered by ``checkChar`` and ``Blacklist``."""

    Blacklist = ""

    def __init__(self, name: str = "MaxCharCounter", c: int = 5):
        super().__init__(name, c)

    def addMsg(self, msgnum, msg, chat):
        """Count every accepted character of the lower-cased text."""
        text = msg.message
        if text is None:
            return
        for char in text.replace(" ", "").lower():
            if self.checkChar(char) and char not in self.Blacklist:
                self.inc(char)

    def checkChar(self, char):
        """Return whether ``char`` is counted; the base class accepts all."""
        return True


class EmojiCounter(MaxCharCounter):
    """Most frequent non-ASCII characters that are not blacklisted."""

    Blacklist = "äÄöÖüÜßẞ̒ ͎'^°~`…·–÷×"

    def __init__(self, name: str = "EmojiCounter", c: int = 5):
        super().__init__(name, c)

    def checkChar(self, char):
        """Accept only characters outside the 7-bit ASCII range."""
        return ord(char) > 0x7F


def printDialog(interid, d):
    """Print one table row for dialog ``d`` (debug only).

    Rows with an odd ``interid`` are printed in a dimmed colour.
    """
    color = Fore.LIGHTBLACK_EX if interid & 1 else ''
    row = '{0:3d}| {1:14d} | {2:30} | {3:1}'.format(
        interid, d.id, getDialogType(d) + d.name, d.pinned)
    print(color + row + Fore.RESET)


def getDialogType(dialog):
    """Return a short marker for the dialog type.

    Negative ids are treated as groups, bot users get ``'BOT '`` and
    everything else gets the single-person marker, so the result is always a
    string.
    """
    if dialog.id < 0:  # group
        return '👥'
    ent = dialog.entity
    if isinstance(ent, User) and ent.bot:
        return 'BOT '
    return '👤'


def getUsernamebyID(userid):
    """Resolve a user id to a display name.

    Looks in the global ``idlookup`` first, then in the global ``dialogs``;
    returns ``"Unknown"`` when neither knows the id.
    """
    try:
        return idlookup[userid]
    except KeyError:
        for d in dialogs:
            if d.id == userid:
                return d.name
    return "Unknown"


def dialogByTgID(tgid):
    """Return the dialog whose id equals ``tgid``, or ``None``."""
    for d in dialogs:
        if d.id == tgid:
            return d
    return None


def dialogByTgName(name):
    """Return the first dialog whose name matches case-insensitively."""
    name = name.lower()
    for d in dialogs:
        if d.name.lower() == name:
            return d
    return None


def analyseChat(dialog, output):
    """Run all statistics over every message of ``dialog``.

    The finished statistics are passed to ``output.print`` together with the
    number of analysed messages and the dialog.
    """
    print(Fore.GREEN + 'Lade kompletten Chat: ' + Fore.RESET, dialog.name,
          sep='')
    # chat = client.get_messages(selectedDialog)
    chat = client.iter_messages(dialog, limit=None)
    # A list (instead of a set) keeps the output order deterministic.
    statList = [
        CharCount(),
        Dist(),
        UserCount(),
        # ideas: oldest msg?, media types, most linked site
        UserEdits(),
        WordCounter(),
        MentionCounter(),
        # ScheduledCounter(),
        EmojiCounter(),
        UserAwnsers(),
        MediaCounter(),
        Interactivity(),
    ]

    # run the measurement
    msgnum = 0
    # The order matters: messages arrive newest -> oldest.
    for msg in chat:
        if msg is None:
            continue
        msgnum = msgnum + 1
        for stat in statList:
            stat.addMsg(msgnum, msg, dialog)
    print(Fore.BLUE, msgnum,
          Fore.MAGENTA + 'Nachrichten Analysiert' + Fore.RESET)
    # write to output
    output.print(statList, msgnum, dialog)


def parseInput(uinput):
    """Translate the user's chat selection into a list of dialogs.

    ``uinput`` is either ``'all'`` or a comma separated list. Each entry is
    tried as an index into the printed table, then as a Telegram id (which
    may be negative) and finally as a chat name. Empty entries are ignored.

    Raises:
        ValueError: if an entry matches no dialog.
    """
    if uinput == 'all':
        return dialogs

    selected = []
    for entry in uinput.split(','):
        entry = entry.strip()
        if not entry:  # empty string
            continue
        found = None
        # A leading minus has to be accepted for negative Telegram ids.
        if re.fullmatch(r'-?\d+', entry):
            number = int(entry)
            if 0 <= number < dialogCount:
                # position in the printed table
                found = dialogs[number]
            else:
                # search by Telegram id
                found = dialogByTgID(number)
        if found is None:
            # search by name
            found = dialogByTgName(entry)
        if found is None:
            print(Fore.YELLOW + 'Kein Chat für Eingabe gefunden: ' + Fore.BLUE,
                  entry, Fore.RESET)
            raise ValueError('unknown chat')
        selected.append(found)
    return selected


def getUsers(dialog):
    """Return a dict mapping participant id -> display name for ``dialog``.

    The name consists of the first and last name where present; participants
    without any name are represented by their id as a string.
    """
    users = {}
    for p in client.get_participants(dialog):
        parts = [part for part in (p.first_name, p.last_name) if part]
        users[p.id] = " ".join(parts) if parts else str(p.id)
    return users


# ==================================
# MAIN program
if __name__ == "__main__":
    outputs = {
        "stdout": out.STDOUT(),
        "json": out.jsonOut(),
        # TODO: add csv, xml, yaml, maybe sqlite or mysql?
    }

    # read config
    try:
        config = configparser.ConfigParser()
        config.read('config.ini')
        api_id = int(config.get('Main', 'api_id'))
        api_hash = config.get('Main', 'api_hash')
        session_name = config.get('Main', 'user')
    except (configparser.NoSectionError, configparser.NoOptionError,
            ValueError):
        print('invalid config.ini')
        raise SystemExit(3)

    client = TelegramClient(session_name, api_id, api_hash)
    client.start()
    me = client.get_me()
    print('me.id: ', me.id)

    # get dialogs
    dialogs = client.get_dialogs()
    dialogCount = len(dialogs)
    print(Fore.YELLOW, dialogCount, Fore.GREEN + 'Chats geladen.' + Fore.RESET)

    # select output; ask again until a known backend is named
    while True:
        uinput = input(
            "Bitte Ausgabemethode wählen: [stdout,json] (stdout): "
        ).strip().lower()
        if uinput == "":
            uinput = "stdout"
        if uinput in outputs:
            # A separate name keeps the imported ``out`` module intact.
            output = outputs[uinput]
            break

    outfile = None
    if output.needsFilename:
        outfile = input("Bitte Dateinamen eingeben: ").strip()
    output.open(outfile)

    # select chat
    print(
        ' ID| Internal ID | Username | pinned\n———+————————————————+————————————————————————————————+———————')
    for interid, d in enumerate(dialogs):
        printDialog(interid, d)

    while True:
        uinput = input(
            'Bitte ' + Fore.BLUE + 'chatid1,chatid2,...' + Fore.RESET
            + ' oder "' + Fore.BLUE + 'all' + Fore.RESET + '" eingeben: '
        ).strip().lower()
        try:
            selected = parseInput(uinput)
            break
        except ValueError:
            pass  # unknown chat, ask again

    start = timer()
    idlookup = {}
    try:
        for d in selected:
            try:
                idlookup.update(getUsers(d))
                analyseChat(d, output)
            except Exception as e:
                # One failing chat must not abort the remaining ones.
                print(Fore.RED + 'Fehler beim bearbeiten von: ' + Fore.RESET,
                      d.name, '\n', str(e))
    finally:
        # Close the output even if the run is interrupted.
        output.close()

    took = datetime.timedelta(seconds=int(timer() - start))
    print(Fore.GREEN + "Fertig. Benötigte Zeit: " + Fore.YELLOW, took,
          Style.RESET_ALL)