203 lines
8.7 KiB
PHP
203 lines
8.7 KiB
PHP
<?php
/**
 * PIPELINE BACKGROUND FEEDERS
 * Run via cron to continuously supply the pipeline.
 * Usage: php pipeline-feeder.php [feeder_name]
 *
 * Feeders:
 *   warmup     - Advance warmup daily + reset counters
 *   supply     - Auto-provision missing resources
 *   offers     - Sync CX3 approved offers
 *   dns        - Push DNS for unconfigured domains
 *   bounce     - Process new bounces
 *   healing    - Check/repair broken servers
 *   analytics  - Aggregate daily analytics
 *   crm_sync   - Sync CRM tracking + run auto-segmentation
 *   scraping   - Bridge scraped / dark-scout emails into leads
 *   n8n_health - Check the n8n health endpoint
 *   status     - Print a status summary (default when no feeder is given)
 *   all        - Run all feeders in sequence
 */

// Feeder to run: first CLI argument, else the `feeder` query parameter
// (the script can also be triggered over HTTP), else the status summary.
$feeder = $argv[1] ?? $_GET['feeder'] ?? 'status';
if (!is_string($feeder)) {
    // e.g. ?feeder[]=x yields an array; treat it as an unknown feeder instead
    // of letting it be interpolated as "Array" further down.
    $feeder = '';
}

// Base URL of the local HTTP API that the feeders call.
$base = 'http://127.0.0.1:5890/api';

// Not referenced by the code visible in this file; kept in case an including script reads it.
$results = [];

/**
 * Fetch a URL and decode its body as JSON.
 *
 * The stream context sets `ignore_errors`, so the body of an HTTP error
 * response (4xx/5xx) is still read and decoded rather than treated as a failure.
 *
 * @param string $url     Address to fetch.
 * @param int    $timeout Read timeout, in seconds, for the HTTP request.
 * @return mixed|null Decoded JSON (objects become associative arrays), or null
 *                    when the request fails, the body is empty, or the body is
 *                    not valid JSON.
 */
function api_call($url, $timeout = 15)
{
    $ctx = stream_context_create([
        'http' => ['timeout' => $timeout, 'ignore_errors' => true],
    ]);

    // Deliberate best-effort: a failed request is reported to the caller as
    // null, so the PHP warning is suppressed rather than spamming cron output.
    $body = @file_get_contents($url, false, $ctx);
    if ($body === false || $body === '') {
        return null;
    }

    // json_decode() itself returns null for malformed JSON.
    return json_decode($body, true);
}

/**
 * Record one feeder message: append it to the log file and echo it.
 *
 * Strings are logged verbatim; any other value is JSON-encoded.
 *
 * @param string $name   Feeder name used as the line prefix.
 * @param mixed  $result Message string, or data to serialise as JSON.
 */
function log_feeder($name, $result)
{
    $ts = date('Y-m-d H:i:s');

    if (is_string($result)) {
        $msg = $result;
    } else {
        // Without this flag json_encode() returns false on e.g. invalid UTF-8,
        // which previously produced an empty log message.
        $msg = json_encode($result, JSON_PARTIAL_OUTPUT_ON_ERROR);
        if ($msg === false) {
            $msg = '[unencodable ' . gettype($result) . ']';
        }
    }

    // Deliberate best-effort: an unwritable log file must not abort the feeder.
    @file_put_contents('/var/log/pipeline-feeder.log', "[$ts] $name: $msg\n", FILE_APPEND);
    echo " [$name] $msg\n";
}

// Feeders that `all` runs in sequence and that `status` advertises.
$feederNames = [
    'warmup', 'supply', 'offers', 'dns', 'bounce',
    'healing', 'analytics', 'crm_sync', 'scraping', 'n8n_health',
];

/**
 * Open the PostgreSQL connection used by the feeders; abort the run if it fails.
 *
 * @param bool $setSearchPath When true, sets search_path to admin,public.
 * @return resource|object Open connection handle.
 */
