Files
wevads-platform/scripts/sponsor-import-universal.py
2026-02-26 04:53:11 +01:00

736 lines
29 KiB
Python
Executable File

#!/usr/bin/env python3
"""
=============================================================================
UNIVERSAL SPONSOR IMPORT SYSTEM
=============================================================================
Supports: CAKE (CX3 Ads), Everflow (Double M), HitPath (future)
Target tables: affiliate_offers, offer_creatives, sponsor_config
=============================================================================
"""
import json
import os
import re
import sys
import xml.etree.ElementTree as ET
from datetime import datetime
from html import unescape

import psycopg2
import psycopg2.extras
import requests
# PostgreSQL connection settings passed to psycopg2.connect(**DB).
# Each value may be overridden through an environment variable so real
# credentials do not have to live in source control. The fallbacks are the
# historical hard-coded values, so existing deployments keep working.
DB = {
    'host': os.environ.get('ADX_DB_HOST', 'localhost'),
    'database': os.environ.get('ADX_DB_NAME', 'adx_system'),
    'user': os.environ.get('ADX_DB_USER', 'admin'),
    'password': os.environ.get('ADX_DB_PASSWORD', 'admin123'),
}
# =====================================================================
# BASE SPONSOR ADAPTER
# =====================================================================
class SponsorAdapter:
    """Base class for all sponsor API adapters.

    Subclasses implement ``fetch_offers`` (and optionally ``fetch_creatives``
    and ``fetch_suppression``) and return plain dicts in the normalized shape
    this script writes to the database.
    """

    # Offer-name markers that encode a country, tried in order.
    _COUNTRY_PATTERNS = tuple(re.compile(p) for p in (
        r'^([A-Z]{2})\s*[\|\-\s]',   # "DE | Offer" or "DE - Offer"
        r'\[([A-Z]{2})\]',           # "[DE]"
        r'^\s*(INTL)\s*[\|\-\s]',    # "INTL | Offer"
        r'^\s*(AU/US)\s*[\|\-\s]',   # "AU/US | Offer"
        r'^\s*(UK)\s*[\|\-\s]',      # "UK | Offer"
    ))
    # Country returned when an offer name carries no recognizable marker.
    DEFAULT_COUNTRY = 'US'

    # Raw sponsor vertical label (lower-cased, stripped) -> internal vertical.
    VERTICAL_MAP = {
        'health/wellness': 'health',
        'health': 'health',
        'financial': 'finance',
        'finance': 'finance',
        'insurance': 'insurance',
        'senior': 'senior',
        'consumer goods': 'consumer',
        'gaming': 'gaming',
        'sweepstakes': 'sweepstakes',
        'dating': 'dating',
        'education': 'education',
        'legal': 'legal',
        'home services': 'home_services',
        'auto': 'auto',
    }

    def __init__(self, config):
        """Store connection settings from a merged sponsor config dict.

        Required keys: 'sponsor_name', 'api_url', 'api_key', 'id'.
        Optional keys: 'affiliate_id' (default ''), 'requires_unsub_link'
        (default False).
        """
        self.config = config
        self.name = config['sponsor_name']
        self.api_url = config['api_url']
        self.api_key = config['api_key']
        self.affiliate_id = config.get('affiliate_id', '')
        self.requires_unsub = config.get('requires_unsub_link', False)
        self.network_id = config['id']

    def fetch_offers(self):
        """Return a list of normalized offer dicts; subclasses must override."""
        raise NotImplementedError

    def fetch_creatives(self, offer_id, campaign_id=None):
        """Return a list of normalized creative dicts for an offer (none by default)."""
        return []

    def fetch_suppression(self, offer_id):
        """Return a suppression download URL if the sponsor offers one (None by default)."""
        return None

    def _parse_country(self, name):
        """Extract a country code from names like 'DE | Offer', 'FR - Offer' or '[IT]'.

        'INTL' is returned as-is, a combined 'AU/US' prefix yields its first
        code, and an empty/None name or one without a marker yields 'US'.
        """
        if not name:
            return self.DEFAULT_COUNTRY
        for pattern in self._COUNTRY_PATTERNS:
            match = pattern.search(name)
            if match:
                # 'AU/US' -> 'AU'; plain codes and 'INTL' contain no '/'.
                return match.group(1).split('/')[0]
        return self.DEFAULT_COUNTRY

    def _parse_vertical(self, raw):
        """Normalize a sponsor vertical label to the internal vertical name.

        Known labels map through VERTICAL_MAP, unknown labels are returned
        lower-cased and stripped, and None/blank labels become 'general'.
        """
        key = str(raw).lower().strip() if raw is not None else ''
        if not key:
            return 'general'
        return self.VERTICAL_MAP.get(key, key)
