Files
wevads-platform/scripts/ptr-discovery.py
2026-02-26 04:53:11 +01:00

414 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
"""
PTR DISCOVERY & ANALYSIS
- Découvre PTR des serveurs
- Analyse DNS (SPF, MX, DMARC)
- Évalue réputation
- Identifie PTR gagnants pour ISPs spéciaux
"""
import json
import os
import re
import socket
import subprocess
from datetime import datetime

import dns.resolver
import dns.reversename
import psycopg2

# Connection settings passed as keyword arguments to psycopg2.connect().
# Each value can be overridden through an environment variable; the defaults
# are the values that were previously hard-coded, so existing deployments
# keep working unchanged.
# NOTE(review): the fallback password is a credential committed to source
# control -- set PTR_DB_PASSWORD in the environment and rotate it.
DB_CONFIG = {
    'host': os.environ.get('PTR_DB_HOST', 'localhost'),
    'database': os.environ.get('PTR_DB_NAME', 'adx_system'),
    'user': os.environ.get('PTR_DB_USER', 'admin'),
    'password': os.environ.get('PTR_DB_PASSWORD', 'admin123'),
}
def get_db():
    """Open and return a new psycopg2 connection built from DB_CONFIG."""
    connection = psycopg2.connect(**DB_CONFIG)
    return connection
def get_ptr(ip):
    """Resolve the PTR (reverse DNS) hostname for an IP address.

    Three strategies are tried in order; each one is best-effort and any
    failure simply falls through to the next:

    1. the ``dig`` command line tool,
    2. ``socket.gethostbyaddr``,
    3. dnspython's resolver.

    Args:
        ip: IP address as a string.

    Returns:
        The hostname without a trailing dot, or None when no strategy
        produced a result.
    """
    # Strategy 1: dig
    try:
        result = subprocess.run(
            ['dig', '+short', '-x', ip],
            capture_output=True, text=True, timeout=10
        )
        for line in result.stdout.splitlines():
            candidate = line.strip().rstrip('.')
            if not candidate:
                continue
            # dig prints diagnostics such as ";; connection timed out" on
            # stdout; those are not hostnames.
            if candidate.startswith(';'):
                continue
            # With delegated reverse zones the first answer line is the
            # CNAME target inside the reverse tree, not the PTR itself.
            if candidate.lower().endswith(('in-addr.arpa', 'ip6.arpa')):
                continue
            return candidate
    except Exception:
        # Deliberate best-effort: dig missing, timeout, bad argument...
        pass

    # Strategy 2: system resolver
    try:
        hostname, _, _ = socket.gethostbyaddr(ip)
        return hostname
    except Exception:
        pass

    # Strategy 3: dnspython
    try:
        rev_name = dns.reversename.from_address(ip)
        answers = dns.resolver.resolve(rev_name, 'PTR')
        return str(answers[0]).rstrip('.')
    except Exception:
        pass

    return None
def get_spf(domain):
    """Return the SPF policy published in the TXT records of a domain.

    Args:
        domain: Domain name to query.

    Returns:
        The first TXT record value starting with ``v=spf1``, or None when
        there is none or the lookup fails (lookup errors are swallowed on
        purpose; this is a best-effort probe).
    """
    try:
        answers = dns.resolver.resolve(domain, 'TXT')
        for rdata in answers:
            # A TXT record may be split into several character-strings
            # (long SPF policies usually are); the logical value is their
            # concatenation. str(rdata) would keep the inner quotes.
            txt = b''.join(rdata.strings).decode('utf-8', errors='replace')
            if txt.startswith('v=spf1'):
                return txt
    except Exception:
        pass
    return None
def get_mx(domain):
    """Return the mail exchanger hostnames of a domain.

    Args:
        domain: Domain name to query.

    Returns:
        List of MX hostnames without trailing dot, ordered by ascending
        preference value (most preferred first). Empty list when the
        domain has no usable MX or the lookup fails (errors are swallowed
        on purpose; this is a best-effort probe).
    """
    try:
        answers = dns.resolver.resolve(domain, 'MX')
        ordered = sorted(answers, key=lambda record: record.preference)
        hosts = [str(record.exchange).rstrip('.') for record in ordered]
        # A "null MX" (exchange ".") means the domain accepts no mail; it
        # strips down to an empty string and must not count as an MX.
        return [host for host in hosts if host]
    except Exception:
        pass
    return []
