Files
wevads-platform/scripts/cross-isp-tester.py
2026-02-26 04:53:11 +01:00

242 lines
9.2 KiB
Python
Executable File

#!/usr/bin/env python3
"""
CROSS-ISP TESTER
Tests a validated config against other ISPs that use the same filter.
"""
import psycopg2
import json
from datetime import datetime
# Keyword arguments that get_db() passes to psycopg2.connect().
# NOTE(review): the credentials are hard-coded in source; consider loading
# them from the environment or a secrets store instead.
# NOTE(review): the same constant is assigned again later in this file.
DB_CONFIG = dict(
    host='localhost',
    database='adx_system',
    user='admin',
    password='admin123',
)
# Maps a header method name to the ISP names listed as compatible with it.
# get_untested_isps() looks up a config's target_method in this table.
# NOTE(review): the same constant is assigned again later in this file.
HEADER_COMPATIBILITY = {
    'mailchimp_spoof': [
        'gmail', 'googlemail', 'hotmail', 'outlook', 'yahoo', 'aol',
    ],
    'exchange': [
        'hotmail', 'outlook', 'live', 'msn',
    ],
    'newsletter': [
        't-online', 'gmx', 'web.de', 'freenet',
        '1und1', 'spectrum', 'roadrunner', 'twc',
    ],
    'standard': [
        'videotron', 'bell', 'rogers', 'comcast', 'att', 'charter',
    ],
}
def get_db():
    """Open and return a new psycopg2 connection built from DB_CONFIG.

    Every call creates a fresh connection; the callers in this file close
    it themselves once they are done.

    NOTE(review): a function of the same name is defined again later in
    this file.
    """
    connection = psycopg2.connect(**DB_CONFIG)
    return connection
def get_config_test_status(config_id, config_type='headers'):
    """Return the most recent recorded test for each ISP of a config.

    NOTE(review): a function of the same name is defined again later in
    this file.

    Args:
        config_id: Value matched against ``config_isp_tests.config_id``.
        config_type: Value matched against ``config_isp_tests.config_type``.

    Returns:
        A dict mapping ISP name to ``{'result', 'inbox_rate', 'date'}``
        taken from the newest row stored for that ISP. Empty when no test
        has been recorded.
    """
    conn = get_db()
    try:
        cur = conn.cursor()
        cur.execute("""
            SELECT isp_name, result, inbox_rate, test_date
            FROM admin.config_isp_tests
            WHERE config_type = %s AND config_id = %s
            ORDER BY test_date DESC
        """, (config_type, config_id))
        rows = cur.fetchall()
    finally:
        # Close the connection even when the query raises.
        conn.close()

    tests = {}
    for isp_name, result, inbox_rate, test_date in rows:
        # Rows arrive newest first, so only the first row seen per ISP is
        # kept; a plain dict comprehension would let the oldest row win.
        tests.setdefault(
            isp_name,
            {'result': result, 'inbox_rate': inbox_rate, 'date': test_date},
        )
    return tests
def get_untested_isps(config_id, config_type='headers'):
    """List compatible ISPs that are neither validated nor tested yet.

    NOTE(review): a function of the same name is defined again later in
    this file.

    Args:
        config_id: ``admin.winning_headers.id`` of the config.
        config_type: Value matched against ``config_isp_tests.config_type``
            when collecting the ISPs that already have a test row.

    Returns:
        ISP names from ``HEADER_COMPATIBILITY[target_method]``, in table
        order, excluding those already in the config's ``target_isps`` and
        those with any row in ``admin.config_isp_tests``. An empty list
        when the config does not exist or its method is not in the table.
    """
    conn = get_db()
    try:
        cur = conn.cursor()
        cur.execute("SELECT target_method, target_isps FROM admin.winning_headers WHERE id = %s", (config_id,))
        row = cur.fetchone()
        if not row:
            return []
        method, validated_isps = row
        cur.execute("SELECT DISTINCT isp_name FROM admin.config_isp_tests WHERE config_type = %s AND config_id = %s", (config_type, config_id))
        tested = {r[0] for r in cur.fetchall()}
    finally:
        # Single close point that also covers the early return and errors.
        conn.close()

    # target_isps may be NULL in the database; treat that as "none validated".
    excluded = tested.union(validated_isps or [])
    compatible = HEADER_COMPATIBILITY.get(method, [])
    return [isp for isp in compatible if isp not in excluded]
