#!/usr/bin/env python3
"""Collect per-chat message statistics from Telegram dialogs.

The statistics classes below are fed one message at a time; the helper
functions at the end format and look up entries of the module-level
``dialogs`` list that the main program fills.
"""
from telethon import TelegramClient, sync
from telethon.tl.types import User
from timeit import default_timer as timer
import configparser
import out
import math
import datetime
import colorama
from colorama import Fore, Style


class Stat:
    """Base class for a named statistic that is fed messages one by one."""

    name = None

    def __init__(self, name):
        self.name = name

    def addMsg(self, msgnum, msg, chat):
        """Feed one message into the statistic; the base class ignores it."""
        pass

    def getValue(self, count):
        """Render the result of ``getAll(count)`` as one string.

        Returns '' when there are no values, the single value when there is
        exactly one entry, and ``"key: value, key: value"`` otherwise.
        """
        values = self.getAll(count)
        if not values:
            return ''
        if len(values) == 1:
            # Return the sole *value*; iterating the dict itself would only
            # yield its key.
            return str(next(iter(values.values())))
        return ", ".join(
            "{}: {}".format(key, val) for key, val in values.items())

    def getAll(self, count):
        """Return a dict of result name -> value; empty in the base class."""
        return {}

    def parse(self, val):
        """Convert a raw value into its display form (plain ``str`` here)."""
        return str(val)


class Countable(Stat):
    """Statistic over one number per message (min, max, sum, average)."""

    acc = 0         # running sum of all samples
    mult = 0        # running sum of squared samples
    min = 1 << 31   # sentinel until the first sample arrives
    max = 0

    def __init__(self, name):
        self.acc = 0
        self.mult = 0
        self.min = 1 << 31
        self.max = 0
        super().__init__(name)

    def getMin(self):
        """Return the parsed minimum, or parsed 0 if nothing was sampled."""
        # After at least one sample min <= max always holds, so min > max
        # means the sentinel is still in place.
        lowest = self.min if self.min <= self.max else 0
        return self.parse(lowest)

    def getMax(self):
        """Return the parsed maximum sample."""
        return self.parse(self.max)

    def getAcc(self):
        """Return the parsed sum of all samples."""
        return self.parse(self.acc)

    def getAvg(self, count):
        """Return the parsed integer average ``acc / count``.

        A ``count`` of zero (empty chat) yields parsed 0 instead of raising
        ZeroDivisionError.
        """
        if not count:
            return self.parse(0)
        return self.parse(int(self.acc / count))

    def getGAvg(self):
        """Return the parsed square root of the sum of squared samples.

        NOTE(review): despite the 'gavg' label this is neither a geometric
        nor a quadratic mean (there is no division by the sample count);
        confirm the intended formula before relying on it.
        """
        return self.parse(math.sqrt(self.mult))

    def getAll(self, count):
        """Return all aggregate values keyed by their short names."""
        return {
            'min': self.getMin(),
            'max': self.getMax(),
            'count': self.getAcc(),
            'avg': self.getAvg(count),
            'gavg': self.getGAvg(),
        }

    def addMsg(self, msgnum, msg, chat):
        """Sample one message via ``count()`` and update the aggregates.

        A ``count()`` result of ``None`` means "no sample for this message"
        and leaves every aggregate untouched.
        """
        sample = self.count(msgnum, msg, chat)
        if sample is None:
            return
        if sample > self.max:
            self.max = sample
        if sample < self.min:
            self.min = sample
        self.acc = self.acc + sample
        self.mult = self.mult + (sample * sample)

    def count(self, msgnum, msg, chat):
        """Return the number to sample for *msg*; subclasses override this."""
        return None


class CharCount(Countable):
    """Counts the characters of each message text."""

    def __init__(self, name: str = "CharCount"):
        super().__init__(name)

    def count(self, msgnum, msg, chat):
        """Return the length of ``msg.message``, or 0 when it is None."""
        if msg.message is None:
            return 0
        return len(msg.message)