def get_dmarc(domain):
    """Return the DMARC policy published for a domain.

    The TXT records of ``_dmarc.<domain>`` are queried.

    Args:
        domain: Domain name whose DMARC policy is wanted.

    Returns:
        The first TXT record value containing ``v=DMARC1``, or None when
        there is none or the lookup fails (errors are swallowed on
        purpose; this is a best-effort probe).
    """
    try:
        answers = dns.resolver.resolve(f'_dmarc.{domain}', 'TXT')
        for rdata in answers:
            # Join the character-strings of the record instead of using
            # str(rdata), which would leave quotes inside multi-part values.
            txt = b''.join(rdata.strings).decode('utf-8', errors='replace')
            if 'v=DMARC1' in txt:
                return txt
    except Exception:
        pass
    return None
def analyze_ptr(ip, provider=None, region=None):
    """Resolve the PTR of an IP and collect DNS facts about its domain.

    Args:
        ip: IP address as a string.
        provider: Optional provider label copied into the result.
        region: Optional region label copied into the result.

    Returns:
        None when no PTR can be resolved; otherwise a dict with the keys
        ip, ptr_hostname, ptr_domain, provider, region, spf, mx, dmarc,
        is_provider_default and reputation_score.
    """
    hostname = get_ptr(ip)
    if not hostname:
        return None

    # Keep the last two labels, or the last three when the second-to-last
    # label looks like a second-level suffix (e.g. "example.co.uk").
    labels = hostname.split('.')
    second_level_suffixes = ['co', 'com', 'net', 'org']
    keep = 3 if len(labels) >= 3 and labels[-2] in second_level_suffixes else 2
    base_domain = '.'.join(labels[-keep:])

    report = {
        'ip': ip,
        'ptr_hostname': hostname,
        'ptr_domain': base_domain,
        'provider': provider,
        'region': region,
        'spf': get_spf(base_domain),
        'mx': get_mx(base_domain),
        'dmarc': get_dmarc(base_domain),
        'is_provider_default': is_provider_ptr(hostname)
    }

    # Score: base of 50 plus a bonus per positive signal, capped at 100.
    bonuses = (
        (15, bool(report['spf'])),
        (10, bool(report['mx'])),
        (10, bool(report['dmarc'])),
        (15, not report['is_provider_default']),  # custom PTR is better
    )
    total = 50 + sum(points for points, earned in bonuses if earned)
    report['reputation_score'] = min(100, total)
    return report
def is_provider_ptr(ptr):
    """Tell whether a PTR hostname matches a known provider-default pattern.

    Args:
        ptr: PTR hostname to inspect.

    Returns:
        True when any pattern matches (case-insensitive search), else False.
    """
    known_defaults = (
        # Hosting provider domains
        r'\.hetzner\.', r'\.your-server\.de',
        r'\.scaleway\.', r'\.scw\.',
        r'\.vultr\.', r'\.choopa\.net',
        r'\.ovh\.', r'\.runabove\.',
        r'\.digitalocean\.', r'\.do\.',
        r'\.linode\.', r'\.linodeusercontent\.',
        r'\.huaweicloud\.', r'\.myhuaweicloud\.',
        # Generic auto-generated host labels
        r'ip-\d+-\d+-\d+-\d+',
        r'vps\d+', r'server\d+',
        r'^(\d{1,3}-){3}\d{1,3}\.',
    )
    return any(
        re.search(regex, ptr, re.IGNORECASE) is not None
        for regex in known_defaults
    )
