Files
wevads-platform/scripts/multi_sender.py
2026-02-26 04:53:11 +01:00

120 lines
4.3 KiB
Python

#!/usr/bin/env python3
"""
WEVADS Multi-Method Sender
Methods: GSUITE_RELAY, DIRECT_MX, O365, BCG_LOCAL
(NOTE: no O365 sender is defined in this file; only direct_mx, gsuite and bcg are wired up below.)
"""
import smtplib, socket, json, sys, email.utils, uuid, os
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
# Force IPv4
orig = socket.getaddrinfo


def ipv4(*a, **k):
    """Call the saved ``socket.getaddrinfo`` and keep only AF_INET entries."""
    resolved = orig(*a, **k)
    kept = []
    for entry in resolved:
        # entry[0] is the address family of the getaddrinfo result tuple.
        if entry[0] == socket.AF_INET:
            kept.append(entry)
    return kept


# Install the filter process-wide so every later lookup is IPv4-only.
socket.getaddrinfo = ipv4
def _lookup_mx(rcpt_domain):
    """Return the most-preferred MX host for ``rcpt_domain``, or ``None``.

    Runs ``dig +short MX`` and picks the record with the lowest preference
    value. Any failure to run ``dig`` (binary missing, timeout) is reported
    as "no MX" so the caller can return a normal failure tuple.
    """
    import subprocess

    try:
        r = subprocess.run(
            ["dig", "+short", "MX", rcpt_domain],
            capture_output=True, text=True, timeout=5,
        )
    except (OSError, subprocess.SubprocessError):
        return None

    records = []
    for line in r.stdout.splitlines():
        fields = line.split()
        # A usable answer line is "<preference> <host>."; skip diagnostics
        # such as ";; connection timed out" and any other record shapes.
        if len(fields) != 2:
            continue
        try:
            preference = int(fields[0])
        except ValueError:
            continue
        host = fields[1].rstrip(".")
        if host:  # a null MX ("0 .") leaves an empty host
            records.append((preference, host))
    if not records:
        return None
    return min(records, key=lambda rec: rec[0])[1]


def send_direct_mx(to_email, from_email, from_name, subject, html_body, domain):
    """Deliver one HTML message straight to the recipient domain's MX on port 25.

    Well-known Gmail/Outlook domains use a built-in MX host; every other
    domain is resolved with ``dig``. STARTTLS is used when the server
    advertises it and accepts the command.

    Args:
        to_email: Recipient address; its domain selects the MX host.
        from_email: Envelope sender and From address.
        from_name: Display name for the From header.
        subject: Subject header value.
        html_body: HTML payload attached as ``text/html``.
        domain: Sending domain, used for EHLO, Message-ID and List-Unsubscribe.

    Returns:
        ``(True, "OK_DIRECT_MX")`` on success, otherwise ``(False, reason)``
        with ``reason`` truncated to 100 characters.
    """
    # rpartition keeps working when the local part itself contains "@".
    local_part, at, rcpt_domain = to_email.rpartition("@")
    rcpt_domain = rcpt_domain.lower()
    # A leading "-" or "+" would be parsed by dig as an option, not a name.
    if not at or not local_part or not rcpt_domain or rcpt_domain[0] in "-+":
        return False, "Invalid recipient address"

    mx_hosts = {
        "gmail.com": "gmail-smtp-in.l.google.com",
        "googlemail.com": "gmail-smtp-in.l.google.com",
        "hotmail.com": "hotmail-com.olc.protection.outlook.com",
        "outlook.com": "hotmail-com.olc.protection.outlook.com",
        "live.com": "hotmail-com.olc.protection.outlook.com",
    }
    mx = mx_hosts.get(rcpt_domain) or _lookup_mx(rcpt_domain)
    if not mx:
        return False, "No MX found"

    try:
        msg = MIMEMultipart("alternative")
        # formataddr quotes/encodes the display name without touching the address.
        msg["From"] = email.utils.formataddr((from_name, from_email))
        msg["To"] = to_email
        msg["Subject"] = subject
        msg["Message-ID"] = email.utils.make_msgid(domain=domain)
        msg["Date"] = email.utils.formatdate(localtime=True)
        msg["List-Unsubscribe"] = f"<mailto:unsub@{domain}>"
        # NOTE(review): One-Click is advertised with only a mailto target;
        # confirm whether an HTTPS unsubscribe URI should be listed as well.
        msg["List-Unsubscribe-Post"] = "List-Unsubscribe=One-Click"
        msg.attach(MIMEText(html_body, "html"))

        s = smtplib.SMTP(mx, 25, timeout=15)
        try:
            s.ehlo(domain)
            if s.has_extn("starttls"):
                try:
                    s.starttls()
                    s.ehlo(domain)
                except smtplib.SMTPException:
                    # Server declined STARTTLS; continue in plaintext as a
                    # best effort. TLS handshake errors still propagate.
                    pass
            s.sendmail(from_email, to_email, msg.as_string())
        finally:
            # Always release the socket. A failing QUIT must not turn an
            # already-accepted message into a reported failure.
            try:
                s.quit()
            except (smtplib.SMTPException, OSError):
                s.close()
        return True, "OK_DIRECT_MX"
    except Exception as e:
        return False, str(e)[:100]
