414 lines
12 KiB
Python
Executable File
414 lines
12 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
PTR DISCOVERY & ANALYSIS
|
|
- Découvre PTR des serveurs
|
|
- Analyse DNS (SPF, MX, DMARC)
|
|
- Évalue réputation
|
|
- Identifie PTR gagnants pour ISPs spéciaux
|
|
"""
|
|
|
|
import json
import os
import re
import socket
import subprocess
from datetime import datetime

import dns.exception
import dns.resolver
import dns.reversename
import psycopg2
|
|
|
|
# Keyword arguments passed to psycopg2.connect() by get_db().
# Each value can be overridden through an environment variable so that
# credentials do not have to live in the source file; the fallbacks are the
# values this script has always used, so existing deployments keep working.
DB_CONFIG = {
    'host': os.environ.get('ADX_DB_HOST', 'localhost'),
    'database': os.environ.get('ADX_DB_NAME', 'adx_system'),
    'user': os.environ.get('ADX_DB_USER', 'admin'),
    'password': os.environ.get('ADX_DB_PASSWORD', 'admin123'),
}
|
|
|
|
def get_db():
    """Open and return a new psycopg2 connection built from DB_CONFIG."""
    connection = psycopg2.connect(**DB_CONFIG)
    return connection
|
|
|
|
def get_ptr(ip):
    """Return the PTR (reverse DNS) hostname for ``ip``, or None.

    Three lookups are tried in order, each one only when the previous one
    produced nothing: the ``dig`` command-line tool, the system resolver
    (``socket.gethostbyaddr``) and dnspython.

    Args:
        ip: IP address as a string.

    Returns:
        The hostname without its trailing dot, or None when every lookup
        fails or yields no record.
    """
    # First choice: the dig command-line tool.
    try:
        result = subprocess.run(
            ['dig', '+short', '-x', ip],
            capture_output=True, text=True, timeout=10
        )
    except (OSError, ValueError, subprocess.SubprocessError):
        # dig is not installed, timed out, or rejected the argument.
        result = None

    if result is not None:
        # "dig +short" prints one answer per line, so the output must be
        # read line by line instead of being returned as a single string.
        for line in result.stdout.splitlines():
            candidate = line.strip().rstrip('.')
            if not candidate or candidate.startswith(';'):
                continue  # blank line or dig diagnostic (";; ...")
            if candidate.lower().endswith('.arpa'):
                continue  # CNAME hop inside the reverse zone, not the PTR
            return candidate

    # Fallback: system resolver.
    try:
        hostname, _, _ = socket.gethostbyaddr(ip)
    except (OSError, ValueError):
        hostname = None
    if hostname:
        return hostname

    # Fallback: dnspython.
    try:
        rev_name = dns.reversename.from_address(ip)
        answers = dns.resolver.resolve(rev_name, 'PTR')
        return str(answers[0]).rstrip('.')
    except (dns.exception.DNSException, OSError, ValueError, IndexError):
        return None
|
|
|
|
def get_spf(domain):
    """Return the SPF policy published in ``domain``'s TXT records, or None.

    Args:
        domain: Domain name to query.

    Returns:
        The text of the first TXT record that starts with ``v=spf1``
        (compared case-insensitively), or None when the lookup fails or no
        such record exists.
    """
    try:
        answers = dns.resolver.resolve(domain, 'TXT')
    except (dns.exception.DNSException, OSError):
        # Best effort: a failed lookup is reported as "no SPF record".
        return None

    for rdata in answers:
        # A TXT record may be split into several character-strings; they
        # must be concatenated without separators to rebuild the policy.
        txt = b''.join(rdata.strings).decode('utf-8', errors='replace')
        if txt.lower().startswith('v=spf1'):
            return txt
    return None
|
|
|
|
def get_mx(domain):
    """Return the mail exchangers of ``domain``.

    Args:
        domain: Domain name to query.

    Returns:
        A list of MX hostnames without trailing dots, ordered by ascending
        preference value. The list is empty when the lookup fails, when
        there is no MX record, or when the only record is a null MX (".").
    """
    try:
        answers = dns.resolver.resolve(domain, 'MX')
    except (dns.exception.DNSException, OSError):
        # Best effort: a failed lookup is reported as "no MX".
        return []

    by_preference = sorted(answers, key=lambda record: record.preference)
    hosts = [str(record.exchange).rstrip('.') for record in by_preference]
    # A null MX has the root name as its exchange, which becomes an empty
    # string here; it must not be counted as a mail server.
    return [host for host in hosts if host]
|
|
|
|
def get_dmarc(domain):
    """Return the DMARC policy published at ``_dmarc.<domain>``, or None.

    Args:
        domain: Domain name whose DMARC record is looked up.

    Returns:
        The text of the first TXT record containing ``v=DMARC1``, or None
        when the lookup fails or no such record exists.
    """
    try:
        answers = dns.resolver.resolve(f'_dmarc.{domain}', 'TXT')
    except (dns.exception.DNSException, OSError):
        # Best effort: a failed lookup is reported as "no DMARC record".
        return None

    for rdata in answers:
        # A TXT record may be split into several character-strings; they
        # must be concatenated without separators to rebuild the policy.
        txt = b''.join(rdata.strings).decode('utf-8', errors='replace')
        if 'v=DMARC1' in txt:
            return txt
    return None
|
|
|
|
def analyze_ptr(ip, provider=None, region=None):
    """Look up the PTR of ``ip`` and score the domain behind it.

    Args:
        ip: IP address as a string.
        provider: Optional provider label copied into the result.
        region: Optional region label copied into the result.

    Returns:
        None when no PTR is found. Otherwise a dict with the keys ``ip``,
        ``ptr_hostname``, ``ptr_domain``, ``provider``, ``region``, ``spf``,
        ``mx``, ``dmarc``, ``is_provider_default`` and ``reputation_score``.
    """
    hostname = get_ptr(ip)
    if not hostname:
        return None

    # Reduce the hostname to its last two labels, or to the last three when
    # the second-to-last label is one of a few common second-level names.
    labels = hostname.split('.')
    if len(labels) < 2:
        registered = hostname
    elif len(labels) >= 3 and labels[-2] in ['co', 'com', 'net', 'org']:
        registered = '.'.join(labels[-3:])
    else:
        registered = '.'.join(labels[-2:])

    spf_record = get_spf(registered)
    mx_hosts = get_mx(registered)
    dmarc_record = get_dmarc(registered)
    provider_default = is_provider_ptr(hostname)

    # Base score of 50 plus a bonus for every positive signal.
    bonuses = (
        (spf_record, 15),
        (mx_hosts, 10),
        (dmarc_record, 10),
        (not provider_default, 15),  # a custom PTR is rated higher
    )
    total = 50 + sum(points for present, points in bonuses if present)

    return {
        'ip': ip,
        'ptr_hostname': hostname,
        'ptr_domain': registered,
        'provider': provider,
        'region': region,
        'spf': spf_record,
        'mx': mx_hosts,
        'dmarc': dmarc_record,
        'is_provider_default': provider_default,
        'reputation_score': min(100, total),
    }
|
|
|
|
# Patterns that mark a PTR hostname as provider-generated. They are compiled
# once at import time instead of being rebuilt on every call.
_PROVIDER_PTR_PATTERNS = tuple(
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r'\.hetzner\.', r'\.your-server\.de',
        r'\.scaleway\.', r'\.scw\.',
        r'\.vultr\.', r'\.choopa\.net',
        r'\.ovh\.', r'\.runabove\.',
        r'\.digitalocean\.', r'\.do\.',
        r'\.linode\.', r'\.linodeusercontent\.',
        r'\.huaweicloud\.', r'\.myhuaweicloud\.',
        r'ip-\d+-\d+-\d+-\d+',
        r'vps\d+', r'server\d+',
        r'^(\d{1,3}-){3}\d{1,3}\.',
    )
)


def is_provider_ptr(ptr):
    """Return True when ``ptr`` matches one of the provider PTR patterns.

    Args:
        ptr: PTR hostname; None or an empty string is accepted.

    Returns:
        True if any pattern in ``_PROVIDER_PTR_PATTERNS`` is found in
        ``ptr`` (case-insensitive search), otherwise False. None and the
        empty string yield False.
    """
    if not ptr:
        return False
    return any(pattern.search(ptr) for pattern in _PROVIDER_PTR_PATTERNS)
|
|
|
|
def save_ptr_record(conn, ptr_data, server_id=None):
    """Insert or update one row of ``admin.ptr_records``.

    The row is keyed on ``(ip_address, ptr_domain)``. On conflict the DNS
    findings, the provider-default flag, the hostname and the score are
    refreshed together, so the stored score never disagrees with the stored
    inputs it was computed from.

    Args:
        conn: Open database connection; it is committed on success and
            rolled back on failure.
        ptr_data: Dict with the keys ``provider``, ``region``, ``ip``,
            ``ptr_domain``, ``ptr_hostname``, ``spf``, ``mx``, ``dmarc``,
            ``is_provider_default`` and ``reputation_score``.
        server_id: Optional id stored in the ``server_id`` column.

    Returns:
        The id of the inserted or updated row, or None when saving failed
        (the error is printed).
    """
    cur = conn.cursor()

    try:
        cur.execute("""
            INSERT INTO admin.ptr_records
                (server_id, provider_name, region, ip_address, ptr_domain, ptr_hostname,
                 has_spf, spf_record, has_mx, mx_records, has_dmarc,
                 is_provider_default, reputation_score, status)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'discovered')
            ON CONFLICT (ip_address, ptr_domain) DO UPDATE SET
                ptr_hostname = EXCLUDED.ptr_hostname,
                has_spf = EXCLUDED.has_spf,
                spf_record = EXCLUDED.spf_record,
                has_mx = EXCLUDED.has_mx,
                mx_records = EXCLUDED.mx_records,
                has_dmarc = EXCLUDED.has_dmarc,
                is_provider_default = EXCLUDED.is_provider_default,
                reputation_score = EXCLUDED.reputation_score,
                last_updated = NOW()
            RETURNING id
        """, (
            server_id,
            ptr_data['provider'],
            ptr_data['region'],
            ptr_data['ip'],
            ptr_data['ptr_domain'],
            ptr_data['ptr_hostname'],
            ptr_data['spf'] is not None,
            ptr_data['spf'],
            len(ptr_data['mx']) > 0,
            ptr_data['mx'],
            ptr_data['dmarc'] is not None,
            ptr_data['is_provider_default'],
            ptr_data['reputation_score'],
        ))

        # Read the RETURNING row before committing the transaction.
        row = cur.fetchone()
        conn.commit()
        return row[0]
    except Exception as e:
        # Best effort: report the failure and leave the connection usable.
        print(f"Error saving PTR: {e}")
        conn.rollback()
        return None
    finally:
        cur.close()
|
|
|
|
def discover_all_server_ptrs():
    """Analyze and store the PTR of every active server.

    Reads servers whose status is 'running' or 'provisioning' and that have
    an IP address from ``admin.servers``, runs ``analyze_ptr`` on each and
    passes the result to ``save_ptr_record``. Progress is printed.

    Returns:
        The number of servers for which a PTR was found.
    """
    conn = get_db()
    discovered = 0

    # The connection is closed even when a query or a lookup raises.
    try:
        cur = conn.cursor()
        cur.execute("""
            SELECT id, ip_address, provider_name, region
            FROM admin.servers
            WHERE status IN ('running', 'provisioning')
            AND ip_address IS NOT NULL
        """)
        servers = cur.fetchall()
        cur.close()

        print(f"Discovering PTR for {len(servers)} servers...")

        for server_id, ip, provider, region in servers:
            ip_str = str(ip)
            print(f"\n🔍 {ip_str} ({provider}/{region})")

            ptr_data = analyze_ptr(ip_str, provider, region)

            if ptr_data:
                print(f" PTR: {ptr_data['ptr_hostname']}")
                print(f" Domain: {ptr_data['ptr_domain']}")
                print(f" SPF: {ptr_data['spf'][:50] if ptr_data['spf'] else 'None'}...")
                print(f" Score: {ptr_data['reputation_score']}")

                save_ptr_record(conn, ptr_data, server_id)
                discovered += 1
            else:
                print(" ❌ No PTR found")
    finally:
        conn.close()

    print(f"\n✅ Discovered {discovered} PTR records")
    return discovered
|
|
|
|
def check_ptr_for_special_isps(ptr_domain, spf_record):
    """Return the special ISPs whose SPF includes appear in ``spf_record``.

    Every row of ``admin.target_isps_special`` is read; for each entry of
    its ``best_spf_includes`` that occurs as a substring of ``spf_record``
    one match is reported.

    Args:
        ptr_domain: Not used by the lookup; kept for the callers' signature.
        spf_record: SPF policy text, or None/empty.

    Returns:
        A list of dicts with the keys ``isp``, ``country`` and ``reason``;
        empty when ``spf_record`` is empty or nothing matches.
    """
    conn = get_db()
    # The connection is closed even when the query raises.
    try:
        cur = conn.cursor()
        # Check against special ISPs
        cur.execute("SELECT isp_name, country_code, best_spf_includes FROM admin.target_isps_special")
        isp_rows = cur.fetchall()
        cur.close()
    finally:
        conn.close()

    matches = []
    if not spf_record:
        return matches

    for isp, country, spf_includes in isp_rows:
        for include in spf_includes or ():
            if include in spf_record:
                matches.append({
                    'isp': isp,
                    'country': country,
                    'reason': f'SPF includes {include}'
                })

    return matches
|
|
|
|
def analyze_ptr_for_targets():
    """Mark stored PTRs that match a special target ISP as winners.

    Reads the rows of ``admin.ptr_records`` with ``has_spf`` and
    ``is_usable`` set, checks each SPF record with
    ``check_ptr_for_special_isps`` and, for every match, records the ISP
    and its country on the row and sets its status to 'winner'.

    An ISP that is already listed on a row is not appended again, so
    running the analysis repeatedly does not grow the arrays with
    duplicates. All updates are committed together at the end.
    """
    conn = get_db()

    # The connection is closed even when a query raises; uncommitted
    # updates are then discarded with it.
    try:
        cur = conn.cursor()

        print("=" * 70)
        print("🎯 PTR ANALYSIS FOR SPECIAL TARGETS")
        print("=" * 70)

        cur.execute("""
            SELECT id, ptr_domain, spf_record, ip_address, provider_name
            FROM admin.ptr_records
            WHERE has_spf = true AND is_usable = true
        """)

        for ptr_id, domain, spf, ip, provider in cur.fetchall():
            matches = check_ptr_for_special_isps(domain, spf)

            if not matches:
                continue

            print(f"\n🏆 {domain} ({ip})")
            print(f" Provider: {provider}")
            print(f" SPF: {spf[:60]}...")

            for m in matches:
                print(f" ✅ Works for: {m['isp']} ({m['country']}) - {m['reason']}")

                # Update PTR record. Both arrays are extended only when the
                # ISP is not listed yet, which keeps them the same length.
                cur.execute("""
                    UPDATE admin.ptr_records SET
                        best_target_isps = CASE
                            WHEN %(isp)s = ANY(COALESCE(best_target_isps, ARRAY[]::text[]))
                                THEN best_target_isps
                            ELSE array_append(
                                COALESCE(best_target_isps, ARRAY[]::text[]), %(isp)s
                            )
                        END,
                        best_target_countries = CASE
                            WHEN %(isp)s = ANY(COALESCE(best_target_isps, ARRAY[]::text[]))
                                THEN best_target_countries
                            ELSE array_append(
                                COALESCE(best_target_countries, ARRAY[]::text[]), %(country)s
                            )
                        END,
                        status = 'winner'
                    WHERE id = %(id)s
                """, {'isp': m['isp'], 'country': m['country'], 'id': ptr_id})

        conn.commit()
    finally:
        conn.close()
|
|
|
|
def show_ptr_report():
    """Print a summary of ``admin.ptr_records``.

    The report has three parts: total and winner counts, the ten best rows
    (score of at least 70 or status 'winner') and the rows targeting
    Netherlands ISPs. NULL domains, providers or scores are printed as '-'
    instead of raising while formatting.
    """
    conn = get_db()

    # The connection is closed even when a query raises.
    try:
        cur = conn.cursor()

        print("\n" + "=" * 70)
        print("📊 PTR DISCOVERY REPORT")
        print("=" * 70)

        # Stats
        cur.execute("SELECT COUNT(*), COUNT(CASE WHEN status = 'winner' THEN 1 END) FROM admin.ptr_records")
        total, winners = cur.fetchone()
        print(f"\nTotal PTRs: {total}")
        print(f"Winners: {winners}")

        # Best PTRs
        print("\n🏆 TOP PTR DOMAINS:")
        cur.execute("""
            SELECT ptr_domain, ip_address, provider_name, reputation_score,
                   best_target_isps, spf_record
            FROM admin.ptr_records
            WHERE reputation_score >= 70 OR status = 'winner'
            ORDER BY reputation_score DESC
            LIMIT 10
        """)

        print(f"{'Domain':<30} {'IP':<16} {'Provider':<12} {'Score':<6} {'Targets'}")
        print("-" * 90)

        for domain, ip, provider, score, targets, _spf in cur.fetchall():
            targets_str = ', '.join(targets) if targets else '-'
            domain_str = (domain or '-')[:30]
            # None does not accept a width format spec, so substitute '-'.
            score_str = '-' if score is None else score
            print(f"{domain_str:<30} {str(ip):<16} {provider or '-':<12} {score_str:<6} {targets_str}")

        # Winners for Netherlands
        print("\n🇳🇱 WINNERS FOR NETHERLANDS ISPs:")
        cur.execute("""
            SELECT ptr_domain, ip_address, spf_record, best_target_isps
            FROM admin.ptr_records
            WHERE 'NL' = ANY(best_target_countries) OR 'ziggo' = ANY(best_target_isps) OR 'kpn' = ANY(best_target_isps)
        """)

        for domain, ip, spf, targets in cur.fetchall():
            print(f" ✅ {domain} ({ip})")
            print(f" Targets: {targets}")
            print(f" SPF: {spf[:60] if spf else 'None'}...")
    finally:
        conn.close()
|
|
|
|
def add_manual_ptr(ip, ptr_domain, spf=None, targets=None):
    """Store a manually discovered PTR with a fixed score of 80.

    The record is saved through ``save_ptr_record`` with provider 'manual',
    no MX/DMARC data and ``ptr_domain`` used as both hostname and domain.
    When ``targets`` is given, the row's ``best_target_isps`` is set to it
    and its status becomes 'winner'.

    Args:
        ip: IP address of the PTR.
        ptr_domain: Domain (also stored as hostname) of the PTR.
        spf: Optional SPF policy text.
        targets: Optional list of ISP names.

    Returns:
        The id of the saved row, or None when saving failed; in that case
        no target update is attempted and no success message is printed.
    """
    conn = get_db()

    # The connection is closed even when a statement raises.
    try:
        ptr_data = {
            'ip': ip,
            'ptr_hostname': ptr_domain,
            'ptr_domain': ptr_domain,
            'provider': 'manual',
            'region': None,
            'spf': spf,
            'mx': [],
            'dmarc': None,
            'is_provider_default': False,
            'reputation_score': 80
        }

        ptr_id = save_ptr_record(conn, ptr_data)

        if ptr_id is None:
            # save_ptr_record already printed the underlying error.
            print(f"❌ Could not add PTR: {ptr_domain} ({ip})")
            return None

        if targets:
            cur = conn.cursor()
            cur.execute("""
                UPDATE admin.ptr_records SET
                    best_target_isps = %s,
                    status = 'winner'
                WHERE id = %s
            """, (targets, ptr_id))
            conn.commit()
            cur.close()
    finally:
        conn.close()

    print(f"✅ Added PTR: {ptr_domain} ({ip})")
    return ptr_id
|
|
|
|
def main():
    """Command-line entry point.

    Without arguments the full pipeline runs: discovery, target analysis
    and report. Otherwise the first argument selects a command:

    * ``discover`` - discover and store PTRs for all servers
    * ``analyze``  - match stored PTRs against special ISPs
    * ``report``   - print the report
    * ``check [ip]`` - analyze one IP and print the result as JSON
      (a built-in default address is used when ``ip`` is omitted)
    * ``add <ip> <ptr_domain> [spf] [targets]`` - store a manual PTR;
      ``targets`` is a comma-separated list

    An unknown command prints a usage message instead of doing nothing.
    """
    import sys

    args = sys.argv[1:]

    if not args:
        # Full run
        discover_all_server_ptrs()
        analyze_ptr_for_targets()
        show_ptr_report()
        return

    cmd = args[0]

    if cmd == 'discover':
        discover_all_server_ptrs()
    elif cmd == 'analyze':
        analyze_ptr_for_targets()
    elif cmd == 'report':
        show_ptr_report()
    elif cmd == 'check':
        ip = args[1] if len(args) > 1 else '89.167.40.150'
        ptr_data = analyze_ptr(ip)
        print(json.dumps(ptr_data, indent=2, default=str))
    elif cmd == 'add':
        if len(args) < 3:
            print("Usage: ptr-discovery.py add <ip> <ptr_domain> [spf] [targets]")
            return
        ip = args[1]
        domain = args[2]
        spf = args[3] if len(args) > 3 else None
        targets = args[4].split(',') if len(args) > 4 else None
        add_manual_ptr(ip, domain, spf, targets)
    else:
        print(f"Unknown command: {cmd}")
        print("Usage: ptr-discovery.py [discover|analyze|report|check [ip]|add <ip> <ptr_domain> [spf] [targets]]")
|
|
|
|
# Run the command-line entry point only when executed as a script.
if __name__ == "__main__":
    main()
|