def save_ptr_record(conn, ptr_data, server_id=None):
    """Insert or refresh a row in admin.ptr_records.

    On conflict on (ip_address, ptr_domain) the DNS-derived columns and
    the reputation score are refreshed. The flags the score is computed
    from (has_dmarc, is_provider_default) are refreshed as well so the
    stored row stays consistent with its score.

    Args:
        conn: Open psycopg2 connection; committed on success, rolled back
            on failure.
        ptr_data: Dict with the keys provider, region, ip, ptr_domain,
            ptr_hostname, spf, mx, dmarc, is_provider_default and
            reputation_score.
        server_id: Optional id of the related server row.

    Returns:
        The id of the inserted/updated row, or None when saving failed
        (the error is printed).
    """
    try:
        params = (
            server_id,
            ptr_data['provider'],
            ptr_data['region'],
            ptr_data['ip'],
            ptr_data['ptr_domain'],
            ptr_data['ptr_hostname'],
            ptr_data['spf'] is not None,
            ptr_data['spf'],
            len(ptr_data['mx']) > 0,
            ptr_data['mx'],
            ptr_data['dmarc'] is not None,
            ptr_data['is_provider_default'],
            ptr_data['reputation_score']
        )
        # The context manager closes the cursor even when execute() fails.
        with conn.cursor() as cur:
            cur.execute("""
                INSERT INTO admin.ptr_records
                (server_id, provider_name, region, ip_address, ptr_domain, ptr_hostname,
                 has_spf, spf_record, has_mx, mx_records, has_dmarc,
                 is_provider_default, reputation_score, status)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'discovered')
                ON CONFLICT (ip_address, ptr_domain) DO UPDATE SET
                    has_spf = EXCLUDED.has_spf,
                    spf_record = EXCLUDED.spf_record,
                    has_mx = EXCLUDED.has_mx,
                    mx_records = EXCLUDED.mx_records,
                    has_dmarc = EXCLUDED.has_dmarc,
                    is_provider_default = EXCLUDED.is_provider_default,
                    reputation_score = EXCLUDED.reputation_score,
                    last_updated = NOW()
                RETURNING id
            """, params)
            # Read the generated id before committing.
            row = cur.fetchone()
        conn.commit()
        return row[0] if row else None
    except Exception as e:
        print(f"Error saving PTR: {e}")
        conn.rollback()
        return None
