Files
wevads-platform/scripts/harvest_send_data.py
2026-02-26 04:53:11 +01:00

339 lines
11 KiB
Python
Executable File

#!/usr/bin/env python3
"""
HARVEST - Extraction de Send Data depuis les seeds inbox
Extrait: headers, body, patterns des emails reçus
"""
import email
import imaplib
import json
import os
import re
import sys
from datetime import datetime, timedelta
from email.header import decode_header

import psycopg2
# Connection settings passed as keyword arguments to psycopg2.connect().
# Each value can be overridden with an ADX_DB_* environment variable so that
# credentials do not have to live in the source file; the fallbacks are the
# previous hard-coded values, so an unconfigured environment behaves as before.
DB_CONFIG = {
    'host': os.environ.get('ADX_DB_HOST', 'localhost'),
    'database': os.environ.get('ADX_DB_NAME', 'adx_system'),
    'user': os.environ.get('ADX_DB_USER', 'admin'),
    'password': os.environ.get('ADX_DB_PASSWORD', 'admin123'),
}
def get_db():
    """Open and return a new psycopg2 connection built from DB_CONFIG."""
    connection = psycopg2.connect(**DB_CONFIG)
    return connection
def decode_mime_header(header):
    """Decode an RFC 2047 encoded header value into a plain string.

    Args:
        header: Raw header value as accepted by
            ``email.header.decode_header``; ``None`` and empty values are
            allowed.

    Returns:
        The decoded text, or ``""`` when ``header`` is falsy.
    """
    if not header:
        return ""
    pieces = []
    for part, charset in decode_header(header):
        if not isinstance(part, bytes):
            pieces.append(part)
            continue
        try:
            pieces.append(part.decode(charset or 'utf-8', errors='ignore'))
        except LookupError:
            # The declared charset is not a codec Python knows; fall back to
            # UTF-8 instead of letting one odd header abort the caller.
            pieces.append(part.decode('utf-8', errors='ignore'))
    return "".join(pieces)
def _header_text(msg, name):
    """Return header ``name`` of ``msg`` as a plain ``str`` ('' when absent)."""
    value = msg.get(name)
    # NOTE(review): Message.get() may return a non-str header object (legacy
    # policy with raw 8-bit header bytes); str() keeps the value usable for
    # JSON serialisation and as a DB parameter.
    return '' if value is None else str(value)


def _decode_part(part):
    """Return the decoded text payload of ``part`` ('' when it has none)."""
    payload = part.get_payload(decode=True)
    if not payload:
        return ''
    charset = part.get_content_charset() or 'utf-8'
    try:
        return payload.decode(charset, errors='ignore')
    except LookupError:
        # Declared charset is unknown to Python: fall back to UTF-8.
        return payload.decode('utf-8', errors='ignore')