class Dist(Countable):
    """Measures the time distance in seconds between consecutive messages."""

    prev = 0    # date of the previously seen message; 0 means "none yet"

    def __init__(self, name: str = "Distanz"):
        self.prev = 0
        super().__init__(name)

    def count(self, msgnum, msg, chat):
        """Return seconds between the previous message and *msg*.

        Returns ``None`` (no sample) for a message without a date and for
        the first dated message, which has no predecessor; sampling 0 there
        would pin the minimum distance to 0.
        """
        if msg.date is None:
            return None
        previous = self.prev
        self.prev = msg.date
        if previous == 0:
            return None
        return int((previous - msg.date).total_seconds())

    def parse(self, val):
        """Format a number of seconds as a ``datetime.timedelta`` string."""
        return str(datetime.timedelta(seconds=val))

    def getAvg(self, count):
        """Return the parsed average distance, or -1 for fewer than 2 messages."""
        if count < 2:
            return -1
        # Distances are measured between consecutive messages, so there is
        # one sample fewer than there are messages.
        return self.parse(int(self.acc / (count - 1)))


class DiscreteCount(Stat):
    """Statistic that counts occurrences per discrete key."""

    list = {}   # key -> occurrence count

    def __init__(self, name):
        self.list = {}
        super().__init__(name)

    def getAll(self, count):
        """Return the key -> count mapping collected so far."""
        return self.list


class UserCount(DiscreteCount):
    """Counts messages per sender."""

    def __init__(self, name: str = "Msg/User"):
        super().__init__(name)

    def addMsg(self, msgnum, msg, chat):
        """Increment the message counter of the sender of *msg*."""
        # NOTE(review): newer Telethon releases appear to expose `from_id`
        # as a Peer object instead of an int, which can neither be used as a
        # dict key nor compared with dialog ids; prefer the integer
        # `sender_id` when the message provides it - confirm against the
        # installed Telethon version.
        sender = getattr(msg, 'sender_id', None)
        if sender is None:
            sender = msg.from_id
        self.list[sender] = self.list.get(sender, 0) + 1

    def getAll(self, count):
        """Return the per-sender counts keyed by display name instead of id."""
        named = {}
        for user_id, hits in self.list.items():
            label = getUsernamebyID(user_id)
            # Several ids can resolve to the same label (e.g. "Unknown");
            # add their counts up instead of overwriting each other.
            named[label] = named.get(label, 0) + hits
        return named


def printDialog(interid, d):
    """Print one table row for dialog *d* (debug only).

    Rows with an odd *interid* are printed in a dimmed colour.
    """
    color = Fore.LIGHTBLACK_EX if interid & 1 else ''
    row = '{0:3d}| {1:14d} | {2:30} | {3:1}'.format(
        interid, d.id, getDialogType(d) + d.name, d.pinned)
    print(color + row + Fore.RESET)


def getDialogType(dialog):
    """Return an emoji prefix describing the kind of *dialog*.

    Negative ids are treated as groups, ``User`` entities as bot or person;
    anything else yields '' so callers can always concatenate the result.
    """
    if dialog.id < 0:  # group
        return '👥'
    ent = dialog.entity
    if isinstance(ent, User):
        return '🤖' if ent.bot else '👤'
    return ''


def getUsernamebyID(userid):
    """Return the name of the loaded dialog with id *userid*, else "Unknown"."""
    for d in dialogs:
        if d.id == userid:
            return d.name
    # TODO: refresh dialog cache?
    return "Unknown"
def dialogByTgID(tgid):
    """Return the loaded dialog whose id equals *tgid*, or None."""
    return next((d for d in dialogs if d.id == tgid), None)


def dialogByTgName(name):
    """Return the first loaded dialog named *name* (case-insensitive), or None."""
    wanted = name.lower()
    return next((d for d in dialogs if d.name.lower() == wanted), None)


def analyseChat(dialog, output):
    """Iterate over every message of *dialog* and hand the stats to *output*.

    Each non-None message is fed to a CharCount, a Dist and a UserCount
    statistic; afterwards ``output.print(stats, message_count, dialog)`` is
    called once.
    """
    print(Fore.GREEN + 'Lade kompletten Chat: ' + Fore.RESET, dialog.name, sep='')
    chat = client.iter_messages(dialog, limit=None)
    # A list (not a set) keeps the statistics in a stable order, so the
    # output does not vary between runs.
    stats = [CharCount(), Dist(), UserCount()]

    msgnum = 0
    for msg in chat:
        if msg is None:
            continue
        msgnum += 1
        for stat in stats:
            stat.addMsg(msgnum, msg, dialog)
    print(Fore.BLUE, msgnum, Fore.MAGENTA + 'Nachrichten Analysiert' + Fore.RESET)

    # write to output
    output.print(stats, msgnum, dialog)


