Files
html/generated/file_03.py
2026-04-12 22:57:03 +02:00

58 lines
2.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# api.py - FastAPI CRUD + JWT auth + automatic stock decrement on sale
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from jose import jwt
from passlib.hash import pbkdf2_sha256
from datetime import datetime as dt
from pydantic import BaseModel
from config import settings
from models import *
# Application object; every route below is registered on it through its
# decorators. The title is read from the project settings.
app = FastAPI(
    title=settings.PROJECT_NAME,
    version="1.0.0",
)
# ---------- auth ----------
class Token(BaseModel):
    """Response schema of the ``/token`` endpoint."""

    access_token: str  # the encoded JWT
    token_type: str  # the login route always sets this to "bearer"
def authenticate(email: str, pw: str, session: Session):
    """Look up a user by e-mail and check the supplied password.

    Args:
        email: E-mail address matched against ``User.email``.
        pw: Plain-text password to verify.
        session: Database session used for the lookup.

    Returns:
        The matching user when the password verifies against
        ``user.hashed_pw``; ``None`` when no user has that e-mail or the
        password does not match.
    """
    lookup = select(User).where(User.email == email)
    candidate = session.exec(lookup).first()
    if not candidate:
        return None
    if not pbkdf2_sha256.verify(pw, candidate.hashed_pw):
        return None
    return candidate
@app.post("/token", response_model=Token)
def login(
    form: OAuth2PasswordRequestForm = Depends(),
    s: Session = Depends(get_session),
):
    """Exchange form credentials for a signed, expiring JWT bearer token.

    The form's ``username`` field carries the user's e-mail address.

    The token lifetime is read from ``settings.ACCESS_TOKEN_EXPIRE_MINUTES``
    when that setting exists and defaults to 60 minutes otherwise.

    Args:
        form: OAuth2 password form (``username`` / ``password``).
        s: Database session.

    Returns:
        A ``Token`` whose ``access_token`` is an HS256-signed JWT with the
        claims ``sub`` (user e-mail), ``iat`` and ``exp``.

    Raises:
        HTTPException: 400 "Bad credentials" when authentication fails.
    """
    # Local import: the module header only imports ``datetime`` (as ``dt``).
    from datetime import timedelta, timezone

    user = authenticate(form.username, form.password, s)
    if not user:
        raise HTTPException(400, "Bad credentials")

    # Without an "exp" claim a leaked token would stay valid forever.
    lifetime = timedelta(minutes=getattr(settings, "ACCESS_TOKEN_EXPIRE_MINUTES", 60))
    issued_at = dt.now(timezone.utc)
    claims = {
        "sub": user.email,
        "iat": int(issued_at.timestamp()),
        "exp": int((issued_at + lifetime).timestamp()),
    }
    token = jwt.encode(claims, settings.JWT_SECRET, algorithm="HS256")
    return Token(access_token=token, token_type="bearer")
# ---------- products ----------
class ProductOut(Product):
    """Response schema for product endpoints; adds nothing to ``Product``."""
class ProductCreate(BaseModel):
    """Request payload accepted by ``POST /products``."""

    sku: str
    name: str
    price: Decimal
    qty_in_stock: int = 0  # stock defaults to zero when omitted
@app.post("/products", response_model=ProductOut)
def create_product(
    p: ProductCreate,
    _=Depends(get_admin),
    s: Session = Depends(get_session),
):
    """Persist a new product built from the request payload and return it.

    The ``get_admin`` dependency is resolved for its side effect (it raises
    when the caller is not an admin); its return value is not used here.

    Args:
        p: Validated product fields.
        s: Database session.

    Returns:
        The stored ``Product`` row.
    """
    # NOTE(review): ``get_admin`` is defined near the bottom of this module,
    # after this default argument is evaluated. Unless the ``models``
    # star-import also provides it, importing the module raises NameError;
    # confirm and move the definition above this route if needed.
    product_row = Product(**p.dict())
    s.add(product_row)
    s.commit()
    # Reload the row after commit (presumably to pick up DB-generated
    # values such as the primary key; confirm against the model).
    s.refresh(product_row)
    return product_row
