#!/usr/bin/env python3
"""
CREATIVE PERFORMANCE ENGINE
- Aggregates send/open/click per creative per offer per country/ISP
- Detects winners automatically
- Feeds back into Quality Guard for smart selection
- Run via cron every hour or on-demand

Usage:
    <script> [all|aggregate|winners|report] [min_sends]

``min_sends`` (default 20) is only used by the ``all`` and ``winners`` actions.
"""
import json
import os
import sys
from contextlib import closing
from datetime import datetime, timedelta

import psycopg2

# Connection settings. The defaults are the historical hard-coded values; each
# one can be overridden through the environment so credentials do not have to
# live in the source tree.
DB = {
    'host': os.environ.get('ADX_DB_HOST', 'localhost'),
    'database': os.environ.get('ADX_DB_NAME', 'adx_system'),
    'user': os.environ.get('ADX_DB_USER', 'admin'),
    'password': os.environ.get('ADX_DB_PASSWORD', 'admin123'),
}

ACTIONS = ('all', 'aggregate', 'winners', 'report')
DEFAULT_MIN_SENDS = 20

# Collapse tracking_events to ONE row per tracking_id before joining it to the
# send log. Joining the raw events table fans out (one joined row per event),
# which would make COUNT(*) count events instead of sends.
_EVENT_FLAGS_CTE = """
    WITH events AS (
        SELECT tracking_id,
               BOOL_OR(event_type = 'open')  AS opened,
               BOOL_OR(event_type = 'click') AS clicked
        FROM admin.tracking_events
        WHERE event_type IN ('open', 'click')
        GROUP BY tracking_id
    )
"""

# NOTE(review): rows whose isp_target is NULL will presumably never match the
# ON CONFLICT key (NULLs are distinct in a standard unique index) and would be
# re-inserted on every run - confirm isp_target is NOT NULL in practice.
_CREATIVE_UPSERT_SQL = _EVENT_FLAGS_CTE + """
    , agg AS (
        SELECT s.offer_id,
               s.creative_id,
               COALESCE(s.country, 'XX') AS country,
               s.isp_target AS isp,
               COUNT(*) AS sends,
               COUNT(DISTINCT CASE WHEN e.opened  THEN e.tracking_id END) AS opens,
               COUNT(DISTINCT CASE WHEN e.clicked THEN e.tracking_id END) AS clicks
        FROM admin.unified_send_log_new s
        LEFT JOIN events e ON e.tracking_id = s.tracking_id
        WHERE s.offer_id IS NOT NULL
          AND s.creative_id IS NOT NULL
          AND s.status = 'sent'
        -- Group on the coalesced value so NULL and literal 'XX' countries do
        -- not yield two rows with the same conflict key.
        GROUP BY s.offer_id, s.creative_id, COALESCE(s.country, 'XX'), s.isp_target
    )
    INSERT INTO admin.creative_performance
        (offer_id, creative_id, country, isp, sends, opens, clicks,
         open_rate, click_rate, score, last_updated)
    SELECT offer_id, creative_id, country, isp, sends, opens, clicks,
           CASE WHEN sends > 0 THEN ROUND(100.0 * opens / sends, 2) ELSE 0 END,
           CASE WHEN sends > 0 THEN ROUND(100.0 * clicks / sends, 2) ELSE 0 END,
           CASE WHEN sends > 0
                THEN ROUND(50.0 * opens / sends
                           + 50.0 * clicks / GREATEST(opens, 1), 2)
                ELSE 0 END,
           NOW()
    FROM agg
    ON CONFLICT (offer_id, creative_id, country, isp) DO UPDATE SET
        sends = EXCLUDED.sends,
        opens = EXCLUDED.opens,
        clicks = EXCLUDED.clicks,
        open_rate = EXCLUDED.open_rate,
        click_rate = EXCLUDED.click_rate,
        score = EXCLUDED.score,
        last_updated = EXCLUDED.last_updated
"""

