Files
wevads-platform/scripts/brain-creative-optimizer.py
2026-02-26 04:53:11 +01:00

476 lines
18 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Brain Creative Optimizer v1.0
- A/B testing with auto-promotion of winning creatives
- Translation pipeline via HAMID IA
- Offer discovery & recommendation engine
Tables created:
admin.creative_performance - per creative×offer×country stats
admin.offer_recommendations - Brain-suggested offers pending approval
"""
import psycopg2
import json
import sys
DB = {'host':'localhost','database':'adx_system','user':'admin','password':'admin123'}
SCHEMA_SQL = """
-- Creative Performance Tracking
CREATE TABLE IF NOT EXISTS admin.creative_performance (
id SERIAL PRIMARY KEY,
offer_id INTEGER NOT NULL,
creative_id INTEGER NOT NULL,
country VARCHAR(5) DEFAULT 'WW',
isp VARCHAR(30) DEFAULT 'ALL',
sends INTEGER DEFAULT 0,
opens INTEGER DEFAULT 0,
clicks INTEGER DEFAULT 0,
conversions INTEGER DEFAULT 0,
open_rate NUMERIC(5,2) DEFAULT 0,
click_rate NUMERIC(5,2) DEFAULT 0,
conversion_rate NUMERIC(5,2) DEFAULT 0,
score NUMERIC(8,2) DEFAULT 0,
is_winner BOOLEAN DEFAULT FALSE,
is_paused BOOLEAN DEFAULT FALSE,
last_calculated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(offer_id, creative_id, country, isp)
);
-- Translation Queue
CREATE TABLE IF NOT EXISTS admin.creative_translations (
id SERIAL PRIMARY KEY,
source_creative_id INTEGER NOT NULL,
source_lang VARCHAR(5) NOT NULL,
target_lang VARCHAR(5) NOT NULL,
target_offer_id INTEGER,
translated_html TEXT,
translated_subject TEXT,
translated_from_name TEXT,
status VARCHAR(20) DEFAULT 'pending', -- pending, translated, approved, deployed
translated_by VARCHAR(50), -- which AI provider
quality_check BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
deployed_at TIMESTAMP
);
-- Offer Recommendations (Brain suggests, human approves)
CREATE TABLE IF NOT EXISTS admin.offer_recommendations (
id SERIAL PRIMARY KEY,
source VARCHAR(20) NOT NULL, -- everflow, cx3, mailmagnet
external_offer_id VARCHAR(50),
offer_name TEXT,
country VARCHAR(5),
vertical VARCHAR(50),
payout NUMERIC(8,2),
tracking_url TEXT,
preview_url TEXT,
thumbnail_url TEXT,
brain_score NUMERIC(5,2) DEFAULT 0, -- Brain's confidence score
brain_reason TEXT, -- Why Brain recommends it
status VARCHAR(20) DEFAULT 'pending', -- pending, approved, rejected, imported
approved_by VARCHAR(50),
approved_at TIMESTAMP,
imported_offer_id INTEGER, -- FK to affiliate.offers after import
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_cp_offer_creative ON admin.creative_performance(offer_id, creative_id);
CREATE INDEX IF NOT EXISTS idx_cp_winner ON admin.creative_performance(is_winner) WHERE is_winner = true;
CREATE INDEX IF NOT EXISTS idx_or_status ON admin.offer_recommendations(status);
CREATE INDEX IF NOT EXISTS idx_ct_status ON admin.creative_translations(status);
"""
def init_schema():
conn = psycopg2.connect(**DB)
cur = conn.cursor()
cur.execute(SCHEMA_SQL)
conn.commit()
print("Schema created: creative_performance, creative_translations, offer_recommendations")
cur.close(); conn.close()
def calculate_performance():
"""
Calculate creative performance from unified_send_log + tracking_events.
Score formula: (clicks * 10 + opens * 1) / sends * 100
Minimum 10 sends to score.
"""
conn = psycopg2.connect(**DB)
cur = conn.cursor()
# Aggregate sends per creative×offer×country
cur.execute("""
INSERT INTO admin.creative_performance (offer_id, creative_id, country, sends, opens, clicks)
SELECT
usl.offer_id,
usl.creative_id,
COALESCE(usl.country, 'WW'),
COUNT(*) as sends,
COUNT(DISTINCT CASE WHEN te.event_type = 'open' OR te.type = 'open' THEN te.tracking_id END) as opens,
COUNT(DISTINCT CASE WHEN te.event_type = 'click' OR te.type = 'click' THEN te.tracking_id END) as clicks
FROM admin.unified_send_log_new usl
LEFT JOIN admin.tracking_events te ON te.tracking_id = usl.tracking_id
WHERE usl.offer_id IS NOT NULL AND usl.creative_id IS NOT NULL
GROUP BY usl.offer_id, usl.creative_id, COALESCE(usl.country, 'WW')
ON CONFLICT (offer_id, creative_id, country, isp)
DO UPDATE SET
sends = EXCLUDED.sends,
opens = EXCLUDED.opens,
clicks = EXCLUDED.clicks,
last_calculated = CURRENT_TIMESTAMP
""")
# Calculate rates and scores
cur.execute("""
UPDATE admin.creative_performance SET
open_rate = CASE WHEN sends > 0 THEN (opens::numeric / sends) * 100 ELSE 0 END,
click_rate = CASE WHEN sends > 0 THEN (clicks::numeric / sends) * 100 ELSE 0 END,
score = CASE WHEN sends >= 10 THEN (clicks * 10.0 + opens * 1.0) / sends * 100 ELSE 0 END,
last_calculated = CURRENT_TIMESTAMP
""")
# Mark winners: best creative per offer×country (minimum 10 sends)
cur.execute("UPDATE admin.creative_performance SET is_winner = false")
cur.execute("""
UPDATE admin.creative_performance cp SET is_winner = true
FROM (
SELECT DISTINCT ON (offer_id, country) id
FROM admin.creative_performance
WHERE sends >= 10 AND NOT is_paused
ORDER BY offer_id, country, score DESC
) winners
WHERE cp.id = winners.id
""")
# Pause low performers (< 30% of winner's score, minimum 20 sends)
cur.execute("""
UPDATE admin.creative_performance cp SET is_paused = true
FROM (
SELECT w.offer_id, w.country, w.score * 0.3 as threshold
FROM admin.creative_performance w
WHERE w.is_winner = true AND w.score > 0
) thresholds
WHERE cp.offer_id = thresholds.offer_id
AND cp.country = thresholds.country
AND cp.score < thresholds.threshold
AND cp.sends >= 20
AND NOT cp.is_winner
""")
conn.commit()
# Report
cur.execute("""
SELECT offer_id, creative_id, country, sends, opens, clicks,
open_rate, click_rate, score, is_winner, is_paused
FROM admin.creative_performance
WHERE sends > 0
ORDER BY score DESC LIMIT 20
""")
rows = cur.fetchall()
print(f"\n=== CREATIVE PERFORMANCE (Top 20) ===")
print(f"{'Offer':>6} {'Cr':>4} {'Geo':>4} {'Sends':>6} {'Opens':>6} {'Clicks':>6} {'OR%':>6} {'CR%':>6} {'Score':>7} {'W':>2} {'P':>2}")
for r in rows:
print(f"{r[0]:>6} {r[1]:>4} {r[2]:>4} {r[3]:>6} {r[4]:>6} {r[5]:>6} {r[6]:>6.1f} {r[7]:>6.1f} {r[8]:>7.1f} {'' if r[9] else ' ':>2} {'' if r[10] else ' ':>2}")
# Summary
cur.execute("SELECT COUNT(*) FROM admin.creative_performance WHERE is_winner = true")
winners = cur.fetchone()[0]
cur.execute("SELECT COUNT(*) FROM admin.creative_performance WHERE is_paused = true")
paused = cur.fetchone()[0]
print(f"\nWinners: {winners} | Paused: {paused}")
cur.close(); conn.close()
def get_best_creative(offer_id, country='WW'):
"""
Called by warmup-engine: returns best creative_id for this offer+country.
Falls back to random if no data yet.
"""
conn = psycopg2.connect(**DB)
cur = conn.cursor()
# Try winner for this exact offer+country
cur.execute("""
SELECT creative_id FROM admin.creative_performance
WHERE offer_id = %s AND country = %s AND is_winner = true AND NOT is_paused
LIMIT 1
""", (offer_id, country))
row = cur.fetchone()
if not row:
# Try winner for this offer any country
cur.execute("""
SELECT creative_id FROM admin.creative_performance
WHERE offer_id = %s AND is_winner = true AND NOT is_paused
ORDER BY score DESC LIMIT 1
""", (offer_id,))
row = cur.fetchone()
if not row:
# No performance data yet - return random non-paused creative
cur.execute("""
SELECT id FROM affiliate.creatives
WHERE offer_id = %s AND status = 'Activated' AND quality_score >= 3
ORDER BY RANDOM() LIMIT 1
""", (offer_id,))
row = cur.fetchone()
result = row[0] if row else None
cur.close(); conn.close()
return result
def scan_everflow_offers():
"""
Scan Everflow for new offers not yet in our system.
Score them and add to recommendations.
"""
import requests
API = "https://api.eflow.team"
KEY = "b0PPowhR3CI9EtcqtLHA"
H = {"X-Eflow-API-Key": KEY}
conn = psycopg2.connect(**DB)
cur = conn.cursor()
# Get existing offer production_ids
cur.execute("SELECT production_id FROM affiliate.offers WHERE production_id IS NOT NULL")
existing = set(r[0] for r in cur.fetchall())
# Get already-recommended
cur.execute("SELECT external_offer_id FROM admin.offer_recommendations WHERE source = 'everflow'")
recommended = set(r[0] for r in cur.fetchall())
# Fetch Everflow offers
r = requests.get(f"{API}/v1/affiliates/alloffers", headers=H, timeout=30)
offers = r.json().get('offers', [])
new_found = 0
for o in offers:
noid = str(o['network_offer_id'])
if noid in existing or noid in recommended:
continue
name = o.get('name', '')
track = o.get('tracking_url', '')
thumb = o.get('thumbnail_url', '')
# Extract country from name pattern "XX - ..."
country = name[:2] if len(name) > 2 and name[2] in [' ', '-'] else 'WW'
# Detect vertical from name
vertical = 'unknown'
name_lower = name.lower()
if any(w in name_lower for w in ['beauty','lancôme','sephora','nocibé']): vertical = 'beauty'
elif any(w in name_lower for w in ['car','auto','emergency','adac','aaa']): vertical = 'auto'
elif any(w in name_lower for w in ['gift','aldi','lidl','walmart','shein']): vertical = 'retail'
elif any(w in name_lower for w in ['health','krankenkasse','insurance']): vertical = 'health'
elif any(w in name_lower for w in ['loan','credit','finance']): vertical = 'finance'
elif any(w in name_lower for w in ['dating','girl','women','meet']): vertical = 'dating'
elif any(w in name_lower for w in ['food','coffee','restaurant','tim horton']): vertical = 'food'
# Brain scoring
score = 50.0 # base
# Priority countries boost
if country in ['DE', 'FR']: score += 20
elif country in ['UK', 'IT', 'ES']: score += 15
elif country in ['US', 'CA', 'AU']: score += 10
# High-performing verticals boost (based on our data)
if vertical in ['retail', 'auto']: score += 15
elif vertical in ['health', 'beauty']: score += 10
# Has tracking URL
if 'rivoweb.com' in track: score += 10
# Validate link
try:
lr = requests.head(track, timeout=10, allow_redirects=True)
if lr.status_code in [200, 204, 301, 302]:
score += 5
link_status = 'live'
else:
link_status = 'dead'
score -= 20
except:
link_status = 'unknown'
reason_parts = []
if country in ['DE', 'FR', 'UK']: reason_parts.append(f"High-value market ({country})")
if vertical in ['retail', 'auto']: reason_parts.append(f"Strong vertical ({vertical})")
if link_status == 'live': reason_parts.append("Link verified live")
brain_reason = '. '.join(reason_parts) if reason_parts else f"New {country} offer in {vertical}"
# Get preview URL from detail
try:
det = requests.get(f"{API}/v1/affiliates/offers/{noid}", headers=H, timeout=10).json()
preview = det.get('preview_url', '')
except:
preview = ''
cur.execute("""
INSERT INTO admin.offer_recommendations
(source, external_offer_id, offer_name, country, vertical, tracking_url,
preview_url, thumbnail_url, brain_score, brain_reason, status)
VALUES ('everflow', %s, %s, %s, %s, %s, %s, %s, %s, %s, 'pending')
""", (noid, name, country, vertical, track, preview, thumb, score, brain_reason))
new_found += 1
if score >= 70:
print(f" ⭐ RECOMMENDED #{noid} [{country}] {name[:50]} (score: {score:.0f})")
else:
print(f" 📋 Noted #{noid} [{country}] {name[:50]} (score: {score:.0f})")
conn.commit()
# Show pending recommendations sorted by score
cur.execute("""
SELECT external_offer_id, country, LEFT(offer_name, 45), vertical, brain_score, brain_reason
FROM admin.offer_recommendations
WHERE status = 'pending'
ORDER BY brain_score DESC LIMIT 15
""")
rows = cur.fetchall()
if rows:
print(f"\n=== TOP RECOMMENDATIONS (Pending Approval) ===")
for r in rows:
emoji = '' if r[4] >= 70 else '📋'
print(f" {emoji} #{r[0]} [{r[1]}] {r[2]} | {r[3]} | Score: {r[4]:.0f}")
print(f" Reason: {r[5]}")
print(f"\nNew offers found: {new_found}")
cur.close(); conn.close()
def approve_offer(rec_id):
"""Approve a recommendation and import it"""
conn = psycopg2.connect(**DB)
cur = conn.cursor()
cur.execute("SELECT * FROM admin.offer_recommendations WHERE id = %s AND status = 'pending'", (rec_id,))
rec = cur.fetchone()
if not rec:
print(f"Recommendation #{rec_id} not found or already processed")
return
cur.execute("UPDATE admin.offer_recommendations SET status = 'approved', approved_by = 'yacine', approved_at = NOW() WHERE id = %s", (rec_id,))
conn.commit()
print(f"Approved recommendation #{rec_id}. Run import-everflow-live.py to complete import.")
cur.close(); conn.close()
def generate_translation(source_creative_id, target_lang, target_offer_id=None):
"""
Queue a creative for translation via HAMID IA.
"""
conn = psycopg2.connect(**DB)
cur = conn.cursor()
# Get source creative
cur.execute("SELECT value, offer_id FROM affiliate.creatives WHERE id = %s", (source_creative_id,))
row = cur.fetchone()
if not row:
print(f"Creative #{source_creative_id} not found")
return
import base64
html = base64.b64decode(row[0]).decode('utf-8', errors='replace')
source_offer_id = row[1]
# Detect source language from HTML
source_lang = 'en'
if any(w in html.lower() for w in ['ihr', 'sie', 'jetzt', 'erhalten']): source_lang = 'de'
elif any(w in html.lower() for w in ['votre', 'vous', 'maintenant', 'offert']): source_lang = 'fr'
elif any(w in html.lower() for w in ['il tuo', 'gratuito', 'ottieni']): source_lang = 'it'
elif any(w in html.lower() for w in ['tu ', 'gratis', 'ahora', 'consíg']): source_lang = 'es'
cur.execute("""
INSERT INTO admin.creative_translations
(source_creative_id, source_lang, target_lang, target_offer_id, status)
VALUES (%s, %s, %s, %s, 'pending')
RETURNING id
""", (source_creative_id, source_lang, target_lang, target_offer_id or source_offer_id))
tid = cur.fetchone()[0]
conn.commit()
print(f"Translation queued #{tid}: Creative #{source_creative_id} ({source_lang}{target_lang})")
cur.close(); conn.close()
return tid
def show_status():
"""Show full optimizer status"""
conn = psycopg2.connect(**DB)
cur = conn.cursor()
print("=== BRAIN CREATIVE OPTIMIZER STATUS ===\n")
# Performance
cur.execute("SELECT COUNT(*), SUM(sends), SUM(opens), SUM(clicks) FROM admin.creative_performance")
r = cur.fetchone()
print(f"Performance tracking: {r[0]} entries | {r[1] or 0} sends | {r[2] or 0} opens | {r[3] or 0} clicks")
cur.execute("SELECT COUNT(*) FROM admin.creative_performance WHERE is_winner = true")
print(f"Winners identified: {cur.fetchone()[0]}")
cur.execute("SELECT COUNT(*) FROM admin.creative_performance WHERE is_paused = true")
print(f"Low performers paused: {cur.fetchone()[0]}")
# Translations
for st in ['pending', 'translated', 'deployed']:
cur.execute("SELECT COUNT(*) FROM admin.creative_translations WHERE status = %s", (st,))
print(f"Translations {st}: {cur.fetchone()[0]}")
# Recommendations
for st in ['pending', 'approved', 'rejected', 'imported']:
cur.execute("SELECT COUNT(*) FROM admin.offer_recommendations WHERE status = %s", (st,))
cnt = cur.fetchone()[0]
if cnt > 0:
print(f"Offer recommendations {st}: {cnt}")
cur.close(); conn.close()
if __name__ == '__main__':
cmd = sys.argv[1] if len(sys.argv) > 1 else 'status'
if cmd == 'init':
init_schema()
elif cmd == 'calculate':
calculate_performance()
elif cmd == 'scan':
scan_everflow_offers()
elif cmd == 'approve':
if len(sys.argv) < 3:
print("Usage: approve <recommendation_id>")
else:
approve_offer(int(sys.argv[2]))
elif cmd == 'translate':
if len(sys.argv) < 4:
print("Usage: translate <creative_id> <target_lang> [target_offer_id]")
else:
target_oid = int(sys.argv[4]) if len(sys.argv) > 4 else None
generate_translation(int(sys.argv[2]), sys.argv[3], target_oid)
elif cmd == 'best':
if len(sys.argv) < 3:
print("Usage: best <offer_id> [country]")
else:
country = sys.argv[3] if len(sys.argv) > 3 else 'WW'
best = get_best_creative(int(sys.argv[2]), country)
print(f"Best creative for offer #{sys.argv[2]} ({country}): #{best}")
elif cmd == 'status':
show_status()
else:
print("Commands: init | calculate | scan | approve <id> | translate <cr_id> <lang> | best <offer> [country] | status")