def discover_all_server_ptrs():
    """Discover and store the PTR of every active server.

    Reads the servers with status 'running' or 'provisioning' and a
    non-NULL IP from admin.servers, analyzes each IP with analyze_ptr(),
    prints a short summary and stores the result with save_ptr_record().

    Returns:
        Number of servers for which a PTR was resolved.
    """
    conn = get_db()
    # try/finally so the connection is released even when an analysis or
    # a query raises half-way through the server list.
    try:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT id, ip_address, provider_name, region
                FROM admin.servers
                WHERE status IN ('running', 'provisioning')
                AND ip_address IS NOT NULL
            """)
            servers = cur.fetchall()
        print(f"Discovering PTR for {len(servers)} servers...")
        discovered = 0
        for server_id, ip, provider, region in servers:
            ip_str = str(ip)
            print(f"\n🔍 {ip_str} ({provider}/{region})")
            ptr_data = analyze_ptr(ip_str, provider, region)
            if not ptr_data:
                print(f" ❌ No PTR found")
                continue
            print(f" PTR: {ptr_data['ptr_hostname']}")
            print(f" Domain: {ptr_data['ptr_domain']}")
            print(f" SPF: {ptr_data['spf'][:50] if ptr_data['spf'] else 'None'}...")
            print(f" Score: {ptr_data['reputation_score']}")
            save_ptr_record(conn, ptr_data, server_id)
            discovered += 1
    finally:
        conn.close()
    print(f"\n✅ Discovered {discovered} PTR records")
    return discovered
def check_ptr_for_special_isps(ptr_domain, spf_record):
    """Find the special target ISPs whose preferred SPF includes appear in an SPF record.

    Each row of admin.target_isps_special carries a list of SPF include
    strings; an ISP matches when one of them is a substring of
    ``spf_record``.

    Args:
        ptr_domain: PTR domain being checked (not used by the matching
            itself; kept for the callers' signature).
        spf_record: SPF record text, or None/empty when the domain has none.

    Returns:
        List of dicts with the keys isp, country and reason; one entry per
        matching (ISP, include) pair. Empty when nothing matches.
    """
    # Without an SPF record nothing can match, so skip the database
    # round-trip entirely.
    if not spf_record:
        return []

    conn = get_db()
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT isp_name, country_code, best_spf_includes FROM admin.target_isps_special")
            rows = cur.fetchall()
    finally:
        # Release the connection even when the query fails.
        conn.close()

    matches = []
    for isp, country, spf_includes in rows:
        for include in spf_includes or ():
            # Skip NULL/empty entries: None would raise on "in", and an
            # empty string is a substring of everything.
            if include and include in spf_record:
                matches.append({
                    'isp': isp,
                    'country': country,
                    'reason': f'SPF includes {include}'
                })
    return matches
def analyze_ptr_for_targets():
    """Flag stored PTR records that match special target ISPs.

    Every usable PTR record with an SPF record is checked with
    check_ptr_for_special_isps(). For each match the ISP and its country
    are appended to the record's best_target_isps / best_target_countries
    arrays and the record's status is set to 'winner'.

    An ISP that is already listed on the record is not appended again, so
    running the analysis repeatedly does not grow the arrays with
    duplicates.
    """
    conn = get_db()
    # try/finally so the connection is released when a query fails.
    try:
        with conn.cursor() as cur:
            print("=" * 70)
            print("🎯 PTR ANALYSIS FOR SPECIAL TARGETS")
            print("=" * 70)
            cur.execute("""
                SELECT id, ptr_domain, spf_record, ip_address, provider_name
                FROM admin.ptr_records
                WHERE has_spf = true AND is_usable = true
            """)
            # fetchall() materializes the rows, so the cursor can be
            # reused for the UPDATEs inside the loop.
            for ptr_id, domain, spf, ip, provider in cur.fetchall():
                matches = check_ptr_for_special_isps(domain, spf)
                if not matches:
                    continue
                print(f"\n🏆 {domain} ({ip})")
                print(f" Provider: {provider}")
                print(f" SPF: {spf[:60]}...")
                for m in matches:
                    print(f" ✅ Works for: {m['isp']} ({m['country']}) - {m['reason']}")
                    # Both CASE expressions test the ISP against the
                    # pre-update array, so ISP and country are appended
                    # together or not at all.
                    cur.execute("""
                        UPDATE admin.ptr_records SET
                            best_target_isps = CASE
                                WHEN %(isp)s = ANY(COALESCE(best_target_isps, ARRAY[]::text[]))
                                    THEN best_target_isps
                                ELSE array_append(
                                    COALESCE(best_target_isps, ARRAY[]::text[]), %(isp)s
                                )
                            END,
                            best_target_countries = CASE
                                WHEN %(isp)s = ANY(COALESCE(best_target_isps, ARRAY[]::text[]))
                                    THEN best_target_countries
                                ELSE array_append(
                                    COALESCE(best_target_countries, ARRAY[]::text[]), %(country)s
                                )
                            END,
                            status = 'winner'
                        WHERE id = %(id)s
                    """, {'isp': m['isp'], 'country': m['country'], 'id': ptr_id})
                conn.commit()
    finally:
        conn.close()
def show_ptr_report():
    """Print a report of the stored PTR records.

    The report has three parts: total/winner counts, the ten best records
    (score >= 70 or status 'winner') and the records targeting
    Netherlands ISPs.
    """
    conn = get_db()
    # try/finally so the connection is released when a query or the
    # formatting fails.
    try:
        with conn.cursor() as cur:
            print("\n" + "=" * 70)
            print("📊 PTR DISCOVERY REPORT")
            print("=" * 70)

            # Stats
            cur.execute("SELECT COUNT(*), COUNT(CASE WHEN status = 'winner' THEN 1 END) FROM admin.ptr_records")
            total, winners = cur.fetchone()
            print(f"\nTotal PTRs: {total}")
            print(f"Winners: {winners}")

            # Best PTRs
            print("\n🏆 TOP PTR DOMAINS:")
            cur.execute("""
                SELECT ptr_domain, ip_address, provider_name, reputation_score,
                       best_target_isps, spf_record
                FROM admin.ptr_records
                WHERE reputation_score >= 70 OR status = 'winner'
                ORDER BY reputation_score DESC
                LIMIT 10
            """)
            print(f"{'Domain':<30} {'IP':<16} {'Provider':<12} {'Score':<6} {'Targets'}")
            print("-" * 90)
            for domain, ip, provider, score, targets, _spf in cur.fetchall():
                targets_str = ', '.join(targets) if targets else '-'
                # NULL columns come back as None, which can neither be
                # sliced nor formatted with a width; show "-" instead.
                domain_str = (domain or '-')[:30]
                score_str = score if score is not None else '-'
                print(f"{domain_str:<30} {str(ip):<16} {provider or '-':<12} {score_str:<6} {targets_str}")

            # Winners for Netherlands
            print("\n🇳🇱 WINNERS FOR NETHERLANDS ISPs:")
            cur.execute("""
                SELECT ptr_domain, ip_address, spf_record, best_target_isps
                FROM admin.ptr_records
                WHERE 'NL' = ANY(best_target_countries) OR 'ziggo' = ANY(best_target_isps) OR 'kpn' = ANY(best_target_isps)
            """)
            for domain, ip, spf, targets in cur.fetchall():
                print(f"{domain} ({ip})")
                print(f" Targets: {targets}")
                print(f" SPF: {spf[:60] if spf else 'None'}...")
    finally:
        conn.close()
def add_manual_ptr(ip, ptr_domain, spf=None, targets=None):
    """Store a manually discovered PTR record.

    The record is saved with provider 'manual', no MX/DMARC data and a
    fixed reputation score of 80. When ``targets`` is given, the record's
    best_target_isps is set to it and its status to 'winner'.

    Args:
        ip: IP address of the PTR.
        ptr_domain: PTR domain; also used as the PTR hostname.
        spf: Optional SPF record text.
        targets: Optional list of target ISP names.

    Returns:
        The id of the stored row, or None when saving failed.
    """
    conn = get_db()
    # try/finally so the connection is released on every path.
    try:
        ptr_data = {
            'ip': ip,
            'ptr_hostname': ptr_domain,
            'ptr_domain': ptr_domain,
            'provider': 'manual',
            'region': None,
            'spf': spf,
            'mx': [],
            'dmarc': None,
            'is_provider_default': False,
            'reputation_score': 80
        }
        ptr_id = save_ptr_record(conn, ptr_data)
        if ptr_id is None:
            # save_ptr_record already printed the error; do not report
            # success or run an UPDATE that would match no row.
            print(f"❌ Failed to add PTR: {ptr_domain} ({ip})")
            return None
        if targets:
            with conn.cursor() as cur:
                cur.execute("""
                    UPDATE admin.ptr_records SET
                        best_target_isps = %s,
                        status = 'winner'
                    WHERE id = %s
                """, (targets, ptr_id))
            conn.commit()
    finally:
        conn.close()
    print(f"✅ Added PTR: {ptr_domain} ({ip})")
    return ptr_id
def main():
    """Command line entry point.

    Usage:
        ptr-discovery.py                 full run: discover, analyze, report
        ptr-discovery.py discover        discover PTRs of all servers
        ptr-discovery.py analyze         match stored PTRs against special ISPs
        ptr-discovery.py report          print the report
        ptr-discovery.py check [ip]      analyze one IP and print JSON
        ptr-discovery.py add <ip> <ptr_domain> [spf] [targets]
                                         store a manual PTR; targets is a
                                         comma-separated list
    """
    import sys

    args = sys.argv[1:]
    if not args:
        # Full run
        discover_all_server_ptrs()
        analyze_ptr_for_targets()
        show_ptr_report()
        return

    cmd = args[0]
    if cmd == 'discover':
        discover_all_server_ptrs()
    elif cmd == 'analyze':
        analyze_ptr_for_targets()
    elif cmd == 'report':
        show_ptr_report()
    elif cmd == 'check':
        ip = sys.argv[2] if len(sys.argv) > 2 else '89.167.40.150'
        ptr_data = analyze_ptr(ip)
        print(json.dumps(ptr_data, indent=2, default=str))
    elif cmd == 'add':
        if len(sys.argv) < 4:
            print("Usage: ptr-discovery.py add <ip> <ptr_domain> [spf] [targets]")
            return
        ip = sys.argv[2]
        domain = sys.argv[3]
        spf = sys.argv[4] if len(sys.argv) > 4 else None
        targets = sys.argv[5].split(',') if len(sys.argv) > 5 else None
        add_manual_ptr(ip, domain, spf, targets)
    else:
        # An unknown command used to do nothing at all; tell the user.
        print(f"Unknown command: {cmd}")
        print("Usage: ptr-discovery.py [discover|analyze|report|check [ip]|add <ip> <ptr_domain> [spf] [targets]]")


if __name__ == '__main__':
    main()