#!/usr/bin/env python3
"""
Brain Creative Optimizer v1.0
- A/B testing with auto-promotion of winning creatives
- Translation pipeline via HAMID IA
- Offer discovery & recommendation engine

Tables created:
  admin.creative_performance   - per creative×offer×country stats
  admin.creative_translations  - queue of creatives awaiting translation
  admin.offer_recommendations  - Brain-suggested offers pending approval
"""

import json
import os
import sys

import psycopg2

# Connection parameters passed straight to psycopg2.connect(**DB).
# NOTE(security): the literals below are the historical hard-coded defaults.
# Each value can be overridden through the environment so real credentials do
# not have to live in source control.
DB = {
    'host': os.environ.get('ADX_DB_HOST', 'localhost'),
    'database': os.environ.get('ADX_DB_NAME', 'adx_system'),
    'user': os.environ.get('ADX_DB_USER', 'admin'),
    'password': os.environ.get('ADX_DB_PASSWORD', 'admin123'),
}

# DDL executed by init_schema(). Every statement is IF NOT EXISTS, so the
# script can be re-run against an already-initialised database.
SCHEMA_SQL = """
-- Creative Performance Tracking
CREATE TABLE IF NOT EXISTS admin.creative_performance (
    id SERIAL PRIMARY KEY,
    offer_id INTEGER NOT NULL,
    creative_id INTEGER NOT NULL,
    country VARCHAR(5) DEFAULT 'WW',
    isp VARCHAR(30) DEFAULT 'ALL',
    sends INTEGER DEFAULT 0,
    opens INTEGER DEFAULT 0,
    clicks INTEGER DEFAULT 0,
    conversions INTEGER DEFAULT 0,
    open_rate NUMERIC(5,2) DEFAULT 0,
    click_rate NUMERIC(5,2) DEFAULT 0,
    conversion_rate NUMERIC(5,2) DEFAULT 0,
    score NUMERIC(8,2) DEFAULT 0,
    is_winner BOOLEAN DEFAULT FALSE,
    is_paused BOOLEAN DEFAULT FALSE,
    last_calculated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(offer_id, creative_id, country, isp)
);

-- Translation Queue
CREATE TABLE IF NOT EXISTS admin.creative_translations (
    id SERIAL PRIMARY KEY,
    source_creative_id INTEGER NOT NULL,
    source_lang VARCHAR(5) NOT NULL,
    target_lang VARCHAR(5) NOT NULL,
    target_offer_id INTEGER,
    translated_html TEXT,
    translated_subject TEXT,
    translated_from_name TEXT,
    status VARCHAR(20) DEFAULT 'pending',  -- pending, translated, approved, deployed
    translated_by VARCHAR(50),  -- which AI provider
    quality_check BOOLEAN DEFAULT FALSE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    deployed_at TIMESTAMP
);

-- Offer Recommendations (Brain suggests, human approves)
CREATE TABLE IF NOT EXISTS admin.offer_recommendations (
    id SERIAL PRIMARY KEY,
    source VARCHAR(20) NOT NULL,  -- everflow, cx3, mailmagnet
    external_offer_id VARCHAR(50),
    offer_name TEXT,
    country VARCHAR(5),
    vertical VARCHAR(50),
    payout NUMERIC(8,2),
    tracking_url TEXT,
    preview_url TEXT,
    thumbnail_url TEXT,
    brain_score NUMERIC(5,2) DEFAULT 0,  -- Brain's confidence score
    brain_reason TEXT,  -- Why Brain recommends it
    status VARCHAR(20) DEFAULT 'pending',  -- pending, approved, rejected, imported
    approved_by VARCHAR(50),
    approved_at TIMESTAMP,
    imported_offer_id INTEGER,  -- FK to affiliate.offers after import
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX IF NOT EXISTS idx_cp_offer_creative ON admin.creative_performance(offer_id, creative_id);
CREATE INDEX IF NOT EXISTS idx_cp_winner ON admin.creative_performance(is_winner) WHERE is_winner = true;
CREATE INDEX IF NOT EXISTS idx_or_status ON admin.offer_recommendations(status);
CREATE INDEX IF NOT EXISTS idx_ct_status ON admin.creative_translations(status);
"""