def send_gsuite_relay(to_email, from_email, from_name, subject, html_body, domain):
    """Send one HTML message through ``smtp-relay.gmail.com`` on port 587.

    The session is upgraded with STARTTLS. If the ``GSUITE_APP_PWD``
    environment variable is set, the session logs in as ``from_email`` with
    that password; otherwise no login is attempted.

    Args:
        to_email: Recipient address.
        from_email: Envelope sender, From address, and login user when a
            password is configured.
        from_name: Display name for the From header.
        subject: Subject header value.
        html_body: HTML payload attached as ``text/html``.
        domain: Sending domain, used for the Message-ID.

    Returns:
        ``(True, "OK_GSUITE_RELAY")`` on success, otherwise ``(False, reason)``
        with ``reason`` truncated to 100 characters.
    """
    try:
        msg = MIMEMultipart("alternative")
        # formataddr quotes/encodes the display name without touching the address.
        msg["From"] = email.utils.formataddr((from_name, from_email))
        msg["To"] = to_email
        msg["Subject"] = subject
        msg["Message-ID"] = email.utils.make_msgid(domain=domain)
        msg["Date"] = email.utils.formatdate(localtime=True)
        msg.attach(MIMEText(html_body, "html"))

        s = smtplib.SMTP("smtp-relay.gmail.com", 587, timeout=15)
        try:
            s.starttls()
            # Authenticate only when an app password is configured.
            app_pwd = os.environ.get("GSUITE_APP_PWD", "")
            if app_pwd:
                s.login(from_email, app_pwd)
            s.sendmail(from_email, to_email, msg.as_string())
        finally:
            # Always release the socket. A failing QUIT must not turn an
            # already-accepted message into a reported failure.
            try:
                s.quit()
            except (smtplib.SMTPException, OSError):
                s.close()
        return True, "OK_GSUITE_RELAY"
    except Exception as e:
        return False, str(e)[:100]
def send_bcg(to_email, from_email, from_name, subject, html_body, domain):
    """Send by handing a JSON job file to ``/opt/wevads/scripts/bcg_local.py``.

    The message fields are written to a private temporary ``.json`` file whose
    path is passed as the script's only argument. The send counts as
    successful when the script's stdout contains ``"OK"``.

    Args:
        to_email: Recipient address (JSON key ``to``).
        from_email: Sender address (JSON key ``from_email``).
        from_name: Sender display name (JSON key ``from_name``).
        subject: Subject line (JSON key ``subject``).
        html_body: HTML payload (JSON key ``body``).
        domain: Sending domain (JSON key ``domain``).

    Returns:
        ``(True, "OK_BCG")`` on success, otherwise ``(False, reason)`` where
        ``reason`` is at most 100 characters of the script's output or of the
        error that prevented it from running.
    """
    import subprocess
    import tempfile

    payload = {
        "to": to_email,
        "from_email": from_email,
        "from_name": from_name,
        "subject": subject,
        "body": html_body,
        "domain": domain,
    }
    # mkstemp creates the file atomically, unlike the race-prone mktemp.
    fd, jf = tempfile.mkstemp(suffix=".json")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(payload, f)
        r = subprocess.run(
            ["python3", "/opt/wevads/scripts/bcg_local.py", jf],
            capture_output=True, text=True, timeout=30,
        )
    except (OSError, subprocess.SubprocessError) as e:
        # Interpreter missing or script timed out: report it like any other
        # failed send instead of crashing the caller's fallback loop.
        return False, str(e)[:100]
    finally:
        # Remove the job file on every path, including timeouts.
        try:
            os.unlink(jf)
        except OSError:
            pass
    if "OK" in r.stdout:
        return True, "OK_BCG"
    # Fall back to stderr so a crashing script still yields a reason.
    return False, (r.stdout or r.stderr)[:100]
# Main: read a JSON job file (path in argv[1]) and try the configured senders.
# Prints "OK|<function>|<detail>" and exits 0 on the first success; otherwise
# prints a "FAIL|..." line and exits 1.
if __name__ == "__main__":
    try:
        # Close the job file deterministically instead of leaking the handle.
        with open(sys.argv[1], encoding="utf-8") as fh:
            data = json.load(fh)
        method = data.get("method", "auto")
        to = data["to"]
        from_e = data["from_email"]
        from_n = data.get("from_name", from_e)
        subj = data["subject"]
        body = data["body"]
        domain = data.get("domain", "culturellemejean.charity")
    except (IndexError, OSError, ValueError, KeyError, AttributeError) as e:
        # Missing argument, unreadable file, invalid JSON or a missing
        # required key: report it in the same pipe-delimited format.
        print(f"FAIL|bad_input|{type(e).__name__}: {e}")
        sys.exit(1)

    methods_order = {
        "auto": [send_direct_mx, send_gsuite_relay, send_bcg],
        "direct_mx": [send_direct_mx],
        "gsuite": [send_gsuite_relay],
        "bcg": [send_bcg],
    }
    # An unrecognised method name falls back to the "auto" chain.
    for fn in methods_order.get(method, methods_order["auto"]):
        ok, msg = fn(to, from_e, from_n, subj, body, domain)
        if ok:
            print(f"OK|{fn.__name__}|{msg}")
            sys.exit(0)
    # Only the last sender's failure reason is reported.
    print(f"FAIL|all_methods|{msg}")
    sys.exit(1)