Files
wevads-platform/scripts/brain-combo-discovery.py
2026-02-26 04:53:11 +01:00

432 lines
13 KiB
Python
Executable File

#!/usr/bin/env python3
"""
BRAIN COMBO DISCOVERY
Découvre automatiquement des combinaisons gagnantes:
- PTR précieux des providers
- Domaines avec SPF héritables (kpnxchange, etc.)
- IPs propres par région
- Domaines FreeDNS utilisables
Analyse les patterns de succès et apprend
"""
import ipaddress
import json
import os
import re
import socket
import subprocess
from collections import defaultdict
from datetime import datetime, timedelta

import dns.resolver
import psycopg2
import requests
# Connection settings passed verbatim to psycopg2.connect() by get_db().
# Each value can be overridden through the standard libpq environment
# variables (PGHOST, PGDATABASE, PGUSER, PGPASSWORD) so that credentials do
# not have to live in the source tree; the literals are only the historical
# fallbacks used when the corresponding variable is not set.
DB_CONFIG = {
    'host': os.environ.get('PGHOST', 'localhost'),
    'database': os.environ.get('PGDATABASE', 'adx_system'),
    'user': os.environ.get('PGUSER', 'admin'),
    'password': os.environ.get('PGPASSWORD', 'admin123'),
}
# Valuable SPF includes (reported to pass well at certain ISPs).
# Maps an include hostname fragment to:
#   isps    - ISP names the include is associated with ('*' = no specific ISP)
#   country - two-letter country code
#   score   - points added to a record's score when the include is present
VALUABLE_SPF_INCLUDES = {
    'spf.ews.kpnxchange.com': {
        'isps': ['ziggo', 'kpn', 'xs4all'],
        'country': 'NL',
        'score': 90,
    },
    'spf.protection.outlook.com': {
        'isps': ['hotmail', 'outlook'],
        'country': 'US',
        'score': 85,
    },
    '_spf.google.com': {
        'isps': ['gmail'],
        'country': 'US',
        'score': 80,
    },
    'sendgrid.net': {
        'isps': ['*'],
        'country': 'US',
        'score': 70,
    },
    'amazonses.com': {
        'isps': ['*'],
        'country': 'US',
        'score': 70,
    },
    'mailgun.org': {
        'isps': ['*'],
        'country': 'US',
        'score': 65,
    },
    'spf.mandrillapp.com': {
        'isps': ['*'],
        'country': 'US',
        'score': 65,
    },
    # Mailchimp
    'servers.mcsv.net': {
        'isps': ['*'],
        'country': 'US',
        'score': 60,
    },
}
# Regex fragments that mark a PTR hostname as "clean" (mail-server-like
# labels). They are applied with re.search against the lower-cased hostname.
CLEAN_PTR_PATTERNS = [
    r'mail\.',
    r'smtp\.',
    r'mx\d*\.',
    r'relay\.',
    r'out\.',
    r'send\.',
]
# Regex fragments that disqualify a PTR hostname (labels typical of
# dynamically assigned / consumer address space). They are checked before
# the clean patterns, so a match here always wins.
BAD_PTR_PATTERNS = [
    r'dynamic',
    r'dhcp',
    r'pool',
    r'cable',
    r'dsl',
    r'residential',
    r'home\.',
    r'unknown',
]
def get_db():
    """Open and return a new psycopg2 connection built from DB_CONFIG.

    The caller owns the returned connection and is responsible for
    closing it.
    """
    connection = psycopg2.connect(**DB_CONFIG)
    return connection
def get_spf(domain):
    """Return the SPF (v=spf1) record published for ``domain``, or ``None``.

    The domain's TXT records are resolved and the first one that is an SPF
    version-1 record is returned as a single string.

    The lookup is deliberately best-effort: any failure (NXDOMAIN, timeout,
    no TXT records, resolver unavailable, ...) yields ``None`` instead of
    raising, because callers treat "no SPF" and "lookup failed" the same way.
    KeyboardInterrupt / SystemExit are no longer swallowed.
    """
    try:
        answers = dns.resolver.resolve(domain, 'TXT')
        for rdata in answers:
            # A TXT record may be split into several character-strings; SPF
            # requires them to be concatenated with no separator. str(rdata)
            # would leave '" "' between the chunks of a long record.
            chunks = getattr(rdata, 'strings', None)
            if chunks:
                txt = b''.join(chunks).decode('utf-8', errors='replace')
            else:
                txt = str(rdata).strip('"')
            # The version tag is case-insensitive and must be exactly
            # "v=spf1" followed by a space or the end of the record, so
            # that e.g. "v=spf10" is not mistaken for SPF.
            lowered = txt.lower()
            if lowered == 'v=spf1' or lowered.startswith('v=spf1 '):
                return txt
    except Exception:
        # Expected for domains without TXT/SPF data; treated as "no record".
        return None
    return None