def init_schema():
    """Create the optimizer tables and indexes by executing SCHEMA_SQL.

    The connection is always closed, even when the DDL fails; in that case
    nothing is committed and the exception propagates to the caller.
    """
    conn = psycopg2.connect(**DB)
    try:
        with conn.cursor() as cur:
            cur.execute(SCHEMA_SQL)
        conn.commit()
        print("Schema created: creative_performance, creative_translations, offer_recommendations")
    finally:
        conn.close()


def _refresh_creative_stats(cur):
    """Recompute counters, rates, scores and winner/paused flags (no commit)."""
    # Aggregate sends per creative×offer×country.
    # Events are collapsed to one row per tracking_id BEFORE joining: joining
    # the raw events table yields one row per event, which made COUNT(*)
    # count a send once for every open/click it received.
    cur.execute("""
        INSERT INTO admin.creative_performance (offer_id, creative_id, country, sends, opens, clicks)
        SELECT
            usl.offer_id,
            usl.creative_id,
            COALESCE(usl.country, 'WW'),
            COUNT(*) AS sends,
            COUNT(DISTINCT CASE WHEN ev.opened THEN usl.tracking_id END) AS opens,
            COUNT(DISTINCT CASE WHEN ev.clicked THEN usl.tracking_id END) AS clicks
        FROM admin.unified_send_log_new usl
        LEFT JOIN (
            SELECT te.tracking_id,
                   BOOL_OR(te.event_type = 'open' OR te.type = 'open') AS opened,
                   BOOL_OR(te.event_type = 'click' OR te.type = 'click') AS clicked
            FROM admin.tracking_events te
            GROUP BY te.tracking_id
        ) ev ON ev.tracking_id = usl.tracking_id
        WHERE usl.offer_id IS NOT NULL AND usl.creative_id IS NOT NULL
        GROUP BY usl.offer_id, usl.creative_id, COALESCE(usl.country, 'WW')
        ON CONFLICT (offer_id, creative_id, country, isp)
        DO UPDATE SET
            sends = EXCLUDED.sends,
            opens = EXCLUDED.opens,
            clicks = EXCLUDED.clicks,
            last_calculated = CURRENT_TIMESTAMP
    """)

    # Calculate rates and scores (score stays 0 below 10 sends).
    cur.execute("""
        UPDATE admin.creative_performance SET
            open_rate = CASE WHEN sends > 0 THEN (opens::numeric / sends) * 100 ELSE 0 END,
            click_rate = CASE WHEN sends > 0 THEN (clicks::numeric / sends) * 100 ELSE 0 END,
            score = CASE WHEN sends >= 10 THEN (clicks * 10.0 + opens * 1.0) / sends * 100 ELSE 0 END,
            last_calculated = CURRENT_TIMESTAMP
    """)

    # Mark winners: best non-paused creative per offer×country (minimum 10 sends).
    cur.execute("UPDATE admin.creative_performance SET is_winner = false")
    cur.execute("""
        UPDATE admin.creative_performance cp SET is_winner = true
        FROM (
            SELECT DISTINCT ON (offer_id, country) id
            FROM admin.creative_performance
            WHERE sends >= 10 AND NOT is_paused
            ORDER BY offer_id, country, score DESC
        ) winners
        WHERE cp.id = winners.id
    """)

    # Pause low performers (< 30% of winner's score, minimum 20 sends).
    # NOTE: nothing in this function ever sets is_paused back to false.
    cur.execute("""
        UPDATE admin.creative_performance cp SET is_paused = true
        FROM (
            SELECT w.offer_id, w.country, w.score * 0.3 AS threshold
            FROM admin.creative_performance w
            WHERE w.is_winner = true AND w.score > 0
        ) thresholds
        WHERE cp.offer_id = thresholds.offer_id
          AND cp.country = thresholds.country
          AND cp.score < thresholds.threshold
          AND cp.sends >= 20
          AND NOT cp.is_winner
    """)