def record_test_result(config_id, config_type, config_name, isp, sent, inbox, spam, notes=''):
    """Store one ISP test run and promote the ISP when the test passes.

    NOTE(review): a function of the same name is defined again later in
    this file.

    The inbox rate is ``inbox / sent * 100`` (0 when ``sent`` is not
    positive). A rate of at least 70 is a 'pass', at least 50 is
    'partial', anything lower is 'fail'. On a pass the ISP is appended to
    ``admin.winning_headers.target_isps`` unless it is already listed.

    Args:
        config_id: ``admin.winning_headers.id`` of the tested config.
        config_type: Stored in ``config_isp_tests.config_type``.
        config_name: Stored in ``config_isp_tests.config_name``.
        isp: Name of the ISP the test targeted.
        sent: Number of emails sent.
        inbox: Number of emails that reached the inbox.
        spam: Number of emails that landed in spam.
        notes: Free-text note stored with the row.

    Returns:
        The id of the inserted ``admin.config_isp_tests`` row.
    """
    inbox_rate = (inbox / sent * 100) if sent > 0 else 0
    if inbox_rate >= 70:
        result = 'pass'
    elif inbox_rate >= 50:
        result = 'partial'
    else:
        result = 'fail'

    conn = get_db()
    try:
        cur = conn.cursor()
        cur.execute("""
            INSERT INTO admin.config_isp_tests
            (config_type, config_id, config_name, isp_name, emails_sent, emails_inbox, emails_spam, inbox_rate, result, notes)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            RETURNING id
        """, (config_type, config_id, config_name, isp, sent, inbox, spam, inbox_rate, result, notes))
        test_id = cur.fetchone()[0]
        if result == 'pass':
            cur.execute("""
                UPDATE admin.winning_headers
                SET target_isps = array_append(COALESCE(target_isps, ARRAY[]::text[]), %s)
                WHERE id = %s AND NOT (%s = ANY(COALESCE(target_isps, ARRAY[]::text[])))
            """, (isp, config_id, isp))
        conn.commit()
    finally:
        # Closing without a commit discards the pending transaction, so a
        # failure above leaves nothing half-written and no open connection.
        conn.close()

    # Report only after the commit succeeded so the message is never wrong.
    if result == 'pass':
        print(f"{isp.upper()} PASSED! Added to validated ISPs ({inbox_rate:.1f}%)")
    else:
        print(f"{'⚠️' if result == 'partial' else ''} {isp.upper()}: {result} ({inbox_rate:.1f}%)")
    return test_id
def show_config_matrix():
    """Print every winning-header config with its per-ISP test status.

    NOTE(review): this definition was truncated and its last line was
    fused with a pasted shell heredoc command. The loop body is restored
    from the complete definition of the same name later in this file.

    For each row of ``admin.winning_headers`` this prints the method,
    status, rate and validated ISPs, then the recorded tests (from
    get_config_test_status) and the still-untested compatible ISPs (from
    get_untested_isps).
    """
    conn = get_db()
    try:
        cur = conn.cursor()
        print("=" * 90)
        print("📊 CONFIG x ISP TEST MATRIX")
        print("=" * 90)
        cur.execute("SELECT id, config_name, target_method, target_isps, status, inbox_rate FROM admin.winning_headers ORDER BY status DESC")
        configs = cur.fetchall()
    finally:
        # The rows are fully fetched; release the connection before the
        # helpers below open their own.
        conn.close()

    for cid, name, method, validated, status, rate in configs:
        # config_name may be NULL; slicing None would raise.
        print(f"\n🔧 [{cid}] {(name or '')[:50]}")
        print(f" Method: {method} | Status: {status} | Rate: {rate or 'N/A'}%")
        print(f" Validated: {', '.join(validated or [])}")
        tests = get_config_test_status(cid)
        if tests:
            print(f" Tests: ", end='')
            for isp, data in tests.items():
                # NOTE(review): the pass and fail icons are empty strings in
                # this source; they may be characters lost in transit.
                icon = '' if data['result'] == 'pass' else ('⚠️' if data['result'] == 'partial' else '')
                print(f"{icon}{isp}", end=' ')
            print()
        untested = get_untested_isps(cid)
        if untested:
            print(f" ⏳ Untested: {', '.join(untested)}")