# =====================================================================
# CAKE ADAPTER (CX3 Ads)
# =====================================================================
class CakeAdapter(SponsorAdapter):
    """CAKE affiliate API v1 (XML) adapter - used by CX3 Ads."""

    NS = {'c': 'http://cakemarketing.com/affiliates/api/1/'}

    # Creative names look like "86600 - Unlock Savings Now 1.29.26": an id, an
    # optional tag (Lite/Text/...), the subject text and an optional date.
    _SUBJECT_RE = re.compile(
        r'\d+\s*-\s*(?:Lite|Text|WTR|VDAY|FB/IG|Flash Sale)?\s*-?\s*(.+?)(?:\s+\d+\.\d+\.\d+)?$'
    )
    # First decimal number in a payout string such as "$1234.50" or "35%".
    _NUMBER_RE = re.compile(r'\d*\.?\d+')
    # CAKE creative_type -> internal creative source.
    _CREATIVE_SOURCES = {'Email': 'email_html', 'Link': 'link', 'Image': 'image'}
    # CAKE status_name -> approval status, used only when no campaign exists.
    _STATUS_APPROVAL = {'Active': 'active', 'Apply To Run': 'apply', 'Public': 'public'}

    def _text(self, node, tag, default=''):
        """Return the text of child element *tag* in the CAKE namespace, or *default*."""
        return node.findtext(f'c:{tag}', default, self.NS)

    @classmethod
    def _parse_payout(cls, raw):
        """Return ``(amount, payout_type)`` parsed from a CAKE payout string.

        A '%' marks a revenue-share payout; anything else is CPA. Thousands
        separators are ignored and only the first number is used, so a
        malformed value such as "$0.50 - $1.00" yields 0.5 instead of raising
        and aborting the whole feed.
        """
        raw = raw or ''
        match = cls._NUMBER_RE.search(raw.replace(',', ''))
        amount = float(match.group()) if match else 0.0
        return amount, ('revshare' if '%' in raw else 'cpa')

    def fetch_offers(self):
        """Fetch every offer from the CAKE OfferFeed endpoint.

        Returns a list of normalized offer dicts, or [] on HTTP/parse failure.
        """
        url = f"{self.api_url}/1/offers.asmx/OfferFeed"
        params = {
            'api_key': self.api_key,
            'affiliate_id': self.affiliate_id,
            'offer_status_id': 0,  # All statuses
            'media_type_category_id': 0,
            'vertical_category_id': 0,
            'tag_id': 0,
            'start_at_row': 1,
            'row_limit': 0,  # 0 = all
            'sort_field': 'offer_id',
            'sort_descending': 'false'
        }
        print(f" [{self.name}] Fetching CAKE OfferFeed...")
        try:
            r = requests.get(url, params=params, timeout=60)
            if r.status_code != 200:
                print(f" [{self.name}] HTTP {r.status_code}")
                return []
            # Parse raw bytes so the XML declaration's encoding is honored
            # instead of requests' guessed text encoding.
            root = ET.fromstring(r.content)
            xml_offers = root.findall('c:Offer', self.NS)
            print(f" [{self.name}] Found {len(xml_offers)} offers in feed")
            return [self._normalize_offer(xo) for xo in xml_offers]
        except Exception as e:
            print(f" [{self.name}] Error fetching offers: {e}")
            return []

    def _normalize_offer(self, xo):
        """Convert one <Offer> element into the normalized offer dict."""
        offer_id = self._text(xo, 'offer_id')
        campaign_id = self._text(xo, 'campaign_id').strip()
        status_name = self._text(xo, 'status_name')
        offer_name = self._text(xo, 'offer_name').strip()
        payout, payout_type = self._parse_payout(self._text(xo, 'payout', '$0'))
        vertical = self._parse_vertical(self._text(xo, 'vertical'))
        description = self._text(xo, 'description')

        # Offers that already carry a campaign_id are treated as approved and active.
        has_campaign = bool(campaign_id)
        if has_campaign:
            approval = 'approved'
        else:
            approval = self._STATUS_APPROVAL.get(
                status_name, status_name.lower() if status_name else 'unknown')

        return {
            'offer_id': offer_id,
            'network_id': self.network_id,
            'offer_name': offer_name[:255],
            'country_code': self._parse_country(offer_name),
            'vertical': vertical,
            'payout': payout,
            'payout_type': payout_type,
            'status': status_name.lower() if status_name else 'unknown',
            'approval_status': approval,
            'preview_url': self._text(xo, 'preview_link') or '',
            'tracking_url': '',  # Will be set from creatives
            'campaign_id': campaign_id or None,
            'description': unescape(description) if description else '',
            'restrictions': self._text(xo, 'restrictions') or '',
            'network': self.name,
            'category': vertical,
            'is_active': has_campaign,
            'requires_unsub': self.requires_unsub,
        }

    def fetch_creatives(self, offer_id, campaign_id=None):
        """Fetch the creatives of one campaign via the GetCampaign endpoint.

        *offer_id* is accepted for interface compatibility but unused; CAKE
        looks creatives up by *campaign_id*. Returns [] when there is no
        campaign_id or on HTTP/parse failure.
        """
        if not campaign_id:
            return []
        url = f"{self.api_url}/1/offers.asmx/GetCampaign"
        params = {
            'api_key': self.api_key,
            'affiliate_id': self.affiliate_id,
            'campaign_id': campaign_id
        }
        try:
            r = requests.get(url, params=params, timeout=30)
            if r.status_code != 200:
                return []
            root = ET.fromstring(r.content)
            # Campaign-level archive URL, copied onto every creative dict.
            zip_url = self._text(root, 'creatives_download_url')
            return [self._normalize_creative(xc, zip_url)
                    for xc in root.findall('.//c:Creative', self.NS)]
        except Exception as e:
            print(f" [{self.name}] Error fetching creatives for campaign {campaign_id}: {e}")
            return []

    def _normalize_creative(self, xc, zip_url):
        """Convert one <Creative> element into the normalized creative dict."""
        cid = self._text(xc, 'creative_id')
        cname = self._text(xc, 'creative_name')
        ctype = self._text(xc, 'creative_type')
        name_match = self._SUBJECT_RE.match(cname)
        return {
            'creative_id': cid,
            'creative_name': cname[:255] if cname else f'Creative {cid}',
            'source': self._CREATIVE_SOURCES.get(ctype, ctype.lower() if ctype else 'unknown'),
            'tracking_url': self._text(xc, 'unique_link'),
            'download_url': self._text(xc, 'creative_download_url'),
            'subject_hint': name_match.group(1).strip() if name_match else '',
            'width': self._text(xc, 'width') or None,
            'height': self._text(xc, 'height') or None,
            'creatives_zip_url': zip_url,
        }