@app.get("/products", response_model=List[ProductOut])
def list_products(s: Session = Depends(get_session)):
    """Return every ``Product`` row, unfiltered and unpaginated."""
    all_products = select(Product)
    return s.exec(all_products).all()
# ---------- sales ----------
class SaleCreate(BaseModel):
    """Request payload accepted by ``POST /sales``."""

    customer_id: int
    # Each entry is a plain dict shaped like {"product_id": 1, "qty": 2}.
    lines: List[dict]
def _parse_sale_line(line: dict):
    """Validate one raw sale line and return it as ``(product_id, qty)``.

    Raises:
        HTTPException: 400 when a key is missing, when either value is not
            an integer, or when the quantity is not strictly positive.
    """
    try:
        product_id, qty = line["product_id"], line["qty"]
    except KeyError as missing:
        raise HTTPException(400, f"Missing field in sale line: {missing}") from None
    for value in (product_id, qty):
        # bool is a subclass of int, so it has to be excluded explicitly.
        if isinstance(value, bool) or not isinstance(value, int):
            raise HTTPException(400, "product_id and qty must be integers")
    # A zero or negative quantity would pass the stock check below and
    # *increase* stock while producing a zero or negative total.
    if qty <= 0:
        raise HTTPException(400, "qty must be a positive integer")
    return product_id, qty


@app.post("/sales")
def create_sale(req: SaleCreate, s: Session = Depends(get_session)):
    """Create a sale invoice with its lines and decrement product stock.

    All lines are validated before anything is written. Stock is decremented
    line by line, so a product that appears in several lines is checked
    against its remaining stock each time. Any failure rolls the session
    back so no partial invoice or stock change is left pending.

    Args:
        req: Customer id plus a list of ``{"product_id": int, "qty": int}``.
        s: Database session.

    Returns:
        ``{"invoice_id": <id>, "total": <Decimal>}`` for the stored invoice.

    Raises:
        HTTPException: 400 for a malformed line or insufficient stock,
            404 when a referenced product does not exist.
    """
    parsed_lines = [_parse_sale_line(raw) for raw in req.lines]

    # NOTE(review): the stock check and decrement are not protected by a row
    # lock here; confirm whether concurrent sales need stricter isolation.
    try:
        inv = SaleInvoice(customer_id=req.customer_id, total=0)
        s.add(inv)
        s.flush()  # flush so inv.id can be used by the lines added below
        total = Decimal(0)
        for product_id, qty in parsed_lines:
            prod = s.get(Product, product_id)
            if prod is None:
                raise HTTPException(404, f"Product {product_id} not found")
            if prod.qty_in_stock < qty:
                raise HTTPException(400, "Stock insuffisant")
            prod.qty_in_stock -= qty
            total += Decimal(qty) * prod.price
            s.add(
                SaleLine(
                    invoice_id=inv.id,
                    product_id=prod.id,
                    qty=qty,
                    unit_price=prod.price,
                )
            )
        inv.total = total
        s.commit()
    except Exception:
        # Discard the flushed invoice and any stock already decremented.
        s.rollback()
        raise
    return {"invoice_id": inv.id, "total": total}
# ---------- utils ----------
def get_admin(token: str = Depends(oauth2_scheme), s: Session = Depends(get_session)):
    """Dependency that resolves the bearer token to an admin user.

    Args:
        token: Encoded JWT supplied by the ``oauth2_scheme`` dependency.
        s: Database session.

    Returns:
        The user whose e-mail matches the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded/verified or has
            no ``sub`` claim; 403 "Admin required" when no such user exists
            or the user is not an admin.
    """
    # Local import: the module header only imports ``jwt`` from jose.
    from jose import JWTError

    unauthorized = HTTPException(
        status.HTTP_401_UNAUTHORIZED,
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, settings.JWT_SECRET, algorithms=["HS256"])
    except JWTError as err:
        # Only token failures map to 401; a bare ``except`` would also hide
        # programming errors and swallow KeyboardInterrupt/SystemExit.
        raise unauthorized from err

    email = payload.get("sub")
    if not email:
        # A validly signed token without a subject is still unusable.
        raise unauthorized

    user = s.exec(select(User).where(User.email == email)).first()
    if not user or not user.is_admin:
        raise HTTPException(403, "Admin required")
    return user