Files
wevads-platform/scripts/pipeline-feeder.php
2026-02-26 04:53:11 +01:00

203 lines
8.7 KiB
PHP

<?php
/**
* PIPELINE BACKGROUND FEEDERS
* Run via cron to continuously supply the pipeline
* Usage: php pipeline-feeder.php [feeder_name]
*
* Feeders:
* warmup - Advance warmup daily + reset counters
* supply - Auto-provision missing resources
* offers - Sync CX3 approved offers
* dns - Push DNS for unconfigured domains
* bounce - Process new bounces
* healing - Check/repair broken servers
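* analytics - Aggregate daily analytics
* crm_sync - Sync CRM tracking + run auto-segmentation
* scraping - Bridge scraped emails into the leads table
* n8n_health - Probe the local n8n /healthz endpoint
* status - Print a status summary (default when no feeder is given)
* all - Run all feeders in sequence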
*/
// Feeder selection: first CLI argument wins, then the `feeder` query
// parameter, otherwise fall back to the status summary.
if (isset($argv[1])) {
    $feeder = $argv[1];
} elseif (isset($_GET['feeder'])) {
    $feeder = $_GET['feeder'];
} else {
    $feeder = 'status';
}

// Root URL of the local HTTP API that the feeders call.
$base = 'http://127.0.0.1:5890/api';

// Result accumulator, initialised empty.
$results = [];
/**
 * Fetch a URL and decode its body as JSON.
 *
 * HTTP error responses are still read (ignore_errors), so an error payload
 * that is valid JSON is decoded and returned like any other body.
 *
 * @param string $url     URL to request.
 * @param int    $timeout HTTP read timeout in seconds.
 * @return mixed Decoded JSON (objects as associative arrays), or null when the
 *               request failed, the body was empty/falsy, or it was not valid JSON.
 */
function api_call($url, $timeout = 15) {
    $httpOptions = ['timeout' => $timeout, 'ignore_errors' => true];
    $context = stream_context_create(['http' => $httpOptions]);

    // Warnings are suppressed on purpose: a failed fetch is reported as null.
    $body = @file_get_contents($url, false, $context);
    if (!$body) {
        return null;
    }

    return json_decode($body, true);
}
/**
 * Record one feeder result: append a timestamped line to the feeder log file
 * and echo a short version to standard output.
 *
 * @param string $name   Feeder name used as the line prefix.
 * @param mixed  $result String message, or any value to be JSON-encoded.
 * @return void
 */
function log_feeder($name, $result) {
    $message = $result;
    if (!is_string($message)) {
        $message = json_encode($message);
    }

    $line = '[' . date('Y-m-d H:i:s') . "] $name: $message\n";

    // Best-effort append: a missing or unwritable log file must not stop the feeder.
    @file_put_contents('/var/log/pipeline-feeder.log', $line, FILE_APPEND);

    echo " [$name] $message\n";
}
/**
 * Open the PostgreSQL connection used by the feeders.
 *
 * @param string $feederName         Feeder name used when logging a failure.
 * @param bool   $useAdminSearchPath When true, run SET search_path TO admin,public.
 * @return resource|\PgSql\Connection|null Connection handle, or null when connecting
 *         or setting the search path failed (the failure is logged).
 */
function pipeline_feeder_db($feederName, $useAdminSearchPath = true) {
    // NOTE(review): credentials are hard-coded (as they were at every call
    // site before); consider moving them to environment/configuration.
    $conn = pg_connect('host=localhost dbname=adx_system user=admin password=admin123');
    if ($conn === false) {
        log_feeder($feederName, 'ERROR: database connection failed');
        return null;
    }
    if ($useAdminSearchPath && pg_query($conn, 'SET search_path TO admin,public') === false) {
        log_feeder($feederName, 'ERROR: could not set search_path: ' . pg_last_error($conn));
        return null;
    }
    return $conn;
}

/**
 * Run a query and return the first column of its first row.
 *
 * @param resource|\PgSql\Connection $db     Open connection.
 * @param string                     $sql    Query text; use $1, $2, ... with $params.
 * @param array                      $params Positional parameters (may be empty).
 * @return string|null The value as returned by PostgreSQL, or null when the query
 *         failed, returned no row, or the value was SQL NULL.
 */
function pipeline_feeder_scalar($db, $sql, array $params = []) {
    $res = $params === [] ? pg_query($db, $sql) : pg_query_params($db, $sql, $params);
    if ($res === false || pg_num_rows($res) < 1) {
        return null;
    }
    $value = pg_fetch_result($res, 0, 0);
    return $value === false ? null : $value;
}

// Single source of truth for the runnable feeders: used by `all`, reported by
// `status`, and listed by the unknown-feeder error so the three cannot drift.
$allFeeders = ['warmup', 'supply', 'offers', 'dns', 'bounce', 'healing', 'analytics', 'crm_sync', 'scraping', 'n8n_health'];

// Dispatch on the requested feeder name.
switch ($feeder) {
    case 'warmup':
        // Advance the warmup day through the API.
        $r = api_call("$base/warmup-engine.php?action=advance_day");
        if ($r === null) {
            log_feeder('warmup', 'ERROR: advance_day call failed or returned no JSON');
        }
        log_feeder('warmup', "Advanced: " . ($r['advanced'] ?? 0) . ", Graduated: " . ($r['graduated'] ?? 0));

        // Reset the per-day sent counter; only report success when the UPDATE ran.
        $db = pipeline_feeder_db('warmup', false);
        if ($db === null) {
            break;
        }
        $reset = pg_query($db, "UPDATE admin.warmup_accounts SET sent_today=0 WHERE status='warming'");
        if ($reset === false) {
            log_feeder('warmup', 'ERROR: sent_today reset failed: ' . pg_last_error($db));
        } else {
            log_feeder('warmup', 'sent_today reset');
        }
        break;

    case 'supply':
        // Read the supply dashboard and trigger auto-fill when it reports alerts.
        $r = api_call("$base/supply-chain-factory.php?action=dashboard");
        if (!is_array($r) || ($r['status'] ?? null) !== 'success') {
            log_feeder('supply', 'ERROR: dashboard unavailable or not successful');
            break;
        }
        $alerts = is_array($r['alerts'] ?? null) ? $r['alerts'] : [];
        log_feeder('supply', "Alerts: " . count($alerts));
        if (count($alerts) > 0) {
            $fill = api_call("$base/supply-chain-factory.php?action=auto_fill");
            log_feeder('supply', "Auto-fill: " . json_encode($fill));
        }
        break;

    case 'offers':
        // Sync offers from CX3, then run offer quality scoring.
        $r = api_call("$base/affiliate-monitor.php?action=sync_cx3");
        log_feeder('offers', $r ?: 'sync_attempted');
        $q = api_call("$base/offer-quality-guard.php?action=score_all");
        log_feeder('offers', "Quality: " . json_encode($q));
        break;

    case 'dns':
        // Generate DNS for up to five domains that are not yet configured.
        $db = pipeline_feeder_db('dns');
        if ($db === null) {
            break;
        }
        // NOTE(review): the count also requires has_spf IS NOT TRUE while the
        // selection below does not, so the two can disagree. Kept as-is; confirm
        // which predicate is intended.
        $count = pipeline_feeder_scalar($db, "SELECT COUNT(*) FROM domain_pool WHERE dns_configured IS NOT TRUE AND has_spf IS NOT TRUE");
        if ($count === null) {
            log_feeder('dns', 'ERROR: count query failed: ' . pg_last_error($db));
            break;
        }
        $count = (int) $count;
        log_feeder('dns', "Unconfigured domains: $count");
        if ($count > 0) {
            $res = pg_query($db, "SELECT id, domain FROM domain_pool WHERE dns_configured IS NOT TRUE LIMIT 5");
            if ($res === false) {
                log_feeder('dns', 'ERROR: domain selection failed: ' . pg_last_error($db));
                break;
            }
            // pg_fetch_all() yields false (PHP 7) or [] (PHP 8) when there are no rows.
            foreach ((pg_fetch_all($res) ?: []) as $d) {
                $push = api_call("$base/dns-push.php?action=generate&domain_id=" . rawurlencode((string) $d['id']));
                log_feeder('dns', "Generated DNS for {$d['domain']}: " . ($push ? 'ok' : 'fail'));
            }
        }
        break;

    case 'bounce':
        // Process new bounces from the send log.
        $r = api_call("$base/bounce-processor.php?action=process");
        log_feeder('bounce', $r ?: 'processed');
        break;

    case 'healing':
        // Report server health; missing fields default to 0 instead of raising
        // undefined-index warnings.
        $r = api_call("$base/mta.php?action=status");
        if (is_array($r) && $r !== []) {
            $total = (int) ($r['total_servers'] ?? 0);
            $active = (int) ($r['active'] ?? 0);
            $blacklisted = (int) ($r['blacklisted'] ?? 0);
            $inactive = $total - $active;
            log_feeder('healing', "Servers: $total, Active: $active, Inactive: $inactive");
            if ($blacklisted > 0) {
                // Blacklisted IPs are only reported here; no rotation is triggered.
                log_feeder('healing', "Blacklisted IPs found: $blacklisted");
            }
        } elseif ($r === null) {
            log_feeder('healing', 'ERROR: MTA status unavailable');
        }
        // Check reputation.
        $rep = api_call("$base/reputation.php?action=check");
        log_feeder('healing', "Reputation: " . json_encode($rep));
        break;

    case 'analytics':
        // Aggregate today's counters and upsert them into the revenue table.
        $db = pipeline_feeder_db('analytics');
        if ($db === null) {
            break;
        }
        $today = date('Y-m-d');
        $queries = [
            'sent'   => "SELECT COUNT(*) FROM unified_send_log WHERE created_at::date = \$1",
            'ok'     => "SELECT COUNT(*) FROM unified_send_log WHERE status='sent' AND created_at::date = \$1",
            'opens'  => "SELECT COUNT(*) FROM open_log WHERE opened_at::date = \$1",
            'clicks' => "SELECT COUNT(*) FROM click_log WHERE clicked_at::date = \$1",
            'convs'  => "SELECT COUNT(*) FROM conversion_log WHERE converted_at::date = \$1",
            'rev'    => "SELECT COALESCE(SUM(payout),0) FROM conversion_log WHERE converted_at::date = \$1",
        ];
        $stats = [];
        foreach ($queries as $key => $sql) {
            $stats[$key] = pipeline_feeder_scalar($db, $sql, [$today]);
            if ($stats[$key] === null) {
                // Abort before the upsert so a failed query never overwrites
                // today's row with partial data.
                log_feeder('analytics', "ERROR: query for '$key' failed: " . pg_last_error($db));
                break 2;
            }
        }
        $sent = $stats['sent'];
        $ok = $stats['ok'];
        $opens = $stats['opens'];
        $clicks = $stats['clicks'];
        $convs = $stats['convs'];
        $rev = $stats['rev'];

        // EXCLUDED refers to the row proposed for insertion, so the values are
        // bound once and reused by the update branch.
        $upsert = pg_query_params(
            $db,
            'INSERT INTO revenue (date, emails_sent, clicks, conversions, revenue)
             VALUES ($1, $2, $3, $4, $5)
             ON CONFLICT (date) DO UPDATE SET
                 emails_sent = EXCLUDED.emails_sent, clicks = EXCLUDED.clicks,
                 conversions = EXCLUDED.conversions, revenue = EXCLUDED.revenue',
            [$today, $sent, $clicks, $convs, $rev]
        );
        if ($upsert === false) {
            log_feeder('analytics', 'ERROR: revenue upsert failed: ' . pg_last_error($db));
        }
        log_feeder('analytics', "Sent:$sent OK:$ok Opens:$opens Clicks:$clicks Conv:$convs Rev:\$$rev");
        break;

    case 'status':
        // Print a human-readable summary followed by a JSON status document.
        echo "PIPELINE FEEDERS STATUS\n";
        echo "=======================\n";
        $db = pipeline_feeder_db('status');

        $w = api_call("$base/warmup-engine.php?action=status");
        $warm = (is_array($w) && is_array($w['warmup'] ?? null)) ? $w['warmup'] : [];
        echo "Warmup: " . ($warm['warming'] ?? 'n/a') . " warming, day " . ($warm['avg_warmup_day'] ?? 'n/a')
            . ", cap " . ($warm['warming_daily_capacity'] ?? 'n/a') . "/j\n";

        $m = api_call("$base/mta.php?action=status");
        $m = is_array($m) ? $m : [];
        echo "MTA: " . ($m['total_servers'] ?? 'n/a') . " servers, " . ($m['send_methods'] ?? 'n/a')
            . " methods, " . ($m['send_accounts'] ?? 'n/a') . " accounts\n";

        // Database-backed lines are skipped when the connection is unavailable.
        if ($db !== null) {
            $today = date('Y-m-d');
            $sent = pipeline_feeder_scalar($db, "SELECT COUNT(*) FROM unified_send_log WHERE created_at::date = \$1", [$today]) ?? 'n/a';
            $ok = pipeline_feeder_scalar($db, "SELECT COUNT(*) FROM unified_send_log WHERE status='sent' AND created_at::date = \$1", [$today]) ?? 'n/a';
            echo "Today: $sent sent, $ok delivered\n";
            $supp = pipeline_feeder_scalar($db, "SELECT COUNT(*) FROM suppression_list") ?? 'n/a';
            echo "Suppressed: $supp emails\n";
            $learns = pipeline_feeder_scalar($db, "SELECT COUNT(*) FROM brain_learning_log") ?? 'n/a';
            echo "Brain learning entries: $learns\n";
        }
        echo json_encode(['status' => 'operational', 'feeders' => $allFeeders]);
        break;

    case 'all':
        // Run every feeder in its own PHP process so one failure cannot stop the rest.
        echo "=== RUNNING ALL FEEDERS ===\n";
        foreach ($allFeeders as $f) {
            echo "\n--- $f ---\n";
            // Arguments are escaped so a script path containing spaces or shell
            // metacharacters still works.
            echo shell_exec('php ' . escapeshellarg(__FILE__) . ' ' . escapeshellarg($f) . ' 2>&1');
        }
        break;

    case 'crm_sync':
        // Sync CRM tracking, then run auto-segmentation.
        $r = api_call("$base/crm-api.php?action=sync_tracking");
        log_feeder('crm_sync', $r ?: 'synced');
        $s = api_call("$base/data-intelligence.php?action=auto_segment");
        log_feeder('crm_sync', 'Segments: ' . json_encode($s));
        break;

    case 'scraping':
        // Copy new e-mail addresses from the scraping tables into leads.
        $db = pipeline_feeder_db('scraping');
        if ($db === null) {
            break;
        }
        $bridged = [];
        // Source label => source table; both are fixed constants, never user input.
        foreach (['scraping' => 'scrapping_results', 'dark_scout' => 'dark_scout_results'] as $source => $table) {
            // NOT EXISTS instead of NOT IN: NOT IN matches nothing as soon as
            // the subquery yields a single NULL e-mail.
            $res = pg_query($db, "INSERT INTO leads (email, source, segment)
                SELECT s.email, '$source', 'cold' FROM $table s
                WHERE s.email IS NOT NULL AND s.email LIKE '%@%'
                AND NOT EXISTS (SELECT 1 FROM leads l WHERE l.email = s.email)
                ON CONFLICT DO NOTHING");
            if ($res === false) {
                log_feeder('scraping', "ERROR: bridging from $table failed: " . pg_last_error($db));
                $bridged[$source] = 'error';
            } else {
                $bridged[$source] = pg_affected_rows($res);
            }
        }
        log_feeder('scraping', "Bridged: scraping={$bridged['scraping']} dark={$bridged['dark_scout']}");
        break;

    case 'n8n_health':
        // Probe the health endpoint with a short timeout so an unresponsive
        // service cannot stall the cron run. Warnings are suppressed on purpose:
        // a failed request is exactly the "down" signal being measured.
        $ctx = stream_context_create(['http' => ['timeout' => 5]]);
        $health = @file_get_contents('http://127.0.0.1:5678/healthz', false, $ctx);
        log_feeder('n8n_health', 'status=' . ($health !== false ? 'up' : 'down'));
        break;

    default:
        // The feeder name may come from $_GET, so HTML-significant characters
        // are hex-escaped in the JSON output.
        echo json_encode(
            [
                'error' => "Unknown feeder: $feeder",
                'available' => array_merge($allFeeders, ['status', 'all']),
            ],
            JSON_HEX_TAG | JSON_HEX_AMP | JSON_HEX_APOS | JSON_HEX_QUOT
        );
}