# =====================================================================
# EVERFLOW ADAPTER (Double M)
# =====================================================================
class EverflowAdapter(SponsorAdapter):
    """Everflow affiliate API v1 adapter - used by Double M."""

    # Everflow offer_affiliate_status -> (approval_status, is_active).
    # Unlisted statuses pass through as the approval status and are inactive.
    _RELATIONSHIP_STATUS = {
        'approved': ('approved', True),
        'active': ('approved', True),
        'require_approval': ('apply', False),
        'pending': ('pending', False),
    }

    def _headers(self):
        """Return the API-key header sent with every Everflow request."""
        return {'X-Eflow-API-Key': self.api_key}

    def fetch_offers(self):
        """Fetch all offers from /v1/affiliates/alloffers.

        Returns normalized offer dicts, or [] on HTTP/parse failure.
        """
        url = f"{self.api_url}/v1/affiliates/alloffers"
        print(f" [{self.name}] Fetching Everflow offers...")
        try:
            r = requests.get(url, headers=self._headers(), timeout=60)
            if r.status_code != 200:
                print(f" [{self.name}] HTTP {r.status_code}")
                return []
            ef_offers = r.json().get('offers') or []
            print(f" [{self.name}] Found {len(ef_offers)} offers")
            return [self._normalize_offer(o) for o in ef_offers]
        except Exception as e:
            print(f" [{self.name}] Error: {e}")
            return []

    def _normalize_offer(self, o):
        """Convert one Everflow offer object into the normalized offer dict.

        JSON nulls are treated like missing keys so that one sparse offer
        cannot abort the whole import.
        """
        name = o.get('name') or 'Unknown'
        relationship = o.get('relationship') or {}
        rel_status = relationship.get('offer_affiliate_status') or 'unknown'
        approval, is_active = self._RELATIONSHIP_STATUS.get(rel_status, (rel_status, False))
        network_offer_id = o.get('network_offer_id')
        return {
            'offer_id': '' if network_offer_id is None else str(network_offer_id),
            'network_id': self.network_id,
            'offer_name': name[:255],
            'country_code': self._parse_country(name),
            'vertical': 'sweepstakes',  # Everflow doesn't always have vertical
            'payout': 0,  # Payout not in alloffers endpoint
            'payout_type': 'cpa',
            'status': o.get('offer_status') or 'unknown',
            'approval_status': approval,
            'preview_url': o.get('preview_url') or '',
            'tracking_url': o.get('tracking_url') or '',
            'campaign_id': None,
            'description': o.get('html_description') or '',
            'restrictions': o.get('terms_and_conditions') or '',
            'network': self.name,
            'category': 'sweepstakes',
            'is_active': is_active,
            'requires_unsub': self.requires_unsub,
            'thumbnail_url': o.get('thumbnail_url', ''),
        }

    def fetch_creatives(self, offer_id, campaign_id=None):
        """Return no creatives: each Everflow offer already carries its tracking_url
        and the affiliate API used here has no separate creative feed."""
        return []