_SUBJECT_UPSERT_SQL = _EVENT_FLAGS_CTE + """
    , agg AS (
        SELECT s.offer_id,
               s.subject AS subject_text,
               COALESCE(s.country, 'XX') AS country,
               s.isp_target AS isp,
               COUNT(*) AS sends,
               COUNT(DISTINCT CASE WHEN e.opened  THEN e.tracking_id END) AS opens,
               COUNT(DISTINCT CASE WHEN e.clicked THEN e.tracking_id END) AS clicks
        FROM admin.unified_send_log_new s
        LEFT JOIN events e ON e.tracking_id = s.tracking_id
        WHERE s.offer_id IS NOT NULL
          AND s.subject IS NOT NULL
          AND s.subject != ''
          AND s.status = 'sent'
        GROUP BY s.offer_id, s.subject, COALESCE(s.country, 'XX'), s.isp_target
    )
    INSERT INTO admin.subject_performance
        (offer_id, subject_text, country, isp, sends, opens, clicks,
         open_rate, last_updated)
    SELECT offer_id, subject_text, country, isp, sends, opens, clicks,
           CASE WHEN sends > 0 THEN ROUND(100.0 * opens / sends, 2) ELSE 0 END,
           NOW()
    FROM agg
    ON CONFLICT (offer_id, subject_text, country, isp) DO UPDATE SET
        sends = EXCLUDED.sends,
        opens = EXCLUDED.opens,
        clicks = EXCLUDED.clicks,
        open_rate = EXCLUDED.open_rate,
        last_updated = EXCLUDED.last_updated
"""


def aggregate_performance():
    """Join send_log + tracking_events to compute creative performance.

    Upserts one row per (offer, creative, country, isp) into
    ``admin.creative_performance`` and one row per (offer, subject, country,
    isp) into ``admin.subject_performance``. Both upserts are committed
    together. Rates and score are written on first insert as well as on
    update, so new rows are immediately usable by ``detect_winners``.

    Returns:
        dict with ``creative_records`` and ``subject_records``: the number of
        rows in each table that have ``sends > 0`` after the run.

    Raises:
        psycopg2.Error: on any connection or SQL failure; nothing is
            committed in that case and the connection is still closed.
    """
    # psycopg2's own ``with conn:`` only ends the transaction, it does not
    # close the connection - hence contextlib.closing.
    with closing(psycopg2.connect(**DB)) as conn, closing(conn.cursor()) as cur:
        print(f"[{datetime.now()}] Aggregating creative performance...")

        # Aggregate sends per offer/creative/country/isp
        cur.execute(_CREATIVE_UPSERT_SQL)
        # Also aggregate subject performance
        cur.execute(_SUBJECT_UPSERT_SQL)
        conn.commit()

        # Count results
        cur.execute("SELECT COUNT(*) FROM admin.creative_performance WHERE sends > 0")
        cp_count = cur.fetchone()[0]
        cur.execute("SELECT COUNT(*) FROM admin.subject_performance WHERE sends > 0")
        sp_count = cur.fetchone()[0]

        print(f" Creative records: {cp_count}")
        print(f" Subject records: {sp_count}")

    return {'creative_records': cp_count, 'subject_records': sp_count}