def get_ptr(ip):
    """Return the PTR hostname for ``ip`` (looked up with ``dig``), or ``None``.

    Args:
        ip: IPv4 or IPv6 address as text.

    Returns:
        The reverse-DNS hostname without its trailing dot, or ``None`` when
        ``ip`` is not a valid address, ``dig`` is unavailable / fails / times
        out, or no PTR record is returned.
    """
    # Validate first: only a literal address is ever handed to dig, so a
    # malformed value (or one starting with '-') cannot be read as an option.
    try:
        address = str(ipaddress.ip_address(str(ip).strip()))
    except ValueError:
        return None

    try:
        result = subprocess.run(
            ['dig', '+short', '-x', address],
            capture_output=True,
            text=True,
            timeout=10,
        )
    except (OSError, subprocess.SubprocessError):
        # dig missing, not executable, or the 10 s timeout expired.
        return None

    if result.returncode != 0:
        return None

    # dig writes diagnostics such as ";; connection timed out" to stdout;
    # those must not be mistaken for a hostname.
    answers = [
        line.strip()
        for line in result.stdout.splitlines()
        if line.strip() and not line.lstrip().startswith(';')
    ]
    if not answers:
        return None
    # With a CNAME-delegated reverse zone the CNAME target(s) are printed
    # before the PTR target, so the last line is the hostname we want.
    return answers[-1].rstrip('.') or None
def analyze_spf_value(spf_record):
    """Score an SPF record and list the includes / IPs / target ISPs in it.

    Scoring, as implemented below:
      * 30 base points for any non-empty record;
      * for each ``include:`` the score of the first VALUABLE_SPF_INCLUDES
        entry whose key is a substring of the include;
      * +10 if at least one ``ip4:`` mechanism is present;
      * +5 if the record contains ``?all`` or ``~all``.

    Args:
        spf_record: SPF record text, or a falsy value.

    Returns:
        dict with keys ``score`` (int), ``includes`` (list of str, in record
        order), ``ips`` (list of str) and ``targets`` (sorted list of ISP
        names). A falsy ``spf_record`` yields a zero score and empty lists.
    """
    if not spf_record:
        return {'score': 0, 'includes': [], 'ips': [], 'targets': []}

    score = 30  # Base score
    targets = set()

    # SPF mechanism names are case-insensitive, so match them that way.
    includes = re.findall(r'include:([^\s]+)', spf_record, flags=re.IGNORECASE)
    for inc in includes:
        inc_lower = inc.lower()
        for valuable, info in VALUABLE_SPF_INCLUDES.items():
            if valuable in inc_lower:
                score += info['score']
                # '*' means "no specific ISP": nothing to record as a target.
                if info['isps'][0] != '*':
                    targets.update(info['isps'])
                break  # only the first matching entry counts per include

    ips = re.findall(r'ip4:([^\s]+)', spf_record, flags=re.IGNORECASE)
    if ips:
        score += 10

    lowered = spf_record.lower()
    if '?all' in lowered or '~all' in lowered:
        score += 5  # neutral / soft-fail default

    return {
        'score': score,
        'includes': includes,
        'ips': ips,
        # Sorted so the result is deterministic (set order is not).
        'targets': sorted(targets),
    }
def is_clean_ptr(ptr_hostname):
    """Classify a PTR hostname as clean or not.

    Checks run in this order, and the first that applies decides:
    BAD_PTR_PATTERNS, CLEAN_PTR_PATTERNS, a short list of hosting-provider
    names, and finally whether the name contains a dot at all.

    Returns:
        ``(is_clean, reason)`` where ``reason`` is a short human-readable
        explanation of the verdict.
    """
    if not ptr_hostname:
        return False, "No PTR"

    lowered = ptr_hostname.lower()

    # A disqualifying pattern always wins over everything else.
    bad_hit = next(
        (pat for pat in BAD_PTR_PATTERNS if re.search(pat, lowered)), None)
    if bad_hit is not None:
        return False, f"Bad pattern: {bad_hit}"

    clean_hit = next(
        (pat for pat in CLEAN_PTR_PATTERNS if re.search(pat, lowered)), None)
    if clean_hit is not None:
        return True, f"Clean pattern: {clean_hit}"

    # Provider default PTRs are accepted as-is.
    for provider in ('hetzner', 'scaleway', 'ovh', 'vultr', 'digitalocean'):
        if provider in lowered:
            return True, f"Provider PTR: {provider}"

    # Fallback: anything with at least two dot-separated labels is accepted.
    if ptr_hostname.count('.') >= 1:
        return True, "Valid domain structure"
    return False, "Invalid structure"