# =====================================================================
# HITPATH ADAPTER (Future)
# =====================================================================
class HitPathAdapter(SponsorAdapter):
    """HitPath REST adapter - prepared for future sponsors.

    HitPath endpoints vary by network; this targets the common REST layout:
        GET /api/affiliate/offers                 - list offers
        GET /api/affiliate/offers/{id}/creatives  - creatives for one offer
    Auth is tried as a Bearer header first, then as an ``apikey`` query
    parameter when the server answers HTTP 401.
    """

    def _get(self, url, timeout):
        """GET *url* with Bearer auth, retrying with ?apikey= on HTTP 401."""
        r = requests.get(url, headers={'Authorization': f'Bearer {self.api_key}'},
                         params={}, timeout=timeout)
        if r.status_code == 401:
            r = requests.get(url, headers={}, params={'apikey': self.api_key}, timeout=timeout)
        return r

    @staticmethod
    def _items(data, key):
        """Return the record list from a bare JSON list or a {key|'data': [...]} wrapper."""
        if isinstance(data, list):
            return data
        return data.get(key, data.get('data', [])) or []

    @staticmethod
    def _to_float(value):
        """Best-effort float conversion; strings like '$1,250.50' use their first number."""
        try:
            return float(value or 0)
        except (TypeError, ValueError):
            match = re.search(r'\d*\.?\d+', str(value).replace(',', ''))
            return float(match.group()) if match else 0.0

    def fetch_offers(self):
        """Fetch and normalize all offers; returns [] on HTTP/parse failure."""
        print(f" [{self.name}] HitPath adapter - fetching offers...")
        try:
            r = self._get(f"{self.api_url}/api/affiliate/offers", timeout=60)
            if r.status_code != 200:
                print(f" [{self.name}] HTTP {r.status_code} - HitPath may need custom config")
                return []
            offers = [self._normalize_offer(o) for o in self._items(r.json(), 'offers')]
            print(f" [{self.name}] Imported {len(offers)} HitPath offers")
            return offers
        except Exception as e:
            print(f" [{self.name}] HitPath error: {e}")
            return []

    def _normalize_offer(self, o):
        """Convert one HitPath offer record (field names vary by network) into the normalized dict."""
        name = o.get('name', o.get('offer_name')) or 'Unknown'
        raw_id = o.get('offer_id', o.get('id'))
        raw_status = o.get('status')
        raw_campaign = o.get('campaign_id')
        # 'vertical' and 'category' come from the same source field(s).
        vertical = self._parse_vertical(o.get('vertical', o.get('category', 'general')))
        return {
            'offer_id': '' if raw_id is None else str(raw_id),
            'network_id': self.network_id,
            'offer_name': str(name)[:255],
            'country_code': self._parse_country(str(name)),
            'vertical': vertical,
            'payout': self._to_float(o.get('payout', o.get('default_payout', 0))),
            'payout_type': o.get('payout_type') or 'cpa',
            'status': str(raw_status or 'active').lower(),
            'approval_status': str(o.get('approval_status') or 'approved').lower(),
            'preview_url': o.get('preview_url', o.get('preview_link', '')),
            'tracking_url': o.get('tracking_url', o.get('click_url', '')),
            # A null/missing campaign must be None, never the string 'None'.
            'campaign_id': None if raw_campaign in (None, '') else str(raw_campaign),
            'description': o.get('description', ''),
            'restrictions': o.get('restrictions', o.get('terms', '')),
            'network': self.name,
            'category': vertical,
            # Only an explicit active/live status counts; a missing status does not.
            'is_active': str(raw_status or '').lower() in ('active', 'live'),
            'requires_unsub': self.requires_unsub,
        }

    def fetch_creatives(self, offer_id, campaign_id=None):
        """Fetch creatives for *offer_id*; returns [] on HTTP/parse failure.

        *campaign_id* is unused by HitPath but kept for interface compatibility.
        """
        try:
            r = self._get(f"{self.api_url}/api/affiliate/offers/{offer_id}/creatives", timeout=30)
            if r.status_code != 200:
                return []
            return [self._normalize_creative(c) for c in self._items(r.json(), 'creatives')]
        except Exception as e:
            # Best effort: the caller continues importing the offer without creatives.
            print(f" [{self.name}] HitPath creatives error for offer {offer_id}: {e}")
            return []

    @staticmethod
    def _normalize_creative(c):
        """Convert one HitPath creative record into the normalized creative dict."""
        raw_id = c.get('creative_id', c.get('id'))
        creative_id = '' if raw_id is None else str(raw_id)
        name = c.get('name', c.get('creative_name'))
        return {
            'creative_id': creative_id,
            'creative_name': str(name)[:255] if name else f'Creative {creative_id}',
            'source': str(c.get('type') or 'link').lower(),
            'tracking_url': c.get('tracking_url', c.get('click_url', '')),
            'download_url': c.get('download_url', ''),
            'subject_hint': c.get('subject_line') or '',
            'width': c.get('width'),
            'height': c.get('height'),
        }
