feat: add standalone script to upload catalogue images via API

This commit is contained in:
mono
2026-06-14 00:39:33 -05:00
parent 539629076f
commit ac22adb558

View File

@@ -0,0 +1,126 @@
#!/usr/bin/env python3
import argparse
import getpass
import os
import re
import sys
from pathlib import Path
import requests
TOKEN_URL = "/api/token/"
PRODUCTS_URL = "/don_confiao/api/products/"
CATALOGUE_IMAGES_URL = "/don_confiao/api/catalogue_images/"
def get_credentials():
username = input("Usuario: ")
password = getpass.getpass("Contraseña: ")
return username, password
def get_token(domain, username, password):
url = domain.rstrip("/") + TOKEN_URL
response = requests.post(url, json={"username": username, "password": password})
if response.status_code != 200:
print(f"Error al obtener token: {response.status_code} {response.text}", file=sys.stderr)
sys.exit(1)
data = response.json()
return data["access"]
def get_products(domain, token):
url = domain.rstrip("/") + PRODUCTS_URL
headers = {"Authorization": f"Bearer {token}"}
response = requests.get(url, headers=headers)
if response.status_code != 200:
print(f"Error al obtener productos: {response.status_code} {response.text}", file=sys.stderr)
sys.exit(1)
return response.json()
MIME_TYPES = {
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".png": "image/png",
}
def find_images(image_dir):
images = {}
pattern = re.compile(r"^(\d+)\.(jpg|jpeg|png)$", re.IGNORECASE)
for f in os.listdir(image_dir):
m = pattern.match(f)
if m:
external_id = m.group(1)
images[external_id] = os.path.join(image_dir, f)
return images
def main():
parser = argparse.ArgumentParser(
description="Sube imágenes de catálogo para productos usando el external_id como nombre de archivo."
)
parser.add_argument("image_dir", help="Directorio con imágenes nombradas como ##.jpg")
parser.add_argument("domain", help="Dominio del backend (ej: http://localhost:8000)")
args = parser.parse_args()
if not os.path.isdir(args.image_dir):
print(f"Error: el directorio '{args.image_dir}' no existe.", file=sys.stderr)
sys.exit(1)
username, password = get_credentials()
token = get_token(args.domain, username, password)
print("Token obtenido correctamente.")
products = get_products(args.domain, token)
ext_id_to_product_id = {}
for p in products:
if p.get("external_id"):
ext_id_to_product_id[p["external_id"]] = p["id"]
if not ext_id_to_product_id:
print("No se encontraron productos con external_id.", file=sys.stderr)
sys.exit(1)
images = find_images(args.image_dir)
if not images:
print(f"No se encontraron imágenes con el patrón ##.jpg en '{args.image_dir}'.", file=sys.stderr)
sys.exit(1)
headers = {"Authorization": f"Bearer {token}"}
upload_url = args.domain.rstrip("/") + CATALOGUE_IMAGES_URL
uploaded = 0
skipped = 0
errors = 0
for external_id, img_path in sorted(images.items()):
if external_id not in ext_id_to_product_id:
print(f" [SKIP] {Path(img_path).name}: no hay producto con external_id={external_id}")
skipped += 1
continue
product_id = ext_id_to_product_id[external_id]
filename = Path(img_path).name
ext = Path(img_path).suffix.lower()
mime = MIME_TYPES.get(ext, "application/octet-stream")
with open(img_path, "rb") as f:
files = {"image": (filename, f, mime)}
data = {"product": product_id}
response = requests.post(upload_url, headers=headers, files=files, data=data)
if response.status_code == 201:
print(f" [OK] {filename} -> producto {product_id} (id imagen: {response.json()['id']})")
uploaded += 1
else:
print(f" [ERR] {filename} -> producto {product_id}: {response.status_code} {response.text}")
errors += 1
print(f"\nResumen: {uploaded} subidas, {skipped} saltadas, {errors} errores")
if __name__ == "__main__":
main()