432 lines
13 KiB
Python
Executable File
432 lines
13 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
BRAIN COMBO DISCOVERY
|
|
Découvre automatiquement des combinaisons gagnantes:
|
|
- PTR précieux des providers
|
|
- Domaines avec SPF héritables (kpnxchange, etc.)
|
|
- IPs propres par région
|
|
- Domaines FreeDNS utilisables
|
|
|
|
Analyse les patterns de succès et apprend
|
|
"""
|
|
|
|
import psycopg2
|
|
import dns.resolver
|
|
import socket
|
|
import subprocess
|
|
import re
|
|
import json
|
|
import requests
|
|
from datetime import datetime, timedelta
|
|
from collections import defaultdict
|
|
|
|
DB_CONFIG = {
|
|
'host': 'localhost',
|
|
'database': 'adx_system',
|
|
'user': 'admin',
|
|
'password': 'admin123'
|
|
}
|
|
|
|
# SPF includes précieux (passent bien chez certains ISPs)
|
|
VALUABLE_SPF_INCLUDES = {
|
|
'spf.ews.kpnxchange.com': {'isps': ['ziggo', 'kpn', 'xs4all'], 'country': 'NL', 'score': 90},
|
|
'spf.protection.outlook.com': {'isps': ['hotmail', 'outlook'], 'country': 'US', 'score': 85},
|
|
'_spf.google.com': {'isps': ['gmail'], 'country': 'US', 'score': 80},
|
|
'sendgrid.net': {'isps': ['*'], 'country': 'US', 'score': 70},
|
|
'amazonses.com': {'isps': ['*'], 'country': 'US', 'score': 70},
|
|
'mailgun.org': {'isps': ['*'], 'country': 'US', 'score': 65},
|
|
'spf.mandrillapp.com': {'isps': ['*'], 'country': 'US', 'score': 65},
|
|
'servers.mcsv.net': {'isps': ['*'], 'country': 'US', 'score': 60}, # Mailchimp
|
|
}
|
|
|
|
# Patterns de PTR propres (bonne réputation)
|
|
CLEAN_PTR_PATTERNS = [
|
|
r'mail\.',
|
|
r'smtp\.',
|
|
r'mx\d*\.',
|
|
r'relay\.',
|
|
r'out\.',
|
|
r'send\.',
|
|
]
|
|
|
|
# Patterns de PTR à éviter
|
|
BAD_PTR_PATTERNS = [
|
|
r'dynamic',
|
|
r'dhcp',
|
|
r'pool',
|
|
r'cable',
|
|
r'dsl',
|
|
r'residential',
|
|
r'home\.',
|
|
r'unknown',
|
|
]
|
|
|
|
def get_db():
|
|
return psycopg2.connect(**DB_CONFIG)
|
|
|
|
def get_spf(domain):
|
|
"""Get SPF record"""
|
|
try:
|
|
answers = dns.resolver.resolve(domain, 'TXT')
|
|
for rdata in answers:
|
|
txt = str(rdata).strip('"')
|
|
if txt.startswith('v=spf1'):
|
|
return txt
|
|
except:
|
|
pass
|
|
return None
|
|
|
|
def get_ptr(ip):
|
|
"""Get PTR record"""
|
|
try:
|
|
result = subprocess.run(['dig', '+short', '-x', ip],
|
|
capture_output=True, text=True, timeout=10)
|
|
return result.stdout.strip().rstrip('.')
|
|
except:
|
|
pass
|
|
return None
|
|
|
|
def analyze_spf_value(spf_record):
|
|
"""Analyze SPF record for valuable includes"""
|
|
if not spf_record:
|
|
return {'score': 0, 'includes': [], 'ips': [], 'targets': []}
|
|
|
|
result = {
|
|
'score': 30, # Base score
|
|
'includes': [],
|
|
'ips': [],
|
|
'targets': set()
|
|
}
|
|
|
|
# Extract includes
|
|
includes = re.findall(r'include:([^\s]+)', spf_record)
|
|
for inc in includes:
|
|
result['includes'].append(inc)
|
|
for valuable, info in VALUABLE_SPF_INCLUDES.items():
|
|
if valuable in inc:
|
|
result['score'] += info['score']
|
|
if info['isps'][0] != '*':
|
|
result['targets'].update(info['isps'])
|
|
break
|
|
|
|
# Extract IPs
|
|
ips = re.findall(r'ip4:([^\s]+)', spf_record)
|
|
result['ips'] = ips
|
|
if ips:
|
|
result['score'] += 10
|
|
|
|
# Check mechanism
|
|
if '?all' in spf_record or '~all' in spf_record:
|
|
result['score'] += 5 # Soft fail = more permissive
|
|
|
|
result['targets'] = list(result['targets'])
|
|
return result
|
|
|
|
def is_clean_ptr(ptr_hostname):
|
|
"""Check if PTR looks clean/legitimate"""
|
|
if not ptr_hostname:
|
|
return False, "No PTR"
|
|
|
|
ptr_lower = ptr_hostname.lower()
|
|
|
|
# Check for bad patterns
|
|
for pattern in BAD_PTR_PATTERNS:
|
|
if re.search(pattern, ptr_lower):
|
|
return False, f"Bad pattern: {pattern}"
|
|
|
|
# Check for clean patterns
|
|
for pattern in CLEAN_PTR_PATTERNS:
|
|
if re.search(pattern, ptr_lower):
|
|
return True, f"Clean pattern: {pattern}"
|
|
|
|
# Check if it's a provider default (usually OK)
|
|
provider_patterns = ['hetzner', 'scaleway', 'ovh', 'vultr', 'digitalocean']
|
|
for p in provider_patterns:
|
|
if p in ptr_lower:
|
|
return True, f"Provider PTR: {p}"
|
|
|
|
# Default: OK if has valid domain structure
|
|
if '.' in ptr_hostname and len(ptr_hostname.split('.')) >= 2:
|
|
return True, "Valid domain structure"
|
|
|
|
return False, "Invalid structure"
|
|
|
|
def discover_valuable_domain(domain):
|
|
"""Analyze a domain for sending potential"""
|
|
result = {
|
|
'domain': domain,
|
|
'spf': None,
|
|
'spf_analysis': None,
|
|
'is_valuable': False,
|
|
'score': 0,
|
|
'targets': [],
|
|
'reason': None
|
|
}
|
|
|
|
# Get SPF
|
|
spf = get_spf(domain)
|
|
result['spf'] = spf
|
|
|
|
if spf:
|
|
analysis = analyze_spf_value(spf)
|
|
result['spf_analysis'] = analysis
|
|
result['score'] = analysis['score']
|
|
result['targets'] = analysis['targets']
|
|
|
|
# Valuable if score > 50 or has specific targets
|
|
if analysis['score'] >= 50 or analysis['targets']:
|
|
result['is_valuable'] = True
|
|
result['reason'] = f"Score {analysis['score']}, targets: {analysis['targets']}"
|
|
|
|
return result
|
|
|
|
def discover_from_ip_range(ip_base, count=10):
|
|
"""Discover PTRs from IP range"""
|
|
print(f"\n🔍 Scanning IP range {ip_base}x (first {count})...")
|
|
|
|
discovered = []
|
|
|
|
for i in range(1, count + 1):
|
|
ip = f"{ip_base}{i}"
|
|
ptr = get_ptr(ip)
|
|
|
|
if ptr:
|
|
is_clean, reason = is_clean_ptr(ptr)
|
|
|
|
# Extract domain
|
|
parts = ptr.split('.')
|
|
domain = '.'.join(parts[-2:]) if len(parts) >= 2 else ptr
|
|
|
|
# Analyze domain
|
|
domain_info = discover_valuable_domain(domain)
|
|
|
|
result = {
|
|
'ip': ip,
|
|
'ptr': ptr,
|
|
'domain': domain,
|
|
'is_clean': is_clean,
|
|
'clean_reason': reason,
|
|
'spf': domain_info['spf'],
|
|
'score': domain_info['score'],
|
|
'targets': domain_info['targets'],
|
|
'is_valuable': domain_info['is_valuable']
|
|
}
|
|
|
|
discovered.append(result)
|
|
|
|
status = "✅" if result['is_valuable'] else "⚪"
|
|
print(f" {status} {ip} → {ptr} (score: {result['score']})")
|
|
|
|
return discovered
|
|
|
|
def save_discovered_combo(conn, combo):
|
|
"""Save discovered combination to database"""
|
|
cur = conn.cursor()
|
|
|
|
try:
|
|
cur.execute("""
|
|
INSERT INTO admin.ptr_records
|
|
(ip_address, ptr_domain, ptr_hostname, has_spf, spf_record,
|
|
reputation_score, status, best_target_isps, is_provider_default)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
|
|
ON CONFLICT (ip_address, ptr_domain) DO UPDATE SET
|
|
spf_record = EXCLUDED.spf_record,
|
|
reputation_score = GREATEST(admin.ptr_records.reputation_score, EXCLUDED.reputation_score),
|
|
best_target_isps = EXCLUDED.best_target_isps,
|
|
last_updated = NOW()
|
|
RETURNING id
|
|
""", (
|
|
combo['ip'],
|
|
combo['domain'],
|
|
combo['ptr'],
|
|
combo['spf'] is not None,
|
|
combo['spf'],
|
|
combo['score'],
|
|
'discovered' if not combo['is_valuable'] else 'candidate',
|
|
combo['targets'] if combo['targets'] else None,
|
|
not combo['is_clean']
|
|
))
|
|
conn.commit()
|
|
return cur.fetchone()[0]
|
|
except Exception as e:
|
|
print(f" Error saving: {e}")
|
|
conn.rollback()
|
|
return None
|
|
|
|
def scan_provider_ranges():
|
|
"""Scan known provider IP ranges for valuable PTRs"""
|
|
conn = get_db()
|
|
cur = conn.cursor()
|
|
|
|
# Get provider IP ranges
|
|
cur.execute("""
|
|
SELECT provider_name, region_code, ip_range_prefix
|
|
FROM admin.provider_regions
|
|
WHERE ip_range_prefix IS NOT NULL
|
|
LIMIT 10
|
|
""")
|
|
|
|
ranges = cur.fetchall()
|
|
|
|
print("=" * 70)
|
|
print("🧠 BRAIN COMBO DISCOVERY - Provider Scan")
|
|
print("=" * 70)
|
|
|
|
all_discovered = []
|
|
valuable_count = 0
|
|
|
|
for provider, region, ip_prefix in ranges:
|
|
print(f"\n📡 {provider} / {region} ({ip_prefix})")
|
|
|
|
# Scan first 5 IPs of each range
|
|
discovered = discover_from_ip_range(ip_prefix, 5)
|
|
|
|
for combo in discovered:
|
|
combo['provider'] = provider
|
|
combo['region'] = region
|
|
|
|
if combo['is_valuable']:
|
|
save_discovered_combo(conn, combo)
|
|
valuable_count += 1
|
|
all_discovered.append(combo)
|
|
|
|
conn.close()
|
|
|
|
print("\n" + "=" * 70)
|
|
print(f"✅ Scan complete! Found {valuable_count} valuable combos")
|
|
print("=" * 70)
|
|
|
|
return all_discovered
|
|
|
|
def scan_known_valuable_domains():
|
|
"""Scan list of known valuable domains"""
|
|
|
|
# Domains connus pour avoir des SPF intéressants
|
|
known_domains = [
|
|
'testb2c.nl',
|
|
'kpn.com',
|
|
'ziggo.nl',
|
|
'xs4all.nl',
|
|
't-mobile.nl',
|
|
'currently.com',
|
|
'att.net',
|
|
# Add more as discovered
|
|
]
|
|
|
|
print("=" * 70)
|
|
print("🧠 BRAIN COMBO DISCOVERY - Known Domains")
|
|
print("=" * 70)
|
|
|
|
conn = get_db()
|
|
valuable = []
|
|
|
|
for domain in known_domains:
|
|
print(f"\n🔍 Analyzing {domain}...")
|
|
info = discover_valuable_domain(domain)
|
|
|
|
if info['spf']:
|
|
print(f" SPF: {info['spf'][:60]}...")
|
|
print(f" Score: {info['score']}")
|
|
print(f" Targets: {info['targets']}")
|
|
|
|
if info['is_valuable']:
|
|
valuable.append(info)
|
|
print(f" ✅ VALUABLE!")
|
|
|
|
conn.close()
|
|
|
|
print(f"\n✅ Found {len(valuable)} valuable domains")
|
|
return valuable
|
|
|
|
def learn_from_success():
|
|
"""Learn patterns from successful sends"""
|
|
conn = get_db()
|
|
cur = conn.cursor()
|
|
|
|
print("=" * 70)
|
|
print("🧠 BRAIN LEARNING - Analyzing Success Patterns")
|
|
print("=" * 70)
|
|
|
|
# Find high-performing sender domains
|
|
cur.execute("""
|
|
SELECT
|
|
SPLIT_PART(sender_email, '@', 2) as domain,
|
|
seed_isp,
|
|
COUNT(*) as total,
|
|
SUM(CASE WHEN inbox_placement = 'inbox' THEN 1 ELSE 0 END) as inbox,
|
|
ROUND(SUM(CASE WHEN inbox_placement = 'inbox' THEN 1 ELSE 0 END)::numeric / COUNT(*) * 100, 2) as inbox_rate
|
|
FROM admin.seed_tracking
|
|
WHERE sender_email IS NOT NULL
|
|
AND sent_at > NOW() - INTERVAL '7 days'
|
|
GROUP BY SPLIT_PART(sender_email, '@', 2), seed_isp
|
|
HAVING COUNT(*) >= 10
|
|
ORDER BY inbox_rate DESC
|
|
LIMIT 20
|
|
""")
|
|
|
|
results = cur.fetchall()
|
|
|
|
print("\n🏆 Top Performing Domain → ISP Combos:")
|
|
print("-" * 70)
|
|
|
|
patterns = defaultdict(list)
|
|
|
|
for domain, isp, total, inbox, rate in results:
|
|
print(f" {domain} → {isp}: {rate}% inbox ({inbox}/{total})")
|
|
|
|
# Analyze why it works
|
|
spf = get_spf(domain)
|
|
if spf:
|
|
analysis = analyze_spf_value(spf)
|
|
patterns[isp].append({
|
|
'domain': domain,
|
|
'inbox_rate': float(rate),
|
|
'spf_includes': analysis['includes'],
|
|
'score': analysis['score']
|
|
})
|
|
|
|
# Save learned patterns
|
|
cur.execute("""
|
|
INSERT INTO admin.brain_instructions (instruction_type, instruction_data, confidence_score)
|
|
VALUES ('learned_patterns', %s, 0.8)
|
|
ON CONFLICT (instruction_type) DO UPDATE SET
|
|
instruction_data = EXCLUDED.instruction_data,
|
|
last_updated = NOW()
|
|
""", (json.dumps(dict(patterns)),))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return patterns
|
|
|
|
def main():
|
|
import sys
|
|
|
|
if len(sys.argv) > 1:
|
|
cmd = sys.argv[1]
|
|
|
|
if cmd == 'providers':
|
|
scan_provider_ranges()
|
|
elif cmd == 'domains':
|
|
scan_known_valuable_domains()
|
|
elif cmd == 'learn':
|
|
learn_from_success()
|
|
elif cmd == 'analyze':
|
|
domain = sys.argv[2] if len(sys.argv) > 2 else 'testb2c.nl'
|
|
info = discover_valuable_domain(domain)
|
|
print(json.dumps(info, indent=2))
|
|
elif cmd == 'full':
|
|
# Full discovery run
|
|
scan_provider_ranges()
|
|
scan_known_valuable_domains()
|
|
learn_from_success()
|
|
else:
|
|
# Default: full run
|
|
print("Usage: brain-combo-discovery.py [providers|domains|learn|analyze|full]")
|
|
print("\nRunning full discovery...")
|
|
scan_known_valuable_domains()
|
|
learn_from_success()
|
|
|
|
if __name__ == '__main__':
|
|
main()
|