diff --git a/scripts/upload_catalogue_images.py b/scripts/upload_catalogue_images.py new file mode 100755 index 0000000..145a06f --- /dev/null +++ b/scripts/upload_catalogue_images.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python3 +import argparse +import getpass +import os +import re +import sys +from pathlib import Path + +import requests + + +TOKEN_URL = "/api/token/" +PRODUCTS_URL = "/don_confiao/api/products/" +CATALOGUE_IMAGES_URL = "/don_confiao/api/catalogue_images/" + + +def get_credentials(): + username = input("Usuario: ") + password = getpass.getpass("Contraseña: ") + return username, password + + +def get_token(domain, username, password): + url = domain.rstrip("/") + TOKEN_URL + response = requests.post(url, json={"username": username, "password": password}) + if response.status_code != 200: + print(f"Error al obtener token: {response.status_code} {response.text}", file=sys.stderr) + sys.exit(1) + data = response.json() + return data["access"] + + +def get_products(domain, token): + url = domain.rstrip("/") + PRODUCTS_URL + headers = {"Authorization": f"Bearer {token}"} + response = requests.get(url, headers=headers) + if response.status_code != 200: + print(f"Error al obtener productos: {response.status_code} {response.text}", file=sys.stderr) + sys.exit(1) + return response.json() + + +MIME_TYPES = { + ".jpg": "image/jpeg", + ".jpeg": "image/jpeg", + ".png": "image/png", +} + + +def find_images(image_dir): + images = {} + pattern = re.compile(r"^(\d+)\.(jpg|jpeg|png)$", re.IGNORECASE) + for f in os.listdir(image_dir): + m = pattern.match(f) + if m: + external_id = m.group(1) + images[external_id] = os.path.join(image_dir, f) + return images + + +def main(): + parser = argparse.ArgumentParser( + description="Sube imágenes de catálogo para productos usando el external_id como nombre de archivo." + ) + parser.add_argument("image_dir", help="Directorio con imágenes nombradas como ##.jpg") + parser.add_argument("domain", help="Dominio del backend (ej: http://localhost:8000)") + args = parser.parse_args() + + if not os.path.isdir(args.image_dir): + print(f"Error: el directorio '{args.image_dir}' no existe.", file=sys.stderr) + sys.exit(1) + + username, password = get_credentials() + token = get_token(args.domain, username, password) + print("Token obtenido correctamente.") + + products = get_products(args.domain, token) + ext_id_to_product_id = {} + for p in products: + if p.get("external_id"): + ext_id_to_product_id[p["external_id"]] = p["id"] + + if not ext_id_to_product_id: + print("No se encontraron productos con external_id.", file=sys.stderr) + sys.exit(1) + + images = find_images(args.image_dir) + if not images: + print(f"No se encontraron imágenes con el patrón ##.jpg en '{args.image_dir}'.", file=sys.stderr) + sys.exit(1) + + headers = {"Authorization": f"Bearer {token}"} + upload_url = args.domain.rstrip("/") + CATALOGUE_IMAGES_URL + uploaded = 0 + skipped = 0 + errors = 0 + + for external_id, img_path in sorted(images.items()): + if external_id not in ext_id_to_product_id: + print(f" [SKIP] {Path(img_path).name}: no hay producto con external_id={external_id}") + skipped += 1 + continue + + product_id = ext_id_to_product_id[external_id] + filename = Path(img_path).name + + ext = Path(img_path).suffix.lower() + mime = MIME_TYPES.get(ext, "application/octet-stream") + + with open(img_path, "rb") as f: + files = {"image": (filename, f, mime)} + data = {"product": product_id} + response = requests.post(upload_url, headers=headers, files=files, data=data) + + if response.status_code == 201: + print(f" [OK] {filename} -> producto {product_id} (id imagen: {response.json()['id']})") + uploaded += 1 + else: + print(f" [ERR] {filename} -> producto {product_id}: {response.status_code} {response.text}") + errors += 1 + + print(f"\nResumen: {uploaded} subidas, {skipped} saltadas, {errors} errores") + + +if __name__ == "__main__": + main()