# =====================================================================
# ADAPTER FACTORY
# =====================================================================
# api_type value (as read from sponsor_config) -> adapter class. Each
# platform is registered under its canonical spelling and its lower-case form.
ADAPTERS = {
    alias: adapter_cls
    for adapter_cls, canonical in (
        (CakeAdapter, 'CAKE'),
        (EverflowAdapter, 'Everflow'),
        (HitPathAdapter, 'HitPath'),
    )
    for alias in (canonical, canonical.lower())
}
def get_adapter(config, registry=None):
    """Return an adapter instance for ``config['api_type']``, or None if unknown.

    The lookup is exact first, then case-insensitive with surrounding
    whitespace ignored, so values like 'Cake' or ' HITPATH ' also resolve.

    Args:
        config: merged sponsor config dict; passed unchanged to the adapter.
        registry: optional api_type -> adapter class mapping; defaults to ADAPTERS.

    Returns:
        An adapter instance, or None (after printing a message) when no
        adapter matches.
    """
    adapters = ADAPTERS if registry is None else registry
    api_type = config.get('api_type', '')
    cls = adapters.get(api_type)
    if cls is None and api_type:
        wanted = str(api_type).strip().casefold()
        cls = next((c for key, c in adapters.items() if key.casefold() == wanted), None)
    if not cls:
        print(f" Unknown API type: {api_type}")
        return None
    return cls(config)
# =====================================================================
# DATABASE OPERATIONS
# =====================================================================
_OFFER_LOOKUP_SQL = """
SELECT id FROM admin.affiliate_offers
WHERE offer_id = %s AND network_id = %s
"""

# tracking_url is only overwritten when the incoming value is non-empty.
_OFFER_UPDATE_SQL = """
UPDATE admin.affiliate_offers SET
offer_name = %s,
country_code = %s,
vertical = %s,
payout = %s,
payout_currency = %s,
status = %s,
approval_status = %s,
preview_url = %s,
tracking_url = CASE WHEN %s != '' THEN %s ELSE tracking_url END,
restrictions = %s,
network = %s,
category = %s,
is_active = %s,
updated_at = NOW()
WHERE offer_id = %s AND network_id = %s
RETURNING id
"""