def detect_winners(min_sends=20):
    """Auto-detect winning creatives and subjects per offer/country/ISP.

    Clears every ``is_winner`` flag, then flags the top-ranked creative (by
    ``score``) and the top-ranked subject (by ``open_rate``) in each
    (offer, country, isp) partition, considering only rows with at least
    ``min_sends`` sends. Reset and re-flagging are committed atomically.

    Args:
        min_sends: minimum number of sends a row needs to be eligible.

    Returns:
        dict with ``creative_winners`` and ``subject_winners``: the number of
        rows flagged in each table.

    Raises:
        psycopg2.Error: on any connection or SQL failure; nothing is
            committed in that case and the connection is still closed.
    """
    with closing(psycopg2.connect(**DB)) as conn, closing(conn.cursor()) as cur:
        print(f"\n[{datetime.now()}] Detecting winners (min {min_sends} sends)...")

        # Reset all winners. Only rows that are not already false are touched,
        # which avoids rewriting the whole table on every run.
        cur.execute(
            "UPDATE admin.creative_performance SET is_winner = false "
            "WHERE is_winner IS DISTINCT FROM false"
        )
        cur.execute(
            "UPDATE admin.subject_performance SET is_winner = false "
            "WHERE is_winner IS DISTINCT FROM false"
        )

        # Find best creative per offer+country+isp (by score).
        # NULLS LAST: PostgreSQL sorts NULLs first under DESC, which would let
        # an unscored row win. sends/id make ties deterministic.
        cur.execute("""
            WITH ranked AS (
                SELECT id,
                       ROW_NUMBER() OVER (
                           PARTITION BY offer_id, country, isp
                           ORDER BY score DESC NULLS LAST, sends DESC, id
                       ) AS rn
                FROM admin.creative_performance
                WHERE sends >= %s
            )
            UPDATE admin.creative_performance cp
            SET is_winner = true
            FROM ranked r
            WHERE cp.id = r.id AND r.rn = 1
        """, (min_sends,))
        winners_cr = cur.rowcount

        # Find best subject per offer+country+isp (by open_rate)
        cur.execute("""
            WITH ranked AS (
                SELECT id,
                       ROW_NUMBER() OVER (
                           PARTITION BY offer_id, country, isp
                           ORDER BY open_rate DESC NULLS LAST, sends DESC, id
                       ) AS rn
                FROM admin.subject_performance
                WHERE sends >= %s
            )
            UPDATE admin.subject_performance sp
            SET is_winner = true
            FROM ranked r
            WHERE sp.id = r.id AND r.rn = 1
        """, (min_sends,))
        winners_subj = cur.rowcount

        conn.commit()

        # Show winners
        cur.execute("""
            SELECT cp.offer_id, o.name, cp.creative_id, cp.country, cp.isp,
                   cp.sends, cp.opens, cp.clicks, cp.score
            FROM admin.creative_performance cp
            JOIN affiliate.offers o ON o.id = cp.offer_id
            WHERE cp.is_winner = true
            ORDER BY cp.score DESC
        """)
        print(f"\n Creative winners: {winners_cr}")
        for (offer_id, name, creative_id, country, isp,
             sends, opens, clicks, score) in cur.fetchall():
            # Offer name may be NULL in the database; slicing None would raise.
            print(f" Offer #{offer_id} ({(name or '')[:30]}) | Creative #{creative_id} | {country}/{isp} | "
                  f"Sends:{sends} Opens:{opens} Clicks:{clicks} Score:{score}")

        cur.execute("""
            SELECT sp.offer_id, sp.subject_text, sp.country, sp.isp,
                   sp.sends, sp.opens, sp.open_rate
            FROM admin.subject_performance sp
            WHERE sp.is_winner = true
            ORDER BY sp.open_rate DESC
        """)
        print(f"\n Subject winners: {winners_subj}")
        for offer_id, subject, country, isp, sends, opens, open_rate in cur.fetchall():
            print(f" Offer #{offer_id} | \"{(subject or '')[:40]}\" | {country}/{isp} | "
                  f"Sends:{sends} Opens:{opens} Rate:{open_rate}%")

    return {'creative_winners': winners_cr, 'subject_winners': winners_subj}