def parseInput(uinput):
    """Resolve a comma separated selection string to a list of dialogs.

    ``'all'`` selects every loaded dialog. Otherwise each non-empty entry is
    tried, in this order, as index into the dialog list, as Telegram id
    (negative ids included) and finally as case-insensitive dialog name.

    Raises:
        ValueError: if an entry matches no dialog.
    """
    if uinput == 'all':
        return dialogs

    selected = []
    for entry in uinput.split(','):
        entry = entry.strip()
        if not entry:  # empty string
            continue

        match = None
        # int() instead of str.isnumeric(): accepts negative group ids and
        # rejects digit-like characters that int() cannot convert.
        try:
            number = int(entry)
        except ValueError:
            number = None
        if number is not None:
            if 0 <= number < len(dialogs):
                match = dialogs[number]  # position in the printed table
            else:
                match = dialogByTgID(number)
        if match is None:
            match = dialogByTgName(entry)
        if match is None:
            print(Fore.YELLOW + 'Kein Chat für Eingabe gefunden: ' + Fore.BLUE, entry, Fore.RESET)
            raise ValueError('unknown chat')
        selected.append(match)
    return selected


# ==================================
# MAIN program
if __name__ == "__main__":
    outputs = {
        "stdout": out.STDOUT(),
        "json": out.jsonOut(),
        # TODO: add csv, xml, yaml, maybe sqlite or mysql?
    }

    # read config
    try:
        config = configparser.ConfigParser()
        config.read('config.ini')
        api_id = int(config.get('Main', 'api_id'))
        api_hash = config.get('Main', 'api_hash')
        session_name = config.get('Main', 'user')
    except (configparser.NoSectionError, configparser.NoOptionError, ValueError):
        print('invalid config.ini')
        # SystemExit instead of exit(): the latter is only injected by the
        # `site` module and is not guaranteed to exist.
        raise SystemExit(3)

    client = TelegramClient(session_name, api_id, api_hash)
    client.start()
    me = client.get_me()
    print('me.id: ', me.id)

    # get dialogs
    dialogs = client.get_dialogs()
    dialogCount = len(dialogs)
    print(Fore.YELLOW, dialogCount, Fore.GREEN + 'Chats geladen.' + Fore.RESET)

    # select output; an empty answer picks stdout, unknown answers re-prompt
    while True:
        uinput = input("Bitte Ausgabemethode wählen: [stdout,json] (stdout): ").strip().lower()
        writer = outputs.get(uinput or "stdout")
        if writer is not None:
            break

    outfile = None
    if writer.needsFilename:
        outfile = input("Bitte Dateinamen eingeben: ").strip()
    writer.open(outfile)

    # From here on the output is open: close it even on Ctrl-C or EOF.
    try:
        # select chat; column widths mirror the row format of printDialog
        print(' ID| {0:>14} | {1:30} | pinned'.format('Internal ID', 'Username'))
        print('—' * 3 + '+' + '—' * 16 + '+' + '—' * 32 + '+' + '—' * 7)
        for interid, d in enumerate(dialogs):
            printDialog(interid, d)

        while True:
            uinput = input(
                'Bitte ' + Fore.BLUE + 'chatid1,chatid2,...' + Fore.RESET
                + ' oder "' + Fore.BLUE + 'all' + Fore.RESET
                + '" eingeben: ').strip().lower()
            try:
                selected = parseInput(uinput)
                break
            except ValueError:
                # unknown chat: parseInput already reported it, ask again
                pass

        start = timer()
        for d in selected:
            try:
                analyseChat(d, writer)
            except Exception as e:
                # keep going with the remaining chats
                print(Fore.RED + 'Fehler beim bearbeiten von: ' + Fore.RESET, d.name, '\n', e)
    finally:
        writer.close()

    took = datetime.timedelta(seconds=int(timer() - start))
    print(Fore.GREEN + "Fertig. Benötigte Zeit: " + Fore.YELLOW, took, Style.RESET_ALL)