def extract_email_data(msg):
    """Extract sender, header, body and link data from a parsed email.

    Args:
        msg: A message object as returned by ``email.message_from_bytes``.

    Returns:
        dict with keys ``from_name``, ``from_email``, ``reply_to``,
        ``return_path``, ``subject``, ``headers`` (selected headers that are
        present), ``x_mailer``, ``list_unsubscribe``, ``content_type``,
        ``body_html``, ``body_text``, ``links`` (at most 20 unique ``href``
        values, in document order) and ``images`` (at most 10 ``src`` values
        containing an image extension).
    """
    # Local import keeps the block self-contained (no new file-level import).
    from email.utils import parseaddr

    data = {
        'from_name': '',
        'from_email': '',
        'reply_to': '',
        'return_path': '',
        'subject': '',
        'headers': {},
        'x_mailer': '',
        'list_unsubscribe': '',
        'content_type': '',
        'body_html': '',
        'body_text': '',
        'links': [],
        'images': [],
    }

    # From: parse the address first, then decode the display name. The old
    # regex put a bare "user@example.com" into from_name and left from_email
    # empty.
    raw_from = _header_text(msg, 'From')
    display_name, address = parseaddr(raw_from)
    if address:
        data['from_name'] = decode_mime_header(display_name).strip()
        data['from_email'] = address.strip()
    else:
        data['from_email'] = decode_mime_header(raw_from)

    # Other headers
    data['reply_to'] = _header_text(msg, 'Reply-To')
    data['return_path'] = _header_text(msg, 'Return-Path').strip().strip('<>')
    data['subject'] = decode_mime_header(msg.get('Subject', ''))
    data['x_mailer'] = _header_text(msg, 'X-Mailer')
    data['list_unsubscribe'] = _header_text(msg, 'List-Unsubscribe')
    data['content_type'] = _header_text(msg, 'Content-Type')

    # Keep a copy of the headers of interest that are actually present.
    important_headers = ('DKIM-Signature', 'X-Priority', 'X-MSMail-Priority',
                         'X-Mailer', 'X-MimeOLE', 'X-Originating-IP',
                         'Authentication-Results', 'Received-SPF',
                         'List-Unsubscribe', 'List-Unsubscribe-Post',
                         'Feedback-ID', 'X-SES-Outgoing', 'X-PM-Message-Id')
    for hdr in important_headers:
        val = _header_text(msg, hdr)
        if val:
            data['headers'][hdr] = val

    # Body
    if msg.is_multipart():
        for part in msg.walk():
            # Attached .html/.txt files are not the message body.
            if part.get_content_disposition() == 'attachment':
                continue
            ctype = part.get_content_type()
            # First non-empty part of each type wins, so nested or later
            # parts cannot overwrite the main body.
            if ctype == 'text/html' and not data['body_html']:
                data['body_html'] = _decode_part(part)
            elif ctype == 'text/plain' and not data['body_text']:
                data['body_text'] = _decode_part(part)
    else:
        body = _decode_part(msg)
        if body:
            key = 'body_html' if 'html' in msg.get_content_type() else 'body_text'
            data[key] = body

    # Extract links and images from the HTML body
    if data['body_html']:
        hrefs = re.findall(r'href=["\']([^"\']+)["\']', data['body_html'], re.IGNORECASE)
        # dict.fromkeys de-duplicates while keeping document order, so the
        # 20 retained links are deterministic (set() ordering is not).
        data['links'] = list(dict.fromkeys(hrefs))[:20]
        sources = re.findall(r'src=["\']([^"\']+)["\']', data['body_html'], re.IGNORECASE)
        image_exts = ('.png', '.jpg', '.gif', '.jpeg')
        data['images'] = [
            src for src in sources
            if any(ext in src.lower() for ext in image_exts)
        ][:10]
    return data