def report():
    """Generate performance report.

    Prints overall totals, a per-country breakdown and a per-offer CTR
    ranking, all read from ``admin.creative_performance``. Read-only; returns
    ``None``.

    Raises:
        psycopg2.Error: on any connection or SQL failure; the connection is
            still closed.
    """
    with closing(psycopg2.connect(**DB)) as conn, closing(conn.cursor()) as cur:
        print(f"\n{'='*60}")
        print(f" CREATIVE PERFORMANCE REPORT")
        print(f"{'='*60}")

        # Overall stats. COALESCE keeps an empty table from printing "None".
        cur.execute("""
            SELECT
                COUNT(DISTINCT offer_id) AS offers,
                COUNT(DISTINCT creative_id) AS creatives,
                COALESCE(SUM(sends), 0) AS total_sends,
                COALESCE(SUM(opens), 0) AS total_opens,
                COALESCE(SUM(clicks), 0) AS total_clicks,
                CASE WHEN SUM(sends) > 0
                     THEN ROUND(100.0 * SUM(opens) / SUM(sends), 2)
                     ELSE 0 END AS avg_open_rate,
                CASE WHEN SUM(sends) > 0
                     THEN ROUND(100.0 * SUM(clicks) / SUM(sends), 2)
                     ELSE 0 END AS avg_click_rate
            FROM admin.creative_performance
        """)
        (offers, creatives, total_sends, total_opens, total_clicks,
         open_rate, click_rate) = cur.fetchone()
        print(f"\n Offers tracked: {offers}")
        print(f" Creatives tracked: {creatives}")
        print(f" Total sends: {total_sends}")
        print(f" Total opens: {total_opens} ({open_rate}%)")
        print(f" Total clicks: {total_clicks} ({click_rate}%)")

        # Per country
        cur.execute("""
            SELECT country, SUM(sends), SUM(opens), SUM(clicks),
                   CASE WHEN SUM(sends) > 0
                        THEN ROUND(100.0 * SUM(clicks) / SUM(sends), 2)
                        ELSE 0 END AS ctr
            FROM admin.creative_performance
            GROUP BY country
            ORDER BY SUM(sends) DESC
        """)
        print(f"\n By Country:")
        for country, sends, opens, clicks, ctr in cur.fetchall():
            print(f" {country}: {sends} sends, {opens} opens, {clicks} clicks ({ctr}% CTR)")

        # Best performing offers
        cur.execute("""
            SELECT cp.offer_id, LEFT(o.name, 40), SUM(cp.sends), SUM(cp.clicks),
                   CASE WHEN SUM(cp.sends) > 0
                        THEN ROUND(100.0 * SUM(cp.clicks) / SUM(cp.sends), 2)
                        ELSE 0 END AS ctr
            FROM admin.creative_performance cp
            JOIN affiliate.offers o ON o.id = cp.offer_id
            GROUP BY cp.offer_id, o.name
            ORDER BY ctr DESC
        """)
        print(f"\n By Offer (CTR):")
        for offer_id, name, sends, clicks, ctr in cur.fetchall():
            print(f" #{offer_id} {name}: {sends} sends, {clicks} clicks ({ctr}% CTR)")


def main(argv=None):
    """Command-line entry point.

    Args:
        argv: argument list without the program name; defaults to
            ``sys.argv[1:]``. ``argv[0]`` is the action (default ``all``),
            ``argv[1]`` the optional ``min_sends`` for winner detection.

    Returns:
        Process exit status: 0 on success, 2 on a usage error.
    """
    args = sys.argv[1:] if argv is None else list(argv)
    action = args[0] if args else 'all'

    if action not in ACTIONS:
        print(f"Unknown action {action!r}; expected one of: {', '.join(ACTIONS)}",
              file=sys.stderr)
        return 2

    if action in ('all', 'aggregate'):
        aggregate_performance()

    if action in ('all', 'winners'):
        min_sends = DEFAULT_MIN_SENDS
        if len(args) > 1:
            try:
                min_sends = int(args[1])
            except ValueError:
                print(f"min_sends must be an integer, got {args[1]!r}",
                      file=sys.stderr)
                return 2
        detect_winners(min_sends=min_sends)

    if action in ('all', 'report'):
        report()

    return 0


if __name__ == '__main__':
    sys.exit(main())