def discover_valuable_domain(domain):
    """Look up ``domain``'s SPF record and summarise how it scores.

    Returns:
        dict with keys ``domain``, ``spf`` (record text or ``None``),
        ``spf_analysis`` (result of analyze_spf_value or ``None``),
        ``is_valuable`` (bool), ``score`` (int), ``targets`` (list) and
        ``reason`` (str or ``None``). A domain is flagged valuable when its
        score is at least 50 or the analysis found specific target ISPs.
    """
    spf_record = get_spf(domain)
    report = {
        'domain': domain,
        'spf': spf_record,
        'spf_analysis': None,
        'is_valuable': False,
        'score': 0,
        'targets': [],
        'reason': None,
    }

    # Without an SPF record there is nothing to analyse.
    if not spf_record:
        return report

    spf_analysis = analyze_spf_value(spf_record)
    report['spf_analysis'] = spf_analysis
    report['score'] = spf_analysis['score']
    report['targets'] = spf_analysis['targets']

    qualifies = spf_analysis['score'] >= 50 or spf_analysis['targets']
    if qualifies:
        report['is_valuable'] = True
        report['reason'] = f"Score {spf_analysis['score']}, targets: {spf_analysis['targets']}"
    return report
def discover_from_ip_range(ip_base, count=10):
    """Look up PTR records for the first ``count`` addresses under ``ip_base``.

    Addresses are built by appending 1..count to ``ip_base``.
    NOTE(review): this assumes ``ip_base`` is a prefix ending in a dot
    (e.g. ``"192.0.2."``) - confirm against admin.provider_regions.

    For every address that has a PTR, the PTR is classified with
    is_clean_ptr and its last two labels are analysed as a domain with
    discover_valuable_domain. Taking the last two labels is a heuristic and
    is wrong for multi-label public suffixes such as ``co.uk``.

    Args:
        ip_base: textual address prefix.
        count: number of addresses to probe; values <= 0 probe nothing.

    Returns:
        list of dicts with keys ``ip``, ``ptr``, ``domain``, ``is_clean``,
        ``clean_reason``, ``spf``, ``score``, ``targets``, ``is_valuable``.
        Addresses without a PTR are omitted.
    """
    print(f"\n🔍 Scanning IP range {ip_base}x (first {count})...")
    discovered = []
    for i in range(1, count + 1):
        ip = f"{ip_base}{i}"
        ptr = get_ptr(ip)
        if not ptr:
            continue

        is_clean, reason = is_clean_ptr(ptr)

        # Extract the domain (last two labels of the PTR).
        parts = ptr.split('.')
        domain = '.'.join(parts[-2:]) if len(parts) >= 2 else ptr

        domain_info = discover_valuable_domain(domain)
        result = {
            'ip': ip,
            'ptr': ptr,
            'domain': domain,
            'is_clean': is_clean,
            'clean_reason': reason,
            'spf': domain_info['spf'],
            'score': domain_info['score'],
            'targets': domain_info['targets'],
            'is_valuable': domain_info['is_valuable'],
        }
        discovered.append(result)

        # The original status markers were identical empty strings and the
        # IP / PTR were printed with no separator; both made the log
        # unreadable.
        status = "✅" if result['is_valuable'] else "➖"
        print(f" {status} {ip} → {ptr} (score: {result['score']})")
    return discovered