def _print_performance_report(cur):
    """Print the 20 highest-scoring rows plus winner/paused totals."""
    cur.execute("""
        SELECT offer_id, creative_id, country, sends, opens, clicks,
               open_rate, click_rate, score, is_winner, is_paused
        FROM admin.creative_performance
        WHERE sends > 0
        ORDER BY score DESC LIMIT 20
    """)
    rows = cur.fetchall()

    print(f"\n=== CREATIVE PERFORMANCE (Top 20) ===")
    print(f"{'Offer':>6} {'Cr':>4} {'Geo':>4} {'Sends':>6} {'Opens':>6} {'Clicks':>6} {'OR%':>6} {'CR%':>6} {'Score':>7} {'W':>2} {'P':>2}")
    for (offer, creative, geo, sends, opens, clicks,
         open_rate, click_rate, score, is_winner, is_paused) in rows:
        win_mark = '✓' if is_winner else ' '
        pause_mark = '✗' if is_paused else ' '
        print(f"{offer:>6} {creative:>4} {geo:>4} {sends:>6} {opens:>6} {clicks:>6} {open_rate:>6.1f} {click_rate:>6.1f} {score:>7.1f} {win_mark:>2} {pause_mark:>2}")

    # Summary
    cur.execute("SELECT COUNT(*) FROM admin.creative_performance WHERE is_winner = true")
    winners = cur.fetchone()[0]
    cur.execute("SELECT COUNT(*) FROM admin.creative_performance WHERE is_paused = true")
    paused = cur.fetchone()[0]
    print(f"\nWinners: {winners} | Paused: {paused}")


def calculate_performance():
    """
    Calculate creative performance from unified_send_log_new + tracking_events.

    Score formula: (clicks * 10 + opens * 1) / sends * 100
    Minimum 10 sends to score.

    The recomputation is committed as a single transaction, then a top-20
    report is printed. The connection is closed even if a statement fails.
    """
    conn = psycopg2.connect(**DB)
    try:
        with conn.cursor() as cur:
            _refresh_creative_stats(cur)
            conn.commit()
            _print_performance_report(cur)
    finally:
        conn.close()


def get_best_creative(offer_id, country='WW'):
    """
    Called by warmup-engine: returns best creative_id for this offer+country.

    Lookups are tried in order and the first hit is returned:
      1. the non-paused winner for this exact offer + country;
      2. the highest-scoring non-paused winner for this offer in any country;
      3. a random 'Activated' creative of the offer with quality_score >= 3.

    Returns None when none of the lookups finds a row.
    """
    lookups = (
        ("""
            SELECT creative_id FROM admin.creative_performance
            WHERE offer_id = %s AND country = %s AND is_winner = true AND NOT is_paused
            LIMIT 1
        """, (offer_id, country)),
        ("""
            SELECT creative_id FROM admin.creative_performance
            WHERE offer_id = %s AND is_winner = true AND NOT is_paused
            ORDER BY score DESC LIMIT 1
        """, (offer_id,)),
        # No performance data yet - fall back to a random eligible creative.
        ("""
            SELECT id FROM affiliate.creatives
            WHERE offer_id = %s AND status = 'Activated' AND quality_score >= 3
            ORDER BY RANDOM() LIMIT 1
        """, (offer_id,)),
    )

    conn = psycopg2.connect(**DB)
    try:
        with conn.cursor() as cur:
            for sql, params in lookups:
                cur.execute(sql, params)
                row = cur.fetchone()
                if row:
                    return row[0]
        return None
    finally:
        # Runs on every exit path, including the early return above.
        conn.close()


_EVERFLOW_API = "https://api.eflow.team"

# Checked in order; the first vertical with a matching keyword wins.
# NOTE: this is plain substring matching on the lower-cased offer name, so
# e.g. 'car' also matches inside "healthcare".
_VERTICAL_KEYWORDS = (
    ('beauty', ('beauty', 'lancôme', 'sephora', 'nocibé')),
    ('auto', ('car', 'auto', 'emergency', 'adac', 'aaa')),
    ('retail', ('gift', 'aldi', 'lidl', 'walmart', 'shein')),
    ('health', ('health', 'krankenkasse', 'insurance')),
    ('finance', ('loan', 'credit', 'finance')),
    ('dating', ('dating', 'girl', 'women', 'meet')),
    ('food', ('food', 'coffee', 'restaurant', 'tim horton')),
)

# Score bonuses added on top of the base score of 50.
_COUNTRY_BOOST = {
    'DE': 20, 'FR': 20,
    'UK': 15, 'IT': 15, 'ES': 15,
    'US': 10, 'CA': 10, 'AU': 10,
}
_VERTICAL_BOOST = {'retail': 15, 'auto': 15, 'health': 10, 'beauty': 10}