_OFFER_INSERT_SQL = """
INSERT INTO admin.affiliate_offers (
offer_id, network_id, offer_name, country_code, vertical,
payout, payout_currency, status, approval_status,
preview_url, tracking_url, restrictions, network, category,
is_active, created_at, updated_at
) VALUES (
%s, %s, %s, %s, %s,
%s, %s, %s, %s,
%s, %s, %s, %s, %s,
%s, NOW(), NOW()
) RETURNING id
"""


def upsert_offer(cur, offer):
    """Create or refresh the admin.affiliate_offers row for one normalized offer.

    The row is matched on (offer_id, network_id). An existing row gets every
    mapped column rewritten except tracking_url, which is kept when the
    incoming value is empty.

    NOTE(review): offer['payout_type'] ('cpa'/'revshare') is written into the
    payout_currency column; confirm against the table schema.

    Args:
        cur: open DB-API cursor.
        offer: normalized offer dict produced by a sponsor adapter.

    Returns:
        (row_id, 'updated') for an existing row, (row_id, 'inserted') otherwise.
    """
    natural_key = (offer['offer_id'], offer['network_id'])
    cur.execute(_OFFER_LOOKUP_SQL, natural_key)
    row = cur.fetchone()

    if row is None:
        cur.execute(_OFFER_INSERT_SQL, (
            *natural_key,
            offer['offer_name'], offer['country_code'], offer['vertical'],
            offer['payout'], offer.get('payout_type', 'cpa'),
            offer['status'], offer['approval_status'],
            offer['preview_url'], offer['tracking_url'],
            offer['restrictions'], offer['network'], offer['category'],
            offer['is_active'],
        ))
        return cur.fetchone()[0], 'inserted'

    tracking = offer['tracking_url']
    cur.execute(_OFFER_UPDATE_SQL, (
        offer['offer_name'], offer['country_code'], offer['vertical'],
        offer['payout'], offer.get('payout_type', 'cpa'),
        offer['status'], offer['approval_status'],
        offer['preview_url'],
        tracking, tracking,
        offer['restrictions'], offer['network'], offer['category'],
        offer['is_active'],
        *natural_key,
    ))
    return row[0], 'updated'
_CREATIVE_LOOKUP_SQL = """
    SELECT id FROM admin.offer_creatives
    WHERE offer_id = %s AND creative_name = %s
"""

# NULLIF turns an empty incoming value into NULL and COALESCE then keeps the
# stored column, so a sparse feed response never erases known data.
_CREATIVE_UPDATE_SQL = """
    UPDATE admin.offer_creatives SET
        source = %s,
        landing_url = COALESCE(NULLIF(%s, ''), landing_url),
        s3_image_url = COALESCE(NULLIF(%s, ''), s3_image_url),
        subject_line = COALESCE(NULLIF(%s, ''), subject_line),
        status = 'active'
    WHERE id = %s
"""

_CREATIVE_INSERT_SQL = """
    INSERT INTO admin.offer_creatives (
        offer_id, creative_name, source, landing_url, s3_image_url,
        subject_line, status, created_at
    ) VALUES (%s, %s, %s, %s, %s, %s, 'active', NOW())
    RETURNING id
"""


def upsert_creative(cur, offer_db_id, offer_id, creative, sponsor_name):
    """Insert or refresh one creative row in admin.offer_creatives.

    Creatives are matched on (offer_db_id, creative_name). On update, empty
    tracking/download/subject values leave the stored landing_url,
    s3_image_url and subject_line untouched (previously only subject_line was
    protected), so a transient empty value cannot wipe a working link.

    Args:
        cur: open DB-API cursor.
        offer_db_id: primary key of the parent admin.affiliate_offers row.
        offer_id: sponsor-side offer id; unused, kept for call compatibility.
        creative: normalized creative dict produced by a sponsor adapter.
        sponsor_name: unused, kept for call compatibility.

    Returns:
        (creative_row_id, 'updated') or (creative_row_id, 'inserted').
    """
    cur.execute(_CREATIVE_LOOKUP_SQL, (offer_db_id, creative['creative_name']))
    existing = cur.fetchone()
    tracking = creative.get('tracking_url', '')
    subject = creative.get('subject_hint', '')
    source = creative.get('source', 'link')
    download = creative.get('download_url', '')
    if existing:
        cur.execute(_CREATIVE_UPDATE_SQL, (source, tracking, download, subject, existing[0]))
        return existing[0], 'updated'
    cur.execute(_CREATIVE_INSERT_SQL, (
        offer_db_id, creative['creative_name'], source,
        tracking, download, subject,
    ))
    return cur.fetchone()[0], 'inserted'