#!/usr/bin/env python3
"""
CROSS-ISP TESTER
Tests a validated config against other ISPs that use the same filter.
"""
import psycopg2
import json
from datetime import datetime
# Keyword arguments that get_db() passes to psycopg2.connect().
# NOTE(review): the credentials are hard-coded in source; consider loading
# them from the environment or a secrets store instead.
DB_CONFIG = dict(
    host='localhost',
    database='adx_system',
    user='admin',
    password='admin123',
)
# Maps a header method name to the ISP names listed as compatible with it.
# get_untested_isps() looks up a config's target_method in this table.
HEADER_COMPATIBILITY = {
    'mailchimp_spoof': [
        'gmail', 'googlemail', 'hotmail', 'outlook', 'yahoo', 'aol',
    ],
    'exchange': [
        'hotmail', 'outlook', 'live', 'msn',
    ],
    'newsletter': [
        't-online', 'gmx', 'web.de', 'freenet',
        '1und1', 'spectrum', 'roadrunner', 'twc',
    ],
    'standard': [
        'videotron', 'bell', 'rogers', 'comcast', 'att', 'charter',
    ],
}
def get_db():
    """Open and return a new psycopg2 connection built from DB_CONFIG.

    Every call creates a fresh connection; the callers in this file close
    it themselves once they are done.
    """
    connection = psycopg2.connect(**DB_CONFIG)
    return connection
def get_config_test_status(config_id, config_type='headers'):
    """Return the most recent recorded test for each ISP of a config.

    Args:
        config_id: Value matched against ``config_isp_tests.config_id``.
        config_type: Value matched against ``config_isp_tests.config_type``.

    Returns:
        A dict mapping ISP name to ``{'result', 'inbox_rate', 'date'}``
        taken from the newest row stored for that ISP. Empty when no test
        has been recorded.
    """
    conn = get_db()
    try:
        cur = conn.cursor()
        cur.execute("""
            SELECT isp_name, result, inbox_rate, test_date
            FROM admin.config_isp_tests
            WHERE config_type = %s AND config_id = %s
            ORDER BY test_date DESC
        """, (config_type, config_id))
        rows = cur.fetchall()
    finally:
        # Close the connection even when the query raises.
        conn.close()

    tests = {}
    for isp_name, result, inbox_rate, test_date in rows:
        # Rows arrive newest first, so only the first row seen per ISP is
        # kept; a plain dict comprehension would let the oldest row win.
        tests.setdefault(
            isp_name,
            {'result': result, 'inbox_rate': inbox_rate, 'date': test_date},
        )
    return tests