def save_discovered_combo(conn, combo):
    """Upsert one discovered IP/PTR combination into admin.ptr_records.

    On a conflict on (ip_address, ptr_domain) the SPF data and target ISPs
    are refreshed, and the stored reputation score is only ever raised.

    Args:
        conn: open psycopg2 connection; committed on success, rolled back
            on failure.
        combo: dict with keys ``ip``, ``domain``, ``ptr``, ``spf``,
            ``score``, ``is_valuable``, ``targets`` and ``is_clean``.

    Returns:
        The ``id`` of the inserted/updated row, or ``None`` if saving failed
        (the error is printed, not raised).
    """
    cur = conn.cursor()
    try:
        cur.execute("""
            INSERT INTO admin.ptr_records
            (ip_address, ptr_domain, ptr_hostname, has_spf, spf_record,
            reputation_score, status, best_target_isps, is_provider_default)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (ip_address, ptr_domain) DO UPDATE SET
            has_spf = EXCLUDED.has_spf,
            spf_record = EXCLUDED.spf_record,
            reputation_score = GREATEST(admin.ptr_records.reputation_score, EXCLUDED.reputation_score),
            best_target_isps = EXCLUDED.best_target_isps,
            last_updated = NOW()
            RETURNING id
        """, (
            combo['ip'],
            combo['domain'],
            combo['ptr'],
            combo['spf'] is not None,
            combo['spf'],
            combo['score'],
            'discovered' if not combo['is_valuable'] else 'candidate',
            combo['targets'] if combo['targets'] else None,
            # NOTE(review): "provider default" is derived from "not clean";
            # kept as-is, but confirm this mapping is intended.
            not combo['is_clean'],
        ))
        # Read the RETURNING row before committing.
        row = cur.fetchone()
        conn.commit()
        return row[0]
    except Exception as e:
        # Best-effort persistence: report the failure and keep the scan going.
        print(f" Error saving: {e}")
        conn.rollback()
        return None
    finally:
        # The cursor was previously never closed.
        cur.close()
def scan_provider_ranges():
    """Scan provider IP ranges from the database for valuable PTR combos.

    Reads up to 10 rows with a non-null ``ip_range_prefix`` from
    admin.provider_regions, probes the first 5 addresses of each prefix with
    discover_from_ip_range, tags every result with its provider and region,
    and stores the valuable ones through save_discovered_combo.

    Returns:
        list of all discovered combo dicts (valuable or not).
    """
    conn = get_db()
    try:
        # Get provider IP ranges.
        cur = conn.cursor()
        try:
            cur.execute("""
                SELECT provider_name, region_code, ip_range_prefix
                FROM admin.provider_regions
                WHERE ip_range_prefix IS NOT NULL
                LIMIT 10
            """)
            ranges = cur.fetchall()
        finally:
            cur.close()

        print("=" * 70)
        print("🧠 BRAIN COMBO DISCOVERY - Provider Scan")
        print("=" * 70)

        all_discovered = []
        valuable_count = 0
        for provider, region, ip_prefix in ranges:
            print(f"\n📡 {provider} / {region} ({ip_prefix})")
            # Scan the first 5 IPs of each range.
            discovered = discover_from_ip_range(ip_prefix, 5)
            for combo in discovered:
                combo['provider'] = provider
                combo['region'] = region
                if combo['is_valuable']:
                    save_discovered_combo(conn, combo)
                    # Counts combos found, whether or not the save succeeded.
                    valuable_count += 1
                all_discovered.append(combo)
    finally:
        # Release the connection even if the query or a lookup raises.
        conn.close()

    print("\n" + "=" * 70)
    print(f"✅ Scan complete! Found {valuable_count} valuable combos")
    print("=" * 70)
    return all_discovered
def scan_known_valuable_domains():
    """Analyse a built-in list of domains and report the valuable ones.

    Each domain is passed to discover_valuable_domain; its SPF record
    (truncated to 60 characters), score and targets are printed.

    This function performs DNS lookups only. The database connection the
    original opened here was never used, so it has been removed: the scan no
    longer needs a reachable database and cannot leak a connection.

    Returns:
        list of the result dicts whose ``is_valuable`` flag is set.
    """
    # Domains known to publish interesting SPF records.
    known_domains = [
        'testb2c.nl',
        'kpn.com',
        'ziggo.nl',
        'xs4all.nl',
        't-mobile.nl',
        'currently.com',
        'att.net',
        # Add more as discovered
    ]

    print("=" * 70)
    print("🧠 BRAIN COMBO DISCOVERY - Known Domains")
    print("=" * 70)

    valuable = []
    for domain in known_domains:
        print(f"\n🔍 Analyzing {domain}...")
        info = discover_valuable_domain(domain)
        if info['spf']:
            print(f" SPF: {info['spf'][:60]}...")
            print(f" Score: {info['score']}")
            print(f" Targets: {info['targets']}")
        if info['is_valuable']:
            valuable.append(info)
            print(" ✅ VALUABLE!")

    print(f"\n✅ Found {len(valuable)} valuable domains")
    return valuable