# ORDER BY id makes "first" deterministic (lowest creative id); without it
# LIMIT 1 may pick a different Link creative on every run.
_TRACKING_FROM_CREATIVE_SQL = """
    UPDATE admin.affiliate_offers
    SET tracking_url = (
        SELECT landing_url FROM admin.offer_creatives
        WHERE offer_id = %s AND source = 'link'
          AND landing_url IS NOT NULL AND landing_url != ''
        ORDER BY id
        LIMIT 1
    )
    WHERE id = %s AND (tracking_url IS NULL OR tracking_url = '')
"""


def update_offer_tracking_from_creatives(cur, offer_db_id):
    """Fill an offer's empty tracking_url from its first stored Link creative.

    Only rows whose tracking_url is NULL or '' are touched; the Link creative
    with the lowest id and a non-empty landing_url is used.

    Args:
        cur: open DB-API cursor.
        offer_db_id: primary key of the admin.affiliate_offers row.
    """
    cur.execute(_TRACKING_FROM_CREATIVE_SQL, (offer_db_id, offer_db_id))
# =====================================================================
# MAIN IMPORT FLOW
# =====================================================================
def _load_active_sponsors(cur, sponsor_filter):
    """Return active sponsor_config rows, optionally limited to one sponsor_name."""
    query = "SELECT * FROM admin.sponsor_config WHERE status = 'active'"
    if sponsor_filter:
        # Bound parameter: the filter comes from the command line and may contain quotes.
        cur.execute(query + " AND sponsor_name = %s", (sponsor_filter,))
    else:
        cur.execute(query)
    return cur.fetchall()


def _load_active_networks(cur):
    """Return active affiliate_networks rows as dicts keyed by network name."""
    cur.execute("SELECT id, name, api_url, api_key, affiliate_id FROM admin.affiliate_networks WHERE status = 'active'")
    return {row['name']: dict(row) for row in cur.fetchall()}


def _build_sponsor_config(sp, net):
    """Merge a sponsor_config row with its (possibly empty) affiliate_networks dict."""
    return {
        # NOTE(review): falls back to the sponsor_config id when no network
        # row shares the sponsor's name - confirm that is intended.
        'id': net.get('id', sp['id']),
        'sponsor_name': sp['sponsor_name'],
        'api_type': sp['api_type'],
        'api_url': sp['api_url'],
        'api_key': sp['api_key'],
        'affiliate_id': sp['affiliate_id'] or net.get('affiliate_id', ''),
        'requires_unsub_link': sp['requires_unsub_link'],
        'tracking_domain': sp['tracking_domain'],
    }


def _import_offer(cur, adapter, offer, sp_name, total_stats):
    """Upsert one offer plus its creatives and return this offer's stat deltas.

    Each creative runs inside its own savepoint: a failing creative is logged,
    counted in total_stats['errors'] and rolled back alone. Any other failure
    propagates so the caller can roll back the whole offer.
    """
    delta = {'inserted': 0, 'updated': 0, 'creatives': 0, 'creative_updates': 0}
    db_id, action = upsert_offer(cur, offer)
    delta['inserted' if action == 'inserted' else 'updated'] += 1

    # Creatives are fetched only for offers that carry a campaign_id.
    campaign_id = offer.get('campaign_id')
    if campaign_id:
        for creative in adapter.fetch_creatives(offer['offer_id'], campaign_id):
            cur.execute("SAVEPOINT import_creative")
            try:
                _, c_action = upsert_creative(cur, db_id, offer['offer_id'], creative, sp_name)
            except Exception as ce:
                cur.execute("ROLLBACK TO SAVEPOINT import_creative")
                print(f" Creative error ({creative.get('creative_name','')}): {ce}")
                total_stats['errors'] += 1
                continue
            cur.execute("RELEASE SAVEPOINT import_creative")
            delta['creatives' if c_action == 'inserted' else 'creative_updates'] += 1

    # Set tracking_url from Link creative if needed
    update_offer_tracking_from_creatives(cur, db_id)
    return delta