function feeder_db_connect($setSearchPath = true)
{
    // NOTE(review): credentials are hard-coded in source; move them to
    // environment or secret storage.
    $conn = pg_connect('host=localhost dbname=adx_system user=admin password=admin123');
    if ($conn === false) {
        log_feeder('db', 'connection failed');
        exit(1);
    }
    if ($setSearchPath) {
        pg_query($conn, 'SET search_path TO admin,public');
    }
    return $conn;
}

/**
 * Run a query and return the first column of its first row.
 *
 * @param resource|object $conn   Connection from feeder_db_connect().
 * @param string          $sql    Query text; use $1, $2, ... placeholders with $params.
 * @param array           $params Values bound to the placeholders.
 * @return string|null The value, or null if the query failed or returned no row.
 */
function feeder_scalar($conn, $sql, array $params = [])
{
    $res = $params
        ? pg_query_params($conn, $sql, $params)
        : pg_query($conn, $sql);
    if ($res === false || pg_num_rows($res) < 1) {
        return null;
    }
    $value = pg_fetch_result($res, 0, 0);
    return $value === false ? null : $value;
}

switch ($feeder) {
    case 'warmup':
        // Advance the warmup day, then reset the per-day sent counters.
        $r = api_call("$base/warmup-engine.php?action=advance_day");
        log_feeder('warmup', "Advanced: " . ($r['advanced'] ?? 0) . ", Graduated: " . ($r['graduated'] ?? 0));

        // The table is schema-qualified, so search_path is left untouched here.
        $db = feeder_db_connect(false);
        $reset = pg_query($db, "UPDATE admin.warmup_accounts SET sent_today=0 WHERE status='warming'");
        log_feeder(
            'warmup',
            $reset !== false ? 'sent_today reset' : 'sent_today reset failed: ' . pg_last_error($db)
        );
        break;

    case 'supply':
        // Read the dashboard and auto-fill when it reports any alert.
        $r = api_call("$base/supply-chain-factory.php?action=dashboard");
        if ($r && ($r['status'] ?? null) === 'success') {
            $alerts = $r['alerts'] ?? [];
            $alertCount = is_array($alerts) ? count($alerts) : 0;
            log_feeder('supply', "Alerts: " . $alertCount);

            if ($alertCount > 0) {
                $fill = api_call("$base/supply-chain-factory.php?action=auto_fill");
                log_feeder('supply', "Auto-fill: " . json_encode($fill));
            }
        } else {
            // Previously this case was silent when the dashboard call failed.
            log_feeder('supply', 'dashboard unavailable');
        }
        break;

    case 'offers':
        // Sync offers from CX3, then run offer quality scoring.
        $r = api_call("$base/affiliate-monitor.php?action=sync_cx3");
        log_feeder('offers', $r ?: 'sync_attempted');

        $q = api_call("$base/offer-quality-guard.php?action=score_all");
        log_feeder('offers', "Quality: " . json_encode($q));
        break;

    case 'dns':
        // Find domains without DNS and request DNS generation for a few of them.
        $db = feeder_db_connect();

        $count = feeder_scalar(
            $db,
            'SELECT COUNT(*) FROM domain_pool WHERE dns_configured IS NOT TRUE AND has_spf IS NOT TRUE'
        );
        if ($count === null) {
            log_feeder('dns', 'count query failed: ' . pg_last_error($db));
            break;
        }
        log_feeder('dns', "Unconfigured domains: $count");

        if ((int) $count > 0) {
            // NOTE(review): the count above also requires has_spf IS NOT TRUE, but this
            // selection does not, and it has no ORDER BY - confirm which filter is intended.
            $res = pg_query($db, 'SELECT id, domain FROM domain_pool WHERE dns_configured IS NOT TRUE LIMIT 5');
            $domains = $res !== false ? pg_fetch_all($res) : [];
            foreach (($domains ?: []) as $d) {
                $push = api_call("$base/dns-push.php?action=generate&domain_id=" . urlencode($d['id']));
                log_feeder('dns', "Generated DNS for {$d['domain']}: " . ($push ? 'ok' : 'fail'));
            }
        }
        break;

    case 'bounce':
        // Process new bounces from the send log.
        $r = api_call("$base/bounce-processor.php?action=process");
        log_feeder('bounce', $r ?: 'processed');
        break;

    case 'healing':
        // Report server health and reputation.
        $r = api_call("$base/mta.php?action=status");
        if ($r) {
            $total = $r['total_servers'] ?? 0;
            $active = $r['active'] ?? 0;
            $inactive = $total - $active;
            log_feeder('healing', "Servers: $total, Active: $active, Inactive: $inactive");

            $blacklisted = $r['blacklisted'] ?? 0;
            if ($blacklisted > 0) {
                // Only logged: no IP rotation is triggered from this script.
                log_feeder('healing', "Blacklisted IPs found: $blacklisted");
            }
        }

        $rep = api_call("$base/reputation.php?action=check");
        log_feeder('healing', "Reputation: " . json_encode($rep));
        break;

    case 'analytics':
        // Aggregate today's counters and upsert them into the revenue table.
        $db = feeder_db_connect();
        $today = date('Y-m-d');

        $metricSql = [
            'sent'   => 'SELECT COUNT(*) FROM unified_send_log WHERE created_at::date = $1',
            'ok'     => 'SELECT COUNT(*) FROM unified_send_log WHERE status=\'sent\' AND created_at::date = $1',
            'opens'  => 'SELECT COUNT(*) FROM open_log WHERE opened_at::date = $1',
            'clicks' => 'SELECT COUNT(*) FROM click_log WHERE clicked_at::date = $1',
            'convs'  => 'SELECT COUNT(*) FROM conversion_log WHERE converted_at::date = $1',
            'rev'    => 'SELECT COALESCE(SUM(payout),0) FROM conversion_log WHERE converted_at::date = $1',
        ];
        $stats = [];
        foreach ($metricSql as $metric => $sql) {
            $stats[$metric] = feeder_scalar($db, $sql, [$today]);
        }

        // Never write a partial row: a failed metric query must not overwrite
        // an existing revenue record with bogus values.
        if (in_array(null, $stats, true)) {
            log_feeder('analytics', 'aborted, metric query failed: ' . pg_last_error($db));
            break;
        }

        $upsert = pg_query_params(
            $db,
            'INSERT INTO revenue (date, emails_sent, clicks, conversions, revenue)
             VALUES ($1, $2, $3, $4, $5)
             ON CONFLICT (date) DO UPDATE SET
                 emails_sent = EXCLUDED.emails_sent, clicks = EXCLUDED.clicks,
                 conversions = EXCLUDED.conversions, revenue = EXCLUDED.revenue',
            [$today, $stats['sent'], $stats['clicks'], $stats['convs'], $stats['rev']]
        );
        if ($upsert === false) {
            log_feeder('analytics', 'revenue upsert failed: ' . pg_last_error($db));
        }

        log_feeder('analytics', sprintf(
            'Sent:%s OK:%s Opens:%s Clicks:%s Conv:%s Rev:$%s',
            $stats['sent'],
            $stats['ok'],
            $stats['opens'],
            $stats['clicks'],
            $stats['convs'],
            $stats['rev']
        ));
        break;

    case 'status':
        // Human-readable summary followed by a JSON status line.
        // Values that cannot be obtained are printed as "n/a".
        echo "PIPELINE FEEDERS STATUS\n";
        echo "=======================\n";

        $db = feeder_db_connect();

        $w = api_call("$base/warmup-engine.php?action=status");
        printf(
            "Warmup: %s warming, day %s, cap %s/j\n",
            $w['warmup']['warming'] ?? 'n/a',
            $w['warmup']['avg_warmup_day'] ?? 'n/a',
            $w['warmup']['warming_daily_capacity'] ?? 'n/a'
        );

        $m = api_call("$base/mta.php?action=status");
        printf(
            "MTA: %s servers, %s methods, %s accounts\n",
            $m['total_servers'] ?? 'n/a',
            $m['send_methods'] ?? 'n/a',
            $m['send_accounts'] ?? 'n/a'
        );

        $today = date('Y-m-d');
        $sent = feeder_scalar(
            $db,
            'SELECT COUNT(*) FROM unified_send_log WHERE created_at::date = $1',
            [$today]
        );
        $ok = feeder_scalar(
            $db,
            'SELECT COUNT(*) FROM unified_send_log WHERE status=\'sent\' AND created_at::date = $1',
            [$today]
        );
        echo "Today: " . ($sent ?? 'n/a') . " sent, " . ($ok ?? 'n/a') . " delivered\n";

        $supp = feeder_scalar($db, 'SELECT COUNT(*) FROM suppression_list');
        echo "Suppressed: " . ($supp ?? 'n/a') . " emails\n";

        $learns = feeder_scalar($db, 'SELECT COUNT(*) FROM brain_learning_log');
        echo "Brain learning entries: " . ($learns ?? 'n/a') . "\n";

        echo json_encode(['status' => 'operational', 'feeders' => $feederNames]);
        break;

    case 'all':
        // Run every feeder in its own PHP process so one failure cannot stop the rest.
        echo "=== RUNNING ALL FEEDERS ===\n";

        // Reuse the running interpreter under CLI; under a web SAPI PHP_BINARY
        // is not a CLI binary, so fall back to `php` from PATH.
        $phpBin = (PHP_SAPI === 'cli' && PHP_BINARY !== '') ? PHP_BINARY : 'php';
        foreach ($feederNames as $f) {
            echo "\n--- $f ---\n";
            $cmd = escapeshellarg($phpBin) . ' ' . escapeshellarg(__FILE__) . ' ' . escapeshellarg($f) . ' 2>&1';
            $output = shell_exec($cmd);
            echo $output;
        }
        break;

    case 'crm_sync':
        // Sync CRM tracking, then run auto-segmentation.
        $r = api_call("$base/crm-api.php?action=sync_tracking");
        log_feeder('crm_sync', $r ?: 'synced');

        $s = api_call("$base/data-intelligence.php?action=auto_segment");
        log_feeder('crm_sync', 'Segments: ' . json_encode($s));
        break;

    case 'scraping':
        // Copy e-mail addresses from the two result tables into leads.
        $db = feeder_db_connect();

        // NOTE(review): `NOT IN (SELECT email FROM leads)` matches nothing if
        // leads.email contains a NULL; NOT EXISTS would avoid that - confirm and switch.
        $bridgeSql = [
            'scraping' => "INSERT INTO leads (email, source, segment) SELECT email, 'scraping', 'cold' FROM scrapping_results WHERE email IS NOT NULL AND email LIKE '%@%' AND email NOT IN (SELECT email FROM leads) ON CONFLICT DO NOTHING",
            'dark'     => "INSERT INTO leads (email, source, segment) SELECT email, 'dark_scout', 'cold' FROM dark_scout_results WHERE email IS NOT NULL AND email LIKE '%@%' AND email NOT IN (SELECT email FROM leads) ON CONFLICT DO NOTHING",
        ];
        $bridged = [];
        foreach ($bridgeSql as $label => $sql) {
            // Suppress the warning but check the result: pg_affected_rows()
            // must not be called on a failed query.
            $res = @pg_query($db, $sql);
            $bridged[$label] = $res !== false ? pg_affected_rows($res) : 'failed';
        }
        log_feeder('scraping', "Bridged: scraping={$bridged['scraping']} dark={$bridged['dark']}");
        break;

    case 'n8n_health':
        // A failed request (including a timeout) is reported as "down".
        $healthCtx = stream_context_create(['http' => ['timeout' => 5]]);
        $health = @file_get_contents('http://127.0.0.1:5678/healthz', false, $healthCtx);
        log_feeder('n8n_health', 'status=' . ($health !== false ? 'up' : 'down'));
        break;

    default:
        echo json_encode([
            'error' => "Unknown feeder: " . (is_string($feeder) ? $feeder : gettype($feeder)),
            'available' => array_merge($feederNames, ['status', 'all']),
        ]);
}