def _guess_country(name):
    """Return the 2-char prefix of names shaped like "XX - ..." / "XX ...", else 'WW'."""
    if len(name) > 2 and name[2] in (' ', '-'):
        return name[:2]
    return 'WW'


def _guess_vertical(name):
    """Return the first vertical whose keyword occurs in the name, else 'unknown'."""
    lowered = name.lower()
    for vertical, keywords in _VERTICAL_KEYWORDS:
        if any(keyword in lowered for keyword in keywords):
            return vertical
    return 'unknown'


def scan_everflow_offers():
    """
    Scan Everflow for new offers not yet in our system.
    Score them and add to recommendations.

    Offers whose network_offer_id already appears in affiliate.offers
    (production_id) or in admin.offer_recommendations are skipped. All new
    rows are committed together after the loop, then the 15 best pending
    recommendations are printed.

    Raises requests.HTTPError if the offer list request fails; failures of
    the per-offer link check and detail lookup are tolerated.
    """
    import requests  # imported lazily so other commands do not need it

    # NOTE(security): the literal is the historical hard-coded key, kept as a
    # fallback; set EVERFLOW_API_KEY to avoid relying on it.
    api_key = os.environ.get("EVERFLOW_API_KEY", "b0PPowhR3CI9EtcqtLHA")
    headers = {"X-Eflow-API-Key": api_key}

    conn = psycopg2.connect(**DB)
    try:
        with conn.cursor() as cur:
            # Get existing offer production_ids. Normalised to str because
            # the ids compared against them below are strings.
            cur.execute("SELECT production_id FROM affiliate.offers WHERE production_id IS NOT NULL")
            existing = {str(row[0]) for row in cur.fetchall()}

            # Get already-recommended
            cur.execute("SELECT external_offer_id FROM admin.offer_recommendations WHERE source = 'everflow'")
            recommended = {row[0] for row in cur.fetchall()}

            # Fetch Everflow offers; surface HTTP errors instead of silently
            # treating an error payload as "no offers".
            resp = requests.get(f"{_EVERFLOW_API}/v1/affiliates/alloffers", headers=headers, timeout=30)
            resp.raise_for_status()
            offers = resp.json().get('offers') or []

            new_found = 0
            for offer in offers:
                raw_id = offer.get('network_offer_id')
                if raw_id is None:
                    continue
                noid = str(raw_id)
                if noid in existing or noid in recommended:
                    continue

                # `or ''` also covers keys that are present but null.
                name = offer.get('name') or ''
                track = offer.get('tracking_url') or ''
                thumb = offer.get('thumbnail_url') or ''

                country = _guess_country(name)
                vertical = _guess_vertical(name)

                # Brain scoring
                score = 50.0  # base
                score += _COUNTRY_BOOST.get(country, 0)
                score += _VERTICAL_BOOST.get(vertical, 0)
                if 'rivoweb.com' in track:
                    score += 10

                # Validate link (an empty/invalid URL ends up as 'unknown').
                try:
                    head = requests.head(track, timeout=10, allow_redirects=True)
                except (requests.RequestException, ValueError):
                    link_status = 'unknown'
                else:
                    if head.status_code in (200, 204, 301, 302):
                        link_status = 'live'
                        score += 5
                    else:
                        link_status = 'dead'
                        score -= 20

                reason_parts = []
                if country in ('DE', 'FR', 'UK'):
                    reason_parts.append(f"High-value market ({country})")
                if vertical in ('retail', 'auto'):
                    reason_parts.append(f"Strong vertical ({vertical})")
                if link_status == 'live':
                    reason_parts.append("Link verified live")
                brain_reason = '. '.join(reason_parts) if reason_parts else f"New {country} offer in {vertical}"

                # Get preview URL from detail (best effort).
                try:
                    detail = requests.get(
                        f"{_EVERFLOW_API}/v1/affiliates/offers/{noid}",
                        headers=headers, timeout=10,
                    ).json()
                    preview = detail.get('preview_url', '')
                except (requests.RequestException, ValueError, AttributeError):
                    preview = ''

                cur.execute("""
                    INSERT INTO admin.offer_recommendations
                        (source, external_offer_id, offer_name, country, vertical, tracking_url,
                         preview_url, thumbnail_url, brain_score, brain_reason, status)
                    VALUES ('everflow', %s, %s, %s, %s, %s, %s, %s, %s, %s, 'pending')
                """, (noid, name, country, vertical, track, preview, thumb, score, brain_reason))

                # Guard against the same id appearing twice in one response.
                recommended.add(noid)
                new_found += 1
                if score >= 70:
                    print(f" ⭐ RECOMMENDED #{noid} [{country}] {name[:50]} (score: {score:.0f})")
                else:
                    print(f" 📋 Noted #{noid} [{country}] {name[:50]} (score: {score:.0f})")

            conn.commit()

            # Show pending recommendations sorted by score
            cur.execute("""
                SELECT external_offer_id, country, LEFT(offer_name, 45), vertical, brain_score, brain_reason
                FROM admin.offer_recommendations
                WHERE status = 'pending'
                ORDER BY brain_score DESC LIMIT 15
            """)
            rows = cur.fetchall()
            if rows:
                print(f"\n=== TOP RECOMMENDATIONS (Pending Approval) ===")
                for r in rows:
                    emoji = '⭐' if r[4] >= 70 else '📋'
                    print(f" {emoji} #{r[0]} [{r[1]}] {r[2]} | {r[3]} | Score: {r[4]:.0f}")
                    print(f" Reason: {r[5]}")

            print(f"\nNew offers found: {new_found}")
    finally:
        conn.close()