def _strip_nul(value):
    """Recursively remove NUL characters from strings inside ``value``.

    PostgreSQL text values cannot contain NUL, and a body decoded with
    ``errors='ignore'`` can still carry some; without this the whole insert
    would be rejected.
    """
    if isinstance(value, str):
        return value.replace('\x00', '')
    if isinstance(value, dict):
        return {key: _strip_nul(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [_strip_nul(item) for item in value]
    return value


def save_pattern(data, source='harvest', source_id=None, isp=None):
    """Insert one extracted email pattern into admin.send_data_patterns.

    Args:
        data: dict with the keys read below (``from_name``, ``from_email``,
            ``reply_to``, ``return_path``, ``headers``, ``x_mailer``,
            ``list_unsubscribe``, ``content_type``, ``subject``,
            ``body_html``, ``body_text``, ``links``, ``images``).
        source: Value stored in the ``source`` column.
        source_id: Value stored in the ``source_id`` column.
        isp: Value stored in the ``isp_target`` column.

    Returns:
        The ``id`` of the inserted row, or ``None`` when the insert failed
        (the error is printed and the transaction rolled back).
    """
    data = _strip_nul(data)
    conn = get_db()
    cur = conn.cursor()
    try:
        cur.execute("""
            INSERT INTO admin.send_data_patterns
            (source, source_id, isp_target, from_name, from_email, reply_to, return_path,
            headers_json, x_mailer, list_unsubscribe, content_type, subject_template,
            body_html, body_text, links_json, images_json)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            RETURNING id
        """, (
            source, source_id, isp,
            data['from_name'], data['from_email'], data['reply_to'], data['return_path'],
            json.dumps(data['headers']), data['x_mailer'], data['list_unsubscribe'],
            data['content_type'], data['subject'],
            data['body_html'][:50000] if data['body_html'] else None,  # Limit size
            data['body_text'][:10000] if data['body_text'] else None,
            json.dumps(data['links']), json.dumps(data['images'])
        ))
        # Read the RETURNING row before committing so a fetch problem still
        # rolls the insert back instead of leaving an untracked row.
        new_id = cur.fetchone()[0]
        conn.commit()
        return new_id
    except Exception as e:
        print(f"DB Error: {e}")
        conn.rollback()
        return None
    finally:
        cur.close()
        conn.close()
def harvest_from_seed(email_addr, password, imap_host, imap_port=993, limit=10, isp=None):
    """Fetch recent INBOX emails of one mailbox and store the marketing ones.

    Connects over IMAP/SSL, searches messages of the last 7 days, keeps the
    last ``limit`` ids returned by the search, and saves every message that
    has a List-Unsubscribe header or a sender address containing one of the
    marketing keywords.

    Args:
        email_addr: Mailbox login.
        password: Mailbox password.
        imap_host: IMAP server host name.
        imap_port: IMAP SSL port.
        limit: Maximum number of messages to process; ``0`` or less
            processes none.
        isp: Label forwarded to ``save_pattern``.

    Returns:
        list of dicts (``id``, ``from``, ``subject``) for each saved pattern.
        Connection-level errors are printed and end the run for this mailbox;
        the patterns saved so far are still returned.
    """
    marketing_keywords = ('news', 'market', 'promo', 'info', 'contact', 'no-reply')
    patterns = []
    imap = None
    try:
        print(f" 📥 Connexion à {imap_host}...")
        imap = imaplib.IMAP4_SSL(imap_host, imap_port)
        imap.login(email_addr, password)
        imap.select('INBOX')
        # Look for recent emails (last 7 days)
        since = (datetime.now() - timedelta(days=7)).strftime("%d-%b-%Y")
        status, msg_nums = imap.search(None, f'(SINCE {since})')
        # A non-OK reply carries an error text, not ids: do not split it.
        found = msg_nums[0].split() if status == 'OK' and msg_nums and msg_nums[0] else []
        # found[-0:] would be the whole list, so handle limit <= 0 explicitly.
        msg_ids = found[-limit:] if limit > 0 else []
        print(f" 📧 {len(msg_ids)} emails à traiter...")
        for msg_id in msg_ids:
            status, msg_data = imap.fetch(msg_id, '(RFC822)')
            # The message may be gone, in which case the reply has no
            # (envelope, bytes) tuple to read.
            if status != 'OK' or not msg_data or not isinstance(msg_data[0], tuple):
                continue
            try:
                msg = email.message_from_bytes(msg_data[0][1])
                data = extract_email_data(msg)
            except Exception as e:
                # One malformed message must not abort the whole mailbox.
                print(f" ⚠️ Email {msg_id.decode()} ignoré: {e}")
                continue
            sender = data['from_email']
            # Keep only marketing/newsletter emails
            if not sender:
                continue
            if not (data['list_unsubscribe']
                    or any(kw in sender.lower() for kw in marketing_keywords)):
                continue
            source_id = f"{email_addr}:{msg_id.decode()}"
            pattern_id = save_pattern(data, 'harvest', source_id, isp)
            if pattern_id:
                patterns.append({
                    'id': pattern_id,
                    'from': sender,
                    'subject': data['subject'][:50]
                })
                print(f" ✅ Pattern #{pattern_id}: {sender}")
    except Exception as e:
        print(f" ❌ Erreur: {e}")
    finally:
        # Always release the connection, including after a failed login or a
        # mid-run error; a failing LOGOUT on a dead socket is not actionable.
        if imap is not None:
            try:
                imap.logout()
            except (imaplib.IMAP4.error, OSError):
                pass
    return patterns
def harvest_all_seeds(limit_seeds=10, limit_emails=5):
    """Harvest every active seed mailbox listed in admin.brain_seeds.

    Args:
        limit_seeds: Maximum number of seed rows to read.
        limit_emails: Maximum number of emails processed per seed.

    Returns:
        Total number of patterns saved across all seeds.
    """
    conn = get_db()
    try:
        cur = conn.cursor()
        try:
            cur.execute("""
                SELECT email, password, imap_host, imap_port, isp
                FROM admin.brain_seeds
                WHERE is_active = true
                AND password IS NOT NULL
                AND imap_host IS NOT NULL
                LIMIT %s
            """, (limit_seeds,))
            seeds = cur.fetchall()
        finally:
            cur.close()
    finally:
        # Closed even when the query fails, and before the slow IMAP work.
        conn.close()

    print(f"🌾 HARVEST - {len(seeds)} seeds à traiter")
    print("=" * 50)
    total_patterns = 0
    for email_addr, password, imap_host, imap_port, isp in seeds:
        print(f"\n📬 {email_addr} ({isp}):")
        # A NULL imap_port in the table falls back to the IMAPS default.
        patterns = harvest_from_seed(email_addr, password, imap_host, imap_port or 993, limit_emails, isp)
        total_patterns += len(patterns)
    print(f"\n{'=' * 50}")
    print(f"✅ Total patterns extraits: {total_patterns}")
    return total_patterns
def _record_competitor_check(email_addr, received):
    """Stamp last_check_at and add ``received`` to emails_received for a subscription."""
    conn = get_db()
    try:
        cur = conn.cursor()
        try:
            cur.execute("UPDATE admin.competitor_subscriptions SET last_check_at = NOW(), emails_received = emails_received + %s WHERE email = %s",
                        (received, email_addr))
            conn.commit()
        finally:
            cur.close()
    finally:
        conn.close()


def harvest_from_competitor_subs(limit=5):
    """Harvest the mailboxes listed in admin.competitor_subscriptions.

    Args:
        limit: Maximum number of active subscriptions to read.

    Returns:
        Total number of patterns saved, or ``0`` when no active subscription
        exists.
    """
    conn = get_db()
    try:
        cur = conn.cursor()
        try:
            cur.execute("""
                SELECT email, password, imap_host, competitor_name, isp
                FROM admin.competitor_subscriptions
                WHERE is_active = true
                LIMIT %s
            """, (limit,))
            subs = cur.fetchall()
        finally:
            cur.close()
    finally:
        # Closed even when the query fails.
        conn.close()

    if not subs:
        print("⚠️ Aucun abonnement concurrent configuré")
        return 0
    print(f"🕵️ COMPETITOR HARVEST - {len(subs)} abonnements")
    print("=" * 50)
    total = 0
    for email_addr, password, imap_host, competitor, isp in subs:
        print(f"\n🎯 {competitor} ({email_addr}):")
        # Rows without an imap_host fall back to the Office 365 endpoint.
        patterns = harvest_from_seed(email_addr, password, imap_host or 'outlook.office365.com', 993, 10, isp)
        total += len(patterns)
        # Update last_check on a fresh, short-lived connection.
        _record_competitor_check(email_addr, len(patterns))
    print(f"\n✅ Total patterns concurrents: {total}")
    return total
def show_stats():
    """Print pattern counts from admin.send_data_patterns.

    Shows the total and validated counts, the count per source, and the ten
    largest counts per ISP target.
    """
    conn = get_db()
    try:
        cur = conn.cursor()
        try:
            cur.execute("SELECT source, COUNT(*) FROM admin.send_data_patterns GROUP BY source ORDER BY count DESC")
            by_source = cur.fetchall()
            cur.execute("SELECT isp_target, COUNT(*) FROM admin.send_data_patterns WHERE isp_target IS NOT NULL GROUP BY isp_target ORDER BY count DESC LIMIT 10")
            by_isp = cur.fetchall()
            cur.execute("SELECT COUNT(*), COUNT(CASE WHEN is_validated THEN 1 END) FROM admin.send_data_patterns")
            totals = cur.fetchone()
        finally:
            cur.close()
    finally:
        # Closed even when one of the queries fails.
        conn.close()

    print("📊 SEND DATA STATS")
    print("=" * 40)
    print(f"Total patterns: {totals[0]}")
    print(f"Validés: {totals[1]}")
    print("\nPar source:")
    for src, cnt in by_source:
        print(f" {src}: {cnt}")
    print("\nPar ISP:")
    for isp, cnt in by_isp:
        print(f" {isp}: {cnt}")
def main():
    """Command-line entry point: dispatch on the first argument.

    Commands are ``seeds [limit_seeds] [limit_emails]``, ``competitors``,
    ``stats`` and ``test <email> <password> <imap_host>``. The usage text is
    printed when no command or an unknown command is given, and a short
    message is printed when the ``seeds`` limits are not integers.
    """
    usage = """
Usage:
python3 harvest_send_data.py seeds [limit_seeds] [limit_emails] - Harvest depuis seeds
python3 harvest_send_data.py competitors - Harvest concurrents
python3 harvest_send_data.py stats - Voir statistiques
python3 harvest_send_data.py test <email> <password> <imap_host> - Test un compte
"""
    if len(sys.argv) < 2:
        print(usage)
        return
    cmd = sys.argv[1].lower()
    if cmd == 'seeds':
        try:
            limit_seeds = int(sys.argv[2]) if len(sys.argv) > 2 else 10
            limit_emails = int(sys.argv[3]) if len(sys.argv) > 3 else 5
        except ValueError:
            # Report bad limits instead of crashing with a traceback.
            print("Usage: harvest_send_data.py seeds [limit_seeds] [limit_emails] (entiers attendus)")
            return
        harvest_all_seeds(limit_seeds, limit_emails)
    elif cmd == 'competitors':
        harvest_from_competitor_subs()
    elif cmd == 'stats':
        show_stats()
    elif cmd == 'test':
        if len(sys.argv) < 5:
            print("Usage: harvest_send_data.py test <email> <password> <imap_host>")
            return
        patterns = harvest_from_seed(sys.argv[2], sys.argv[3], sys.argv[4])
        print(f"{len(patterns)} patterns extraits")
    else:
        # Previously an unknown command exited silently.
        print(f"Commande inconnue: {sys.argv[1]}")
        print(usage)


if __name__ == '__main__':
    main()