def learn_from_success():
    """Derive per-ISP SPF patterns from the last 7 days of seed tracking.

    Queries admin.seed_tracking for the 20 sender-domain / seed-ISP pairs
    with the highest inbox rate (pairs with fewer than 10 sends are
    ignored), analyses each sender domain's current SPF record, and stores
    the result as JSON in admin.brain_instructions under the
    ``learned_patterns`` instruction type.

    Returns:
        defaultdict(list) mapping ISP -> list of dicts with keys ``domain``,
        ``inbox_rate``, ``spf_includes`` and ``score``. Domains without an
        SPF record are printed but not included.
    """
    conn = get_db()
    try:
        cur = conn.cursor()
        print("=" * 70)
        print("🧠 BRAIN LEARNING - Analyzing Success Patterns")
        print("=" * 70)

        # Find high-performing sender domains.
        cur.execute("""
            SELECT
            SPLIT_PART(sender_email, '@', 2) as domain,
            seed_isp,
            COUNT(*) as total,
            SUM(CASE WHEN inbox_placement = 'inbox' THEN 1 ELSE 0 END) as inbox,
            ROUND(SUM(CASE WHEN inbox_placement = 'inbox' THEN 1 ELSE 0 END)::numeric / COUNT(*) * 100, 2) as inbox_rate
            FROM admin.seed_tracking
            WHERE sender_email IS NOT NULL
            AND sent_at > NOW() - INTERVAL '7 days'
            GROUP BY SPLIT_PART(sender_email, '@', 2), seed_isp
            HAVING COUNT(*) >= 10
            ORDER BY inbox_rate DESC
            LIMIT 20
        """)
        results = cur.fetchall()

        print("\n🏆 Top Performing Domain → ISP Combos:")
        print("-" * 70)

        patterns = defaultdict(list)
        for domain, isp, total, inbox, rate in results:
            # Domain and ISP were previously printed with no separator.
            print(f" {domain} → {isp}: {rate}% inbox ({inbox}/{total})")
            # Analyse the sender domain's SPF record.
            spf = get_spf(domain)
            if spf:
                analysis = analyze_spf_value(spf)
                patterns[isp].append({
                    'domain': domain,
                    'inbox_rate': float(rate),
                    'spf_includes': analysis['includes'],
                    'score': analysis['score'],
                })

        # Save learned patterns (one row per instruction_type, overwritten).
        cur.execute("""
            INSERT INTO admin.brain_instructions (instruction_type, instruction_data, confidence_score)
            VALUES ('learned_patterns', %s, 0.8)
            ON CONFLICT (instruction_type) DO UPDATE SET
            instruction_data = EXCLUDED.instruction_data,
            last_updated = NOW()
        """, (json.dumps(dict(patterns)),))
        conn.commit()
    finally:
        # Closing the connection also releases the cursor and, if anything
        # above raised, discards the uncommitted transaction.
        conn.close()
    return patterns
def main():
    """Command-line entry point.

    Usage: brain-combo-discovery.py [providers|domains|learn|analyze|full]

      providers        scan provider IP ranges
      domains          analyse the built-in domain list
      learn            learn patterns from recent seed tracking
      analyze [DOMAIN] print the JSON analysis of DOMAIN (default testb2c.nl)
      full             providers + domains + learn

    With no argument the usage line is printed and the domain scan and
    learning step are run. An unrecognised command prints the usage to
    stderr and exits with status 2 (it used to be silently ignored).
    """
    import sys

    usage = "Usage: brain-combo-discovery.py [providers|domains|learn|analyze|full]"

    if len(sys.argv) <= 1:
        # NOTE(review): announced as a "full discovery" but, unlike the
        # explicit 'full' command, this does not run scan_provider_ranges().
        # Behaviour kept as-is - confirm whether that is intended.
        print(usage)
        print("\nRunning full discovery...")
        scan_known_valuable_domains()
        learn_from_success()
        return

    cmd = sys.argv[1]
    if cmd == 'providers':
        scan_provider_ranges()
    elif cmd == 'domains':
        scan_known_valuable_domains()
    elif cmd == 'learn':
        learn_from_success()
    elif cmd == 'analyze':
        domain = sys.argv[2] if len(sys.argv) > 2 else 'testb2c.nl'
        info = discover_valuable_domain(domain)
        print(json.dumps(info, indent=2))
    elif cmd == 'full':
        # Full discovery run.
        scan_provider_ranges()
        scan_known_valuable_domains()
        learn_from_success()
    else:
        print(f"Unknown command: {cmd}", file=sys.stderr)
        print(usage, file=sys.stderr)
        sys.exit(2)


if __name__ == '__main__':
    main()