def approve_offer(rec_id, approved_by='yacine'):
    """Mark a pending recommendation as approved.

    This only flips the row's status; the actual import is done separately
    by import-everflow-live.py, as the success message says.

    Args:
        rec_id: id of the row in admin.offer_recommendations.
        approved_by: name stored in the approved_by column.
    """
    conn = psycopg2.connect(**DB)
    try:
        with conn.cursor() as cur:
            # The status guard makes check-and-update a single atomic
            # statement: rowcount is 0 if the row is missing or not pending.
            cur.execute(
                "UPDATE admin.offer_recommendations "
                "SET status = 'approved', approved_by = %s, approved_at = NOW() "
                "WHERE id = %s AND status = 'pending'",
                (approved_by, rec_id),
            )
            updated = cur.rowcount
        conn.commit()
    finally:
        conn.close()

    if not updated:
        print(f"Recommendation #{rec_id} not found or already processed")
        return
    print(f"Approved recommendation #{rec_id}. Run import-everflow-live.py to complete import.")


# Marker words per language, checked in order; the first language with a hit
# wins. Patterns are anchored at the start of a word so that, for example,
# 'sie' no longer matches inside the English word "easier". Inflected forms
# such as "Ihre" or "offerte" still match because the end is left open.
_LANG_MARKERS = (
    ('de', r'\b(?:ihr|sie|jetzt|erhalten)'),
    ('fr', r'\b(?:votre|vous|maintenant|offert)'),
    ('it', r'\b(?:il tuo|gratuito|ottieni)'),
    ('es', r'\b(?:tu\b|gratis|ahora|consíg)'),
)


def _detect_source_lang(html):
    """Guess the creative's language from marker words; defaults to 'en'."""
    import re

    lowered = html.lower()
    for lang, pattern in _LANG_MARKERS:
        if re.search(pattern, lowered):
            return lang
    return 'en'


def generate_translation(source_creative_id, target_lang, target_offer_id=None):
    """
    Queue a creative for translation via HAMID IA.

    Inserts a 'pending' row into admin.creative_translations. The source
    language is guessed from the creative's base64-decoded HTML.

    Args:
        source_creative_id: id of the row in affiliate.creatives.
        target_lang: language code to translate into.
        target_offer_id: offer the translation is for; falls back to the
            source creative's own offer_id when omitted.

    Returns:
        The id of the queued translation, or None if the creative is not found.
    """
    import base64

    conn = psycopg2.connect(**DB)
    try:
        with conn.cursor() as cur:
            # Get source creative
            cur.execute("SELECT value, offer_id FROM affiliate.creatives WHERE id = %s", (source_creative_id,))
            row = cur.fetchone()
            if not row:
                print(f"Creative #{source_creative_id} not found")
                return None

            encoded_html, source_offer_id = row
            html = base64.b64decode(encoded_html).decode('utf-8', errors='replace')
            source_lang = _detect_source_lang(html)

            cur.execute("""
                INSERT INTO admin.creative_translations
                    (source_creative_id, source_lang, target_lang, target_offer_id, status)
                VALUES (%s, %s, %s, %s, 'pending')
                RETURNING id
            """, (source_creative_id, source_lang, target_lang, target_offer_id or source_offer_id))
            tid = cur.fetchone()[0]

        conn.commit()
        print(f"Translation queued #{tid}: Creative #{source_creative_id} ({source_lang} → {target_lang})")
        return tid
    finally:
        # Also covers the early "not found" return above.
        conn.close()