def _import_sponsor(conn, cur, sp, networks, total_stats):
    """Fetch, upsert and commit all offers of one sponsor; add its counts to total_stats."""
    sp_name = sp['sponsor_name']
    print(f"\n{'─' * 50}")
    print(f"SPONSOR: {sp_name} ({sp['api_type']})")
    print(f"{'─' * 50}")

    adapter = get_adapter(_build_sponsor_config(sp, networks.get(sp_name, {})))
    if not adapter:
        print(f" SKIP: No adapter for {sp['api_type']}")
        return

    offers = adapter.fetch_offers()
    if not offers:
        print(" No offers returned")
        return

    stats = {'inserted': 0, 'updated': 0, 'creatives': 0, 'creative_updates': 0}
    for offer in offers:
        # Per-offer savepoint: a failure undoes only this offer, not every
        # earlier uncommitted offer of the sponsor.
        cur.execute("SAVEPOINT import_offer")
        try:
            delta = _import_offer(cur, adapter, offer, sp_name, total_stats)
        except Exception as e:
            cur.execute("ROLLBACK TO SAVEPOINT import_offer")
            print(f" Error importing offer {offer.get('offer_id','?')}: {e}")
            total_stats['errors'] += 1
            continue
        cur.execute("RELEASE SAVEPOINT import_offer")
        for key, value in delta.items():
            stats[key] += value

    conn.commit()
    # Update last_sync
    cur.execute("UPDATE admin.affiliate_networks SET last_sync = NOW() WHERE name = %s", (sp_name,))
    conn.commit()

    total_stats['offers_inserted'] += stats['inserted']
    total_stats['offers_updated'] += stats['updated']
    total_stats['creatives_inserted'] += stats['creatives']
    total_stats['creatives_updated'] += stats['creative_updates']
    print(f" ✅ Offers: {stats['inserted']} new, {stats['updated']} updated")
    print(f" ✅ Creatives: {stats['creatives']} new, {stats['creative_updates']} updated")


def _print_import_summary(total_stats):
    """Print the run-wide totals collected across all sponsors."""
    print(f"\n{'=' * 70}")
    print("IMPORT COMPLETE")
    print(f" Total offers inserted: {total_stats['offers_inserted']}")
    print(f" Total offers updated: {total_stats['offers_updated']}")
    print(f" Total creatives added: {total_stats['creatives_inserted']}")
    print(f" Total creatives updated: {total_stats['creatives_updated']}")
    print(f" Errors: {total_stats['errors']}")
    print(f"{'=' * 70}")


def run_import(sponsor_filter=None):
    """Import offers and creatives for every active sponsor.

    Args:
        sponsor_filter: None imports all active sponsors; otherwise only the
            sponsor whose sponsor_name equals this value (e.g. 'CX3 Ads',
            'Double M').

    Each offer is written inside its own SAVEPOINT and work is committed once
    per sponsor. The cursor and connection are always closed, including on
    the early "no sponsors" return and on unexpected errors.
    """
    conn = psycopg2.connect(**DB)
    conn.autocommit = False
    cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
    try:
        print("=" * 70)
        print("UNIVERSAL SPONSOR IMPORT SYSTEM")
        print(f"Started: {datetime.now()}")
        print("=" * 70)

        sponsors = _load_active_sponsors(cur, sponsor_filter)
        if not sponsors:
            print("No active sponsors found!")
            return
        # Also need network IDs from affiliate_networks
        networks = _load_active_networks(cur)

        total_stats = {'offers_inserted': 0, 'offers_updated': 0,
                       'creatives_inserted': 0, 'creatives_updated': 0, 'errors': 0}
        for sp in sponsors:
            _import_sponsor(conn, cur, sp, networks, total_stats)
        _print_import_summary(total_stats)
    finally:
        cur.close()
        conn.close()
# =====================================================================
# CLI
# =====================================================================
if __name__ == '__main__':
    # All CLI arguments are joined with spaces into one sponsor name, so
    # multi-word names work unquoted (e.g. `... CX3 Ads`). No arguments means
    # "import every active sponsor".
    cli_args = sys.argv[1:]
    sponsor = ' '.join(cli_args) if cli_args else None
    if sponsor is not None:
        print(f"Filtering to sponsor: {sponsor}")
    run_import(sponsor)