def get_untested_isps(config_id, config_type='headers'):
    """List compatible ISPs that are neither validated nor tested yet.

    Args:
        config_id: ``admin.winning_headers.id`` of the config.
        config_type: Value matched against ``config_isp_tests.config_type``
            when collecting the ISPs that already have a test row.

    Returns:
        ISP names from ``HEADER_COMPATIBILITY[target_method]``, in table
        order, excluding those already in the config's ``target_isps`` and
        those with any row in ``admin.config_isp_tests``. An empty list
        when the config does not exist or its method is not in the table.
    """
    conn = get_db()
    try:
        cur = conn.cursor()
        cur.execute("SELECT target_method, target_isps FROM admin.winning_headers WHERE id = %s", (config_id,))
        row = cur.fetchone()
        if not row:
            return []
        method, validated_isps = row
        cur.execute("SELECT DISTINCT isp_name FROM admin.config_isp_tests WHERE config_type = %s AND config_id = %s", (config_type, config_id))
        tested = {r[0] for r in cur.fetchall()}
    finally:
        # Single close point that also covers the early return and errors.
        conn.close()

    # target_isps may be NULL in the database; treat that as "none validated".
    excluded = tested.union(validated_isps or [])
    compatible = HEADER_COMPATIBILITY.get(method, [])
    return [isp for isp in compatible if isp not in excluded]
def record_test_result(config_id, config_type, config_name, isp, sent, inbox, spam, notes=''):
    """Store one ISP test run and promote the ISP when the test passes.

    The inbox rate is ``inbox / sent * 100`` (0 when ``sent`` is not
    positive). A rate of at least 70 is a 'pass', at least 50 is
    'partial', anything lower is 'fail'. On a pass the ISP is appended to
    ``admin.winning_headers.target_isps`` unless it is already listed.

    Args:
        config_id: ``admin.winning_headers.id`` of the tested config.
        config_type: Stored in ``config_isp_tests.config_type``.
        config_name: Stored in ``config_isp_tests.config_name``.
        isp: Name of the ISP the test targeted.
        sent: Number of emails sent.
        inbox: Number of emails that reached the inbox.
        spam: Number of emails that landed in spam.
        notes: Free-text note stored with the row.

    Returns:
        The id of the inserted ``admin.config_isp_tests`` row.
    """
    inbox_rate = (inbox / sent * 100) if sent > 0 else 0
    if inbox_rate >= 70:
        result = 'pass'
    elif inbox_rate >= 50:
        result = 'partial'
    else:
        result = 'fail'

    conn = get_db()
    try:
        cur = conn.cursor()
        cur.execute("""
            INSERT INTO admin.config_isp_tests
            (config_type, config_id, config_name, isp_name, emails_sent, emails_inbox, emails_spam, inbox_rate, result, notes)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            RETURNING id
        """, (config_type, config_id, config_name, isp, sent, inbox, spam, inbox_rate, result, notes))
        test_id = cur.fetchone()[0]
        if result == 'pass':
            cur.execute("""
                UPDATE admin.winning_headers
                SET target_isps = array_append(COALESCE(target_isps, ARRAY[]::text[]), %s)
                WHERE id = %s AND NOT (%s = ANY(COALESCE(target_isps, ARRAY[]::text[])))
            """, (isp, config_id, isp))
        conn.commit()
    finally:
        # Closing without a commit discards the pending transaction, so a
        # failure above leaves nothing half-written and no open connection.
        conn.close()

    # Report only after the commit succeeded so the message is never wrong.
    if result == 'pass':
        print(f"{isp.upper()} PASSED! Added to validated ISPs ({inbox_rate:.1f}%)")
    else:
        print(f"{'⚠️' if result == 'partial' else ''} {isp.upper()}: {result} ({inbox_rate:.1f}%)")
    return test_id
