242 lines
9.2 KiB
Python
Executable File
242 lines
9.2 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
CROSS-ISP TESTER
|
|
Teste une config validée sur d'autres ISPs du même filter.
|
|
"""
|
|
|
|
import psycopg2
|
|
import json
|
|
from datetime import datetime
|
|
|
|
DB_CONFIG = {
|
|
'host': 'localhost',
|
|
'database': 'adx_system',
|
|
'user': 'admin',
|
|
'password': 'admin123'
|
|
}
|
|
|
|
HEADER_COMPATIBILITY = {
|
|
'mailchimp_spoof': ['gmail', 'googlemail', 'hotmail', 'outlook', 'yahoo', 'aol'],
|
|
'exchange': ['hotmail', 'outlook', 'live', 'msn'],
|
|
'newsletter': ['t-online', 'gmx', 'web.de', 'freenet', '1und1', 'spectrum', 'roadrunner', 'twc'],
|
|
'standard': ['videotron', 'bell', 'rogers', 'comcast', 'att', 'charter']
|
|
}
|
|
|
|
def get_db():
|
|
return psycopg2.connect(**DB_CONFIG)
|
|
|
|
def get_config_test_status(config_id, config_type='headers'):
|
|
conn = get_db()
|
|
cur = conn.cursor()
|
|
cur.execute("""
|
|
SELECT isp_name, result, inbox_rate, test_date
|
|
FROM admin.config_isp_tests
|
|
WHERE config_type = %s AND config_id = %s
|
|
ORDER BY test_date DESC
|
|
""", (config_type, config_id))
|
|
tests = {r[0]: {'result': r[1], 'inbox_rate': r[2], 'date': r[3]} for r in cur.fetchall()}
|
|
conn.close()
|
|
return tests
|
|
|
|
def get_untested_isps(config_id, config_type='headers'):
|
|
conn = get_db()
|
|
cur = conn.cursor()
|
|
cur.execute("SELECT target_method, target_isps FROM admin.winning_headers WHERE id = %s", (config_id,))
|
|
row = cur.fetchone()
|
|
if not row:
|
|
conn.close()
|
|
return []
|
|
method, validated_isps = row
|
|
validated_isps = validated_isps or []
|
|
compatible = HEADER_COMPATIBILITY.get(method, [])
|
|
cur.execute("SELECT DISTINCT isp_name FROM admin.config_isp_tests WHERE config_type = %s AND config_id = %s", (config_type, config_id))
|
|
tested = [r[0] for r in cur.fetchall()]
|
|
conn.close()
|
|
return [isp for isp in compatible if isp not in validated_isps and isp not in tested]
|
|
|
|
def record_test_result(config_id, config_type, config_name, isp, sent, inbox, spam, notes=''):
|
|
conn = get_db()
|
|
cur = conn.cursor()
|
|
inbox_rate = (inbox / sent * 100) if sent > 0 else 0
|
|
result = 'pass' if inbox_rate >= 70 else ('partial' if inbox_rate >= 50 else 'fail')
|
|
cur.execute("""
|
|
INSERT INTO admin.config_isp_tests
|
|
(config_type, config_id, config_name, isp_name, emails_sent, emails_inbox, emails_spam, inbox_rate, result, notes)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
|
RETURNING id
|
|
""", (config_type, config_id, config_name, isp, sent, inbox, spam, inbox_rate, result, notes))
|
|
test_id = cur.fetchone()[0]
|
|
if result == 'pass':
|
|
cur.execute("""
|
|
UPDATE admin.winning_headers
|
|
SET target_isps = array_append(COALESCE(target_isps, ARRAY[]::text[]), %s)
|
|
WHERE id = %s AND NOT (%s = ANY(COALESCE(target_isps, ARRAY[]::text[])))
|
|
""", (isp, config_id, isp))
|
|
print(f"✅ {isp.upper()} PASSED! Added to validated ISPs ({inbox_rate:.1f}%)")
|
|
else:
|
|
print(f"{'⚠️' if result == 'partial' else '❌'} {isp.upper()}: {result} ({inbox_rate:.1f}%)")
|
|
conn.commit()
|
|
conn.close()
|
|
return test_id
|
|
|
|
def show_config_matrix():
|
|
conn = get_db()
|
|
cur = conn.cursor()
|
|
print("=" * 90)
|
|
print("📊 CONFIG x ISP TEST MATRIX")
|
|
print("=" * 90)
|
|
cur.execute("SELECT id, config_name, target_method, target_isps, status, inbox_rate FROM admin.winning_headers ORDER BY status DESC")
|
|
for cid, name, method, validated, status, rate in cur.fetchall():
|
|
printcat > /opt/wevads/scripts/cross-isp-tester.py << 'EOFPY'
|
|
#!/usr/bin/env python3
|
|
"""
|
|
CROSS-ISP TESTER
|
|
Teste une config validée sur d'autres ISPs du même filter.
|
|
"""
|
|
|
|
import psycopg2
|
|
import json
|
|
from datetime import datetime
|
|
|
|
DB_CONFIG = {
|
|
'host': 'localhost',
|
|
'database': 'adx_system',
|
|
'user': 'admin',
|
|
'password': 'admin123'
|
|
}
|
|
|
|
HEADER_COMPATIBILITY = {
|
|
'mailchimp_spoof': ['gmail', 'googlemail', 'hotmail', 'outlook', 'yahoo', 'aol'],
|
|
'exchange': ['hotmail', 'outlook', 'live', 'msn'],
|
|
'newsletter': ['t-online', 'gmx', 'web.de', 'freenet', '1und1', 'spectrum', 'roadrunner', 'twc'],
|
|
'standard': ['videotron', 'bell', 'rogers', 'comcast', 'att', 'charter']
|
|
}
|
|
|
|
def get_db():
|
|
return psycopg2.connect(**DB_CONFIG)
|
|
|
|
def get_config_test_status(config_id, config_type='headers'):
|
|
conn = get_db()
|
|
cur = conn.cursor()
|
|
cur.execute("""
|
|
SELECT isp_name, result, inbox_rate, test_date
|
|
FROM admin.config_isp_tests
|
|
WHERE config_type = %s AND config_id = %s
|
|
ORDER BY test_date DESC
|
|
""", (config_type, config_id))
|
|
tests = {r[0]: {'result': r[1], 'inbox_rate': r[2], 'date': r[3]} for r in cur.fetchall()}
|
|
conn.close()
|
|
return tests
|
|
|
|
def get_untested_isps(config_id, config_type='headers'):
|
|
conn = get_db()
|
|
cur = conn.cursor()
|
|
cur.execute("SELECT target_method, target_isps FROM admin.winning_headers WHERE id = %s", (config_id,))
|
|
row = cur.fetchone()
|
|
if not row:
|
|
conn.close()
|
|
return []
|
|
method, validated_isps = row
|
|
validated_isps = validated_isps or []
|
|
compatible = HEADER_COMPATIBILITY.get(method, [])
|
|
cur.execute("SELECT DISTINCT isp_name FROM admin.config_isp_tests WHERE config_type = %s AND config_id = %s", (config_type, config_id))
|
|
tested = [r[0] for r in cur.fetchall()]
|
|
conn.close()
|
|
return [isp for isp in compatible if isp not in validated_isps and isp not in tested]
|
|
|
|
def record_test_result(config_id, config_type, config_name, isp, sent, inbox, spam, notes=''):
|
|
conn = get_db()
|
|
cur = conn.cursor()
|
|
inbox_rate = (inbox / sent * 100) if sent > 0 else 0
|
|
result = 'pass' if inbox_rate >= 70 else ('partial' if inbox_rate >= 50 else 'fail')
|
|
cur.execute("""
|
|
INSERT INTO admin.config_isp_tests
|
|
(config_type, config_id, config_name, isp_name, emails_sent, emails_inbox, emails_spam, inbox_rate, result, notes)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
|
RETURNING id
|
|
""", (config_type, config_id, config_name, isp, sent, inbox, spam, inbox_rate, result, notes))
|
|
test_id = cur.fetchone()[0]
|
|
if result == 'pass':
|
|
cur.execute("""
|
|
UPDATE admin.winning_headers
|
|
SET target_isps = array_append(COALESCE(target_isps, ARRAY[]::text[]), %s)
|
|
WHERE id = %s AND NOT (%s = ANY(COALESCE(target_isps, ARRAY[]::text[])))
|
|
""", (isp, config_id, isp))
|
|
print(f"✅ {isp.upper()} PASSED! Added to validated ISPs ({inbox_rate:.1f}%)")
|
|
else:
|
|
print(f"{'⚠️' if result == 'partial' else '❌'} {isp.upper()}: {result} ({inbox_rate:.1f}%)")
|
|
conn.commit()
|
|
conn.close()
|
|
return test_id
|
|
|
|
def show_config_matrix():
|
|
conn = get_db()
|
|
cur = conn.cursor()
|
|
print("=" * 90)
|
|
print("📊 CONFIG x ISP TEST MATRIX")
|
|
print("=" * 90)
|
|
cur.execute("SELECT id, config_name, target_method, target_isps, status, inbox_rate FROM admin.winning_headers ORDER BY status DESC")
|
|
for cid, name, method, validated, status, rate in cur.fetchall():
|
|
print(f"\n🔧 [{cid}] {name[:50]}")
|
|
print(f" Method: {method} | Status: {status} | Rate: {rate or 'N/A'}%")
|
|
print(f" Validated: {', '.join(validated or [])}")
|
|
tests = get_config_test_status(cid)
|
|
if tests:
|
|
print(f" Tests: ", end='')
|
|
for isp, data in tests.items():
|
|
icon = '✅' if data['result'] == 'pass' else ('⚠️' if data['result'] == 'partial' else '❌')
|
|
print(f"{icon}{isp}", end=' ')
|
|
print()
|
|
untested = get_untested_isps(cid)
|
|
if untested:
|
|
print(f" ⏳ Untested: {', '.join(untested)}")
|
|
conn.close()
|
|
|
|
def show_cloudmark_family():
|
|
conn = get_db()
|
|
cur = conn.cursor()
|
|
print("=" * 70)
|
|
print("🔒 CLOUDMARK FAMILY (Config T-Online/GMX compatible)")
|
|
print("=" * 70)
|
|
cur.execute("""
|
|
SELECT m.isp_name, m.confidence,
|
|
EXISTS(SELECT 1 FROM admin.sending_procedures sp WHERE sp.target_isp = m.isp_name) as has_proc
|
|
FROM admin.isp_filter_mapping m
|
|
JOIN admin.filter_providers fp ON m.filter_provider_id = fp.id
|
|
WHERE fp.provider_name = 'Cloudmark'
|
|
ORDER BY m.isp_name
|
|
""")
|
|
for isp, conf, has_proc in cur.fetchall():
|
|
proc_icon = '✅' if has_proc else '⏳'
|
|
print(f" {proc_icon} {isp.upper():15} ({conf})")
|
|
conn.close()
|
|
|
|
def main():
|
|
import sys
|
|
if len(sys.argv) < 2:
|
|
show_config_matrix()
|
|
print("\n")
|
|
show_cloudmark_family()
|
|
return
|
|
cmd = sys.argv[1]
|
|
if cmd == 'matrix':
|
|
show_config_matrix()
|
|
elif cmd == 'cloudmark':
|
|
show_cloudmark_family()
|
|
elif cmd == 'record':
|
|
if len(sys.argv) < 7:
|
|
print("Usage: cross-isp-tester.py record <config_id> <isp> <sent> <inbox> <spam>")
|
|
return
|
|
config_id, isp, sent, inbox, spam = int(sys.argv[2]), sys.argv[3], int(sys.argv[4]), int(sys.argv[5]), int(sys.argv[6])
|
|
conn = get_db()
|
|
cur = conn.cursor()
|
|
cur.execute("SELECT config_name FROM admin.winning_headers WHERE id = %s", (config_id,))
|
|
name = cur.fetchone()[0]
|
|
conn.close()
|
|
record_test_result(config_id, 'headers', name, isp, sent, inbox, spam)
|
|
else:
|
|
print("Commands: matrix, cloudmark, record <config_id> <isp> <sent> <inbox> <spam>")
|
|
|
|
if __name__ == '__main__':
|
|
main()
|