def show_status():
    """Show full optimizer status.

    Prints totals from admin.creative_performance, the number of translations
    in each of the pending/translated/deployed states, and the non-zero
    counts of offer recommendations per status. The connection is closed
    even if a query fails.
    """
    conn = psycopg2.connect(**DB)
    try:
        with conn.cursor() as cur:
            print("=== BRAIN CREATIVE OPTIMIZER STATUS ===\n")

            # Performance (SUMs are NULL on an empty table, hence `or 0`).
            cur.execute("SELECT COUNT(*), SUM(sends), SUM(opens), SUM(clicks) FROM admin.creative_performance")
            entries, sends, opens, clicks = cur.fetchone()
            print(f"Performance tracking: {entries} entries | {sends or 0} sends | {opens or 0} opens | {clicks or 0} clicks")

            cur.execute("SELECT COUNT(*) FROM admin.creative_performance WHERE is_winner = true")
            print(f"Winners identified: {cur.fetchone()[0]}")

            cur.execute("SELECT COUNT(*) FROM admin.creative_performance WHERE is_paused = true")
            print(f"Low performers paused: {cur.fetchone()[0]}")

            # Translations: always printed, even when the count is zero.
            for st in ('pending', 'translated', 'deployed'):
                cur.execute("SELECT COUNT(*) FROM admin.creative_translations WHERE status = %s", (st,))
                print(f"Translations {st}: {cur.fetchone()[0]}")

            # Recommendations: only statuses that actually have rows.
            for st in ('pending', 'approved', 'rejected', 'imported'):
                cur.execute("SELECT COUNT(*) FROM admin.offer_recommendations WHERE status = %s", (st,))
                cnt = cur.fetchone()[0]
                if cnt > 0:
                    print(f"Offer recommendations {st}: {cnt}")
    finally:
        conn.close()


if __name__ == '__main__':
    # Command-line entry point; with no arguments the 'status' command runs.
    cmd = sys.argv[1] if len(sys.argv) > 1 else 'status'

    def _int_arg(text, usage):
        """Parse a numeric CLI argument; print the usage line and exit 1 if invalid."""
        try:
            return int(text)
        except ValueError:
            print(usage)
            sys.exit(1)

    if cmd == 'init':
        init_schema()
    elif cmd == 'calculate':
        calculate_performance()
    elif cmd == 'scan':
        scan_everflow_offers()
    elif cmd == 'approve':
        usage = "Usage: approve <recommendation_id>"
        if len(sys.argv) < 3:
            print(usage)
        else:
            approve_offer(_int_arg(sys.argv[2], usage))
    elif cmd == 'translate':
        usage = "Usage: translate <creative_id> <target_lang> [target_offer_id]"
        if len(sys.argv) < 4:
            print(usage)
        else:
            target_oid = _int_arg(sys.argv[4], usage) if len(sys.argv) > 4 else None
            generate_translation(_int_arg(sys.argv[2], usage), sys.argv[3], target_oid)
    elif cmd == 'best':
        usage = "Usage: best <offer_id> [country]"
        if len(sys.argv) < 3:
            print(usage)
        else:
            country = sys.argv[3] if len(sys.argv) > 3 else 'WW'
            best = get_best_creative(_int_arg(sys.argv[2], usage), country)
            print(f"Best creative for offer #{sys.argv[2]} ({country}): #{best}")
    elif cmd == 'status':
        show_status()
    else:
        print("Commands: init | calculate | scan | approve <id> | translate <cr_id> <lang> | best <offer> [country] | status")