def show_config_matrix():
    """Print every winning-header config with its per-ISP test status.

    For each row of ``admin.winning_headers`` this prints the method,
    status, rate and validated ISPs, then the recorded tests (from
    get_config_test_status) and the still-untested compatible ISPs (from
    get_untested_isps).
    """
    conn = get_db()
    try:
        cur = conn.cursor()
        print("=" * 90)
        print("📊 CONFIG x ISP TEST MATRIX")
        print("=" * 90)
        cur.execute("SELECT id, config_name, target_method, target_isps, status, inbox_rate FROM admin.winning_headers ORDER BY status DESC")
        configs = cur.fetchall()
    finally:
        # The rows are fully fetched; release the connection before the
        # helpers below open their own.
        conn.close()

    for cid, name, method, validated, status, rate in configs:
        # config_name may be NULL; slicing None would raise.
        print(f"\n🔧 [{cid}] {(name or '')[:50]}")
        print(f" Method: {method} | Status: {status} | Rate: {rate or 'N/A'}%")
        print(f" Validated: {', '.join(validated or [])}")
        tests = get_config_test_status(cid)
        if tests:
            print(f" Tests: ", end='')
            for isp, data in tests.items():
                # NOTE(review): the pass and fail icons are empty strings in
                # this source; they may be characters lost in transit.
                icon = '' if data['result'] == 'pass' else ('⚠️' if data['result'] == 'partial' else '')
                print(f"{icon}{isp}", end=' ')
            print()
        untested = get_untested_isps(cid)
        if untested:
            print(f" ⏳ Untested: {', '.join(untested)}")
def show_cloudmark_family():
    """Print the ISPs mapped to the 'Cloudmark' filter provider.

    Each output line shows the upper-cased ISP name, its confidence value
    from ``admin.isp_filter_mapping`` and an icon slot that depends on
    whether ``admin.sending_procedures`` has a row targeting that ISP.
    """
    conn = get_db()
    try:
        cur = conn.cursor()
        print("=" * 70)
        print("🔒 CLOUDMARK FAMILY (Config T-Online/GMX compatible)")
        print("=" * 70)
        cur.execute("""
            SELECT m.isp_name, m.confidence,
            EXISTS(SELECT 1 FROM admin.sending_procedures sp WHERE sp.target_isp = m.isp_name) as has_proc
            FROM admin.isp_filter_mapping m
            JOIN admin.filter_providers fp ON m.filter_provider_id = fp.id
            WHERE fp.provider_name = 'Cloudmark'
            ORDER BY m.isp_name
        """)
        for isp, conf, has_proc in cur.fetchall():
            # NOTE(review): both icons are empty strings in this source;
            # they may be characters lost in transit.
            proc_icon = '' if has_proc else ''
            print(f" {proc_icon} {isp.upper():15} ({conf})")
    finally:
        # Close the connection even when the query or printing raises.
        conn.close()
def main():
    """Command-line entry point.

    With no argument, prints the config matrix followed by the Cloudmark
    family. Otherwise dispatches on the first argument:

    * ``matrix``: print the config x ISP matrix.
    * ``cloudmark``: print the Cloudmark family.
    * ``record <config_id> <isp> <sent> <inbox> <spam>``: store a test
      result for a 'headers' config.

    Bad input (unknown command, missing or non-integer arguments, unknown
    config id) prints a message and returns without touching the data.
    """
    import sys

    if len(sys.argv) < 2:
        show_config_matrix()
        print("\n")
        show_cloudmark_family()
        return

    cmd = sys.argv[1]
    if cmd == 'matrix':
        show_config_matrix()
    elif cmd == 'cloudmark':
        show_cloudmark_family()
    elif cmd == 'record':
        usage = "Usage: cross-isp-tester.py record <config_id> <isp> <sent> <inbox> <spam>"
        if len(sys.argv) < 7:
            print(usage)
            return
        isp = sys.argv[3]
        try:
            # Validate the numeric arguments before opening a connection.
            config_id, sent, inbox, spam = (int(sys.argv[i]) for i in (2, 4, 5, 6))
        except ValueError:
            print(usage)
            return

        conn = get_db()
        try:
            cur = conn.cursor()
            cur.execute("SELECT config_name FROM admin.winning_headers WHERE id = %s", (config_id,))
            row = cur.fetchone()
        finally:
            conn.close()
        if row is None:
            # fetchone() returns None for an unknown id; indexing it would
            # raise TypeError.
            print(f"Config {config_id} not found in admin.winning_headers")
            return
        record_test_result(config_id, 'headers', row[0], isp, sent, inbox, spam)
    else:
        print("Commands: matrix, cloudmark, record <config_id> <isp> <sent> <inbox> <spam>")


if __name__ == '__main__':
    main()