De dos millones de referencias dispersas a un CRM unificado
En Cymit Química gestionamos un catálogo de más de dos millones de referencias procedentes de decenas de proveedores nacionales e internacionales. La alternativa a automatizar esa gestión era un equipo dedicado a la actualización manual del catálogo: un equipo que nunca habría podido competir en velocidad con los ciclos de actualización de precios y disponibilidad que el mercado de distribución química exige. El sistema de crawling y scraping que diseñé e implementé no fue un proyecto de IT: fue una decisión de negocio que convirtió ese volumen de datos en una ventaja competitiva real frente a distribuidoras que no tenían esa automatización.
Esta guía recoge la arquitectura técnica completa de ese sistema, adaptada como referencia para cualquier proyecto que necesite alimentar el CRM o el catálogo de Odoo con datos extraídos de fuentes externas a escala.
Arquitectura general: de la fuente al CRM
Un pipeline de scraping industrial tiene cuatro capas bien diferenciadas. Mezclarlas en un único script es el error más frecuente y el que hace que los proyectos de scraping mueran a los pocos meses:
- Extracción: obtener el HTML o los datos en bruto de la fuente.
- Transformación y normalización: limpiar, estructurar y unificar el formato de los datos.
- Deduplicación y validación: asegurar que no se insertan registros repetidos ni datos incorrectos.
- Carga en Odoo: escribir en el CRM vía API con garantías de idempotencia.
Cada capa tiene sus propias herramientas, sus propios modos de fallo y sus propias estrategias de escalado. A continuación detallo cada una.
Capa 1: extracción — elegir la herramienta correcta
No existe una herramienta universal para scraping. La elección depende de lo que hace la fuente:
- Requests + BeautifulSoup / lxml: para páginas HTML estáticas o APIs REST no documentadas que devuelven JSON. La opción más ligera y más rápida. El 60-70 % de los casos de un distribuidor B2B se resuelven aquí.
- Scrapy: cuando el volumen es alto (miles o millones de URLs) y necesitas control fino sobre concurrencia, middlewares, pipelines y reintentos. Scrapy es un framework, no una librería: tiene curva de aprendizaje, pero la diferencia en mantenibilidad a largo plazo lo justifica.
- Playwright (o Selenium): para páginas que requieren JavaScript para renderizar el contenido. Playwright es la opción moderna: más rápido que Selenium, mejor API asíncrona, soporte de Chromium/Firefox/WebKit y modo headless nativo.
En el proyecto de Cymit usamos los tres en paralelo, asignados por tipo de fuente. El orquestador de tareas decidía qué motor usar para cada proveedor.
Ejemplo: extracción con Requests y lxml para un catálogo HTML estático
import requests
from lxml import html
import time
import random
SESSION_HEADERS = {
"User-Agent": (
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
),
"Accept-Language": "es-ES,es;q=0.9",
"Accept-Encoding": "gzip, deflate, br",
}
def fetch_product_page(url: str, session: requests.Session) -> dict | None:
"""Extrae datos de una página de producto individual."""
try:
response = session.get(url, headers=SESSION_HEADERS, timeout=15)
response.raise_for_status()
except requests.RequestException as exc:
print(f"[ERROR] {url}: {exc}")
return None
tree = html.fromstring(response.content)
# XPath adaptado al sitio objetivo
name = tree.xpath("//h1[@class='product-title']/text()")
ref = tree.xpath("//span[@data-field='product_ref']/text()")
price = tree.xpath("//span[@class='price-value']/text()")
availability = tree.xpath("//div[@class='stock-info']//text()")
return {
"url": url,
"name": name[0].strip() if name else None,
"ref": ref[0].strip() if ref else None,
"price_raw": price[0].strip() if price else None,
"availability": " ".join(availability).strip() if availability else None,
}
def crawl_catalog(base_url: str, page_count: int) -> list[dict]:
"""Recorre las páginas de listado y extrae URLs de producto."""
session = requests.Session()
product_data = []
for page in range(1, page_count + 1):
url = f"{base_url}?page={page}"
resp = session.get(url, headers=SESSION_HEADERS, timeout=15)
tree = html.fromstring(resp.content)
links = tree.xpath("//a[@class='product-link']/@href")
for link in links:
full_url = f"https://proveedor.example.com{link}"
data = fetch_product_page(full_url, session)
if data:
product_data.append(data)
# Rate limiting: pausa aleatoria entre páginas
time.sleep(random.uniform(1.5, 3.5))
return product_data
Scrapy para volumen: spider de catálogo distribuidor
Cuando el catálogo supera las decenas de miles de páginas, la gestión manual de la concurrencia, los reintentos y las colas se vuelve inmanejable. Scrapy resuelve esto con su arquitectura de pipelines y middlewares:
import scrapy
from scrapy.http import Response
class CatalogSpider(scrapy.Spider):
name = "catalog_spider"
allowed_domains = ["proveedor.example.com"]
start_urls = ["https://proveedor.example.com/catalogo/"]
custom_settings = {
"CONCURRENT_REQUESTS": 4, # peticiones simultáneas
"DOWNLOAD_DELAY": 2, # delay mínimo entre peticiones
"RANDOMIZE_DOWNLOAD_DELAY": True, # jitter
"AUTOTHROTTLE_ENABLED": True, # ajuste automático de velocidad
"AUTOTHROTTLE_TARGET_CONCURRENCY": 2.0,
"RETRY_TIMES": 3,
"RETRY_HTTP_CODES": [429, 500, 502, 503, 504],
"ITEM_PIPELINES": {
"catalog_scraper.pipelines.NormalizePipeline": 100,
"catalog_scraper.pipelines.DedupPipeline": 200,
"catalog_scraper.pipelines.OdooIngestPipeline": 300,
},
"ROTATING_PROXY_LIST_PATH": "/etc/scraper/proxies.txt",
}
def parse(self, response: Response):
# Página de listado: seguir links de producto
for href in response.css("a.product-link::attr(href)").getall():
yield response.follow(href, callback=self.parse_product)
# Paginación
next_page = response.css("a.pagination-next::attr(href)").get()
if next_page:
yield response.follow(next_page, callback=self.parse)
def parse_product(self, response: Response):
yield {
"source_url": response.url,
"name": response.css("h1.product-title::text").get("").strip(),
"ref_supplier": response.css("[data-field=product_ref]::text").get("").strip(),
"price_raw": response.css(".price-value::text").get("").strip(),
"description": response.css(".product-description").get(""),
"availability": response.css(".stock-info::text").get("").strip(),
}
Playwright para páginas con JavaScript
Algunos portales de proveedores renderizan el precio y la disponibilidad vía JS. Requests no ve esos datos; Playwright sí:
import asyncio
from playwright.async_api import async_playwright
async def scrape_dynamic_page(url: str) -> dict:
async with async_playwright() as pw:
browser = await pw.chromium.launch(headless=True)
context = await browser.new_context(
user_agent=(
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
"Chrome/124.0.0.0 Safari/537.36"
)
)
page = await context.new_page()
# Interceptar peticiones de API internas del portal
api_responses = []
async def handle_response(response):
if "/api/product" in response.url:
try:
data = await response.json()
api_responses.append(data)
except Exception:
pass
page.on("response", handle_response)
await page.goto(url, wait_until="networkidle", timeout=30_000)
# Esperar a que el precio esté visible en el DOM
await page.wait_for_selector(".price-value", timeout=10_000)
name = await page.text_content("h1.product-title")
price = await page.text_content(".price-value")
await browser.close()
return {
"name": name.strip() if name else None,
"price_raw": price.strip() if price else None,
"api_data": api_responses[0] if api_responses else None,
}
Gestión de escala y anti-bloqueo
Un scraper que funciona con cien peticiones falla con un millón. La diferencia entre un sistema de scraping artesanal y uno industrial está en cómo gestiona los rechazos, los rate limits y la detección.
Rate limiting y jitter
La regla de oro es no parecer un robot. Los robots hacen peticiones a intervalos exactos; los humanos no. Añadir un componente aleatorio al delay entre peticiones (jitter) reduce drásticamente la tasa de detección. En Scrapy, RANDOMIZE_DOWNLOAD_DELAY = True lo hace automáticamente. En scripts con Requests, time.sleep(random.uniform(1.5, 4.0)) entre peticiones es el mínimo.
Rotación de proxies y User-Agents
Para volúmenes de millones de peticiones, una sola IP será bloqueada inevitablemente. La solución es un pool de proxies residenciales o datacenter rotativos. En Scrapy, scrapy-rotating-proxies gestiona el pool automáticamente, marcando los proxies bloqueados y redistribuyendo el tráfico entre los activos:
# settings.py de Scrapy
DOWNLOADER_MIDDLEWARES = {
"rotating_proxies.middlewares.RotatingProxyMiddleware": 610,
"rotating_proxies.middlewares.BanDetectionMiddleware": 620,
}
ROTATING_PROXY_LIST_PATH = "/etc/scraper/proxies.txt"
ROTATING_PROXY_PAGE_RETRY_TIMES = 5
# proxies.txt: una por línea
# http://user:pass@proxy1.example.com:8080
# http://user:pass@proxy2.example.com:8080
La rotación de User-Agents complementa la de proxies. El middleware scrapy-fake-useragent extrae listas actualizadas de User-Agents reales de navegadores para cada petición.
Colas y workers distribuidos con Celery + Redis
Para un catálogo de dos millones de referencias que se actualiza periódicamente, un único proceso secuencial tarda días. La solución es distribuir el trabajo en workers paralelos con una cola de tareas. Celery con Redis como broker es el estándar de facto en Python:
from celery import Celery
app = Celery(
"scraper",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/1",
)
app.conf.update(
task_serializer="json",
result_serializer="json",
accept_content=["json"],
task_acks_late=True, # no marcar como completada hasta que se procese
worker_prefetch_multiplier=1, # un task por worker a la vez
task_routes={
"scraper.tasks.scrape_product": {"queue": "scraping"},
"scraper.tasks.ingest_odoo": {"queue": "odoo_ingest"},
},
)
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def scrape_product(self, url: str, supplier_id: int) -> dict:
try:
data = fetch_product_page(url, requests.Session())
if data:
ingest_odoo.delay(data, supplier_id)
return data
except Exception as exc:
raise self.retry(exc=exc)
Con 8 workers dedicados al scraping, el sistema de Cymit procesaba decenas de miles de referencias por hora en las actualizaciones masivas periódicas.
Capa 2: normalización de datos
Los datos extraídos de distintos proveedores llegan en formatos heterogéneos: precios con símbolo de moneda y coma decimal, disponibilidad como «En stock» / «Out of stock» / «Disponible en 3-5 días» / «S/D», referencias con o sin prefijo de proveedor. El pipeline de normalización debe convertir todo esto a un esquema unificado antes de tocar la base de datos de Odoo.
import re
from decimal import Decimal, InvalidOperation
AVAILABILITY_MAP = {
"en stock": "available",
"disponible": "available",
"out of stock": "out_of_stock",
"no disponible": "out_of_stock",
"agotado": "out_of_stock",
"s/d": "out_of_stock",
# Textos con plazo de entrega
r"disponible en \d+-\d+ días": "on_demand",
r"\d+ semanas": "on_demand",
}
def normalize_price(raw: str) -> Decimal | None:
"""Convierte '12,50 €', '$12.50', '12.50EUR' a Decimal."""
if not raw:
return None
cleaned = re.sub(r"[^\d.,]", "", raw).strip()
# Normalizar separadores europeos (1.234,56 -> 1234.56)
if "," in cleaned and "." in cleaned:
cleaned = cleaned.replace(".", "").replace(",", ".")
elif "," in cleaned:
cleaned = cleaned.replace(",", ".")
try:
return Decimal(cleaned)
except InvalidOperation:
return None
def normalize_availability(raw: str) -> str:
"""Normaliza textos de disponibilidad a valores de enum."""
normalized = raw.lower().strip()
for pattern, status in AVAILABILITY_MAP.items():
if re.search(pattern, normalized):
return status
return "unknown"
def normalize_product(raw: dict, supplier_code: str) -> dict:
"""Aplica todas las normalizaciones y genera el external_id canónico."""
ref = (raw.get("ref_supplier") or "").strip().upper()
return {
"external_id": f"{supplier_code}_{ref}", # clave de idempotencia
"name": (raw.get("name") or "").strip(),
"default_code": ref,
"list_price": normalize_price(raw.get("price_raw", "")),
"availability": normalize_availability(raw.get("availability", "")),
"description": (raw.get("description") or "").strip(),
"source_url": raw.get("source_url"),
}
Capa 3: deduplicación
Con dos millones de referencias y múltiples proveedores, la misma referencia puede aparecer en distintas fuentes bajo nombres ligeramente distintos. La estrategia de deduplicación tiene dos niveles:
- Dedup exacto por
external_id: el identificador canónico{supplier_code}_{ref}garantiza que la misma referencia del mismo proveedor nunca se inserta dos veces. - Dedup fuzzy entre proveedores: para detectar la misma referencia vendida por dos distribuidores distintos bajo nombres diferentes, se usan técnicas de comparación de strings como
rapidfuzzsobre el nombre normalizado y el CAS number cuando está disponible.
from rapidfuzz import fuzz
def find_duplicate_candidate(
product: dict,
existing_refs: list[dict],
threshold: int = 90
) -> dict | None:
"""Busca duplicado fuzzy entre referencias ya existentes."""
name = product["name"].lower()
for existing in existing_refs:
score = fuzz.token_sort_ratio(name, existing["name"].lower())
if score >= threshold:
return existing
return None
Capa 4: ingesta en Odoo vía API
Odoo expone dos interfaces de API: XML-RPC (clásica, disponible desde Odoo 6) y JSON-RPC (más moderna, mismas capacidades). Ambas permiten autenticación, búsqueda, creación y actualización de registros de cualquier modelo del ERP. Para ingesta masiva desde un scraper externo, JSON-RPC con create_or_write via external_id es la combinación correcta.
Conexión y autenticación
import xmlrpc.client
class OdooClient:
"""Cliente XML-RPC para Odoo con soporte de external_id e ingesta masiva."""
def __init__(self, url: str, db: str, username: str, password: str):
self.url = url
self.db = db
self.uid = None
self._models = xmlrpc.client.ServerProxy(f"{url}/xmlrpc/2/object")
self._common = xmlrpc.client.ServerProxy(f"{url}/xmlrpc/2/common")
self.password = password
self._authenticate(username)
def _authenticate(self, username: str) -> None:
self.uid = self._common.authenticate(
self.db, username, self.password, {}
)
if not self.uid:
raise ValueError(f"Autenticación fallida para usuario {username}")
print(f"[Odoo] Autenticado como UID {self.uid}")
def execute(self, model: str, method: str, *args, **kwargs):
return self._models.execute_kw(
self.db, self.uid, self.password,
model, method, list(args), kwargs
)
def search_read(self, model: str, domain: list, fields: list) -> list:
return self.execute(model, "search_read", domain, fields=fields)
def upsert_by_external_id(
self, model: str, external_id: str, values: dict
) -> int:
"""
Crea el registro si no existe; actualiza si ya existe.
external_id es la clave de idempotencia (formato: module.xml_id).
"""
# Buscar si ya existe el external_id
existing = self.execute(
"ir.model.data", "search_read",
[["name", "=", external_id], ["model", "=", model]],
fields=["res_id"],
)
if existing:
record_id = existing[0]["res_id"]
self.execute(model, "write", [record_id], values)
return record_id
else:
record_id = self.execute(model, "create", values)
# Registrar el external_id
self.execute("ir.model.data", "create", {
"name": external_id,
"model": model,
"res_id": record_id,
"module": "scraper_import",
"noupdate": False,
})
return record_id
Ingesta de leads y productos a escala
Para la ingesta masiva, la operación individual de upsert_by_external_id por cada referencia es demasiado lenta. Odoo permite create en batch pasando una lista de dicts. Combinado con el patrón de carga en chunks, el rendimiento mejora drásticamente:
def ingest_products_bulk(
client: OdooClient,
products: list[dict],
chunk_size: int = 100
) -> dict:
"""Ingesta en batch con upsert por external_id. Retorna stats."""
created = updated = errors = 0
# Pre-cargar external_ids existentes en memoria para reducir round-trips
ext_ids_in_odoo = {
row["name"]: row["res_id"]
for row in client.execute(
"ir.model.data", "search_read",
[["module", "=", "scraper_import"], ["model", "=", "product.template"]],
fields=["name", "res_id"],
)
}
for i in range(0, len(products), chunk_size):
chunk = products[i:i + chunk_size]
to_create = []
to_update = {} # {record_id: values}
for p in chunk:
ext_id = p.pop("external_id")
values = {
"name": p["name"],
"default_code": p["default_code"],
"list_price": float(p["list_price"]) if p["list_price"] else 0.0,
"description_sale": p.get("description", ""),
"type": "product",
}
if ext_id in ext_ids_in_odoo:
to_update[ext_ids_in_odoo[ext_id]] = values
else:
to_create.append((ext_id, values))
# Crear los nuevos
if to_create:
ids = client.execute(
"product.template", "create",
[v for _, v in to_create]
)
# Registrar external_ids
for (ext_id, _), record_id in zip(to_create, ids):
client.execute("ir.model.data", "create", {
"name": ext_id, "model": "product.template",
"res_id": record_id, "module": "scraper_import",
})
created += len(to_create)
# Actualizar los existentes
for record_id, values in to_update.items():
client.execute("product.template", "write", [record_id], values)
updated += len(to_update)
print(f"[Ingest] Chunk {i // chunk_size + 1}: +{len(to_create)} created, ~{len(to_update)} updated")
return {"created": created, "updated": updated, "errors": errors}
Ingesta de leads en crm.lead
Para alimentar el CRM directamente con leads extraídos de directorios, formularios o agregadores, el modelo objetivo es crm.lead. El mismo patrón de external_id garantiza idempotencia: el mismo lead detectado en dos ejecuciones distintas del scraper no se duplica:
def ingest_lead(client: OdooClient, lead_data: dict) -> int:
"""Crea o actualiza un lead en el CRM de Odoo."""
ext_id = lead_data["external_id"] # ej: "directorio_empresa_123"
values = {
"name": lead_data["company_name"],
"partner_name": lead_data.get("contact_name", ""),
"email_from": lead_data.get("email", ""),
"phone": lead_data.get("phone", ""),
"website": lead_data.get("website", ""),
"street": lead_data.get("address", ""),
"city": lead_data.get("city", ""),
"country_id": 69, # ID de España en Odoo (res.country)
"type": "lead",
"source_id": lead_data.get("odoo_source_id"), # utm.source
"description": lead_data.get("raw_description", ""),
}
return client.upsert_by_external_id("crm.lead", ext_id, values)
Aspectos legales y éticos del scraping
Ninguna guía técnica de scraping está completa sin este apartado, especialmente en el contexto europeo. Estos son los puntos que evalúo en cada proyecto antes de escribir una sola línea de código de extracción:
- Términos de uso de la fuente: muchos sitios prohíben explícitamente el scraping automatizado en sus ToS. No respetar esto expone a responsabilidad civil y posibles acciones legales.
- Datos personales y RGPD: si los datos extraídos incluyen información de personas físicas (nombres, emails, teléfonos), aplica el RGPD. Necesitas base legal para el tratamiento y debes garantizar los derechos ARCO. Esto no es negociable en el mercado español.
- Robots.txt: aunque no tiene fuerza legal directa en España, ignorar
robots.txtpuede agravar la posición en un litigio y es una señal de mala fe. Respetarlo es la práctica ética mínima. - Contenido con copyright: copiar y almacenar textos completos de productos puede implicar infracción de derechos de autor. Extraer datos estructurados (precio, disponibilidad, referencia) está en una zona gris más favorable que copiar descripciones literales.
- Carga sobre el servidor objetivo: un scraper sin rate limiting puede causar una degradación del servicio en el sitio objetivo, lo que en casos extremos puede calificarse como DoS negligente.
En Cymit, la extracción se realizaba de catálogos de proveedores con los que teníamos acuerdos comerciales que incluían acceso a la información de producto. Esa relación contractual previa es la forma más segura de operar a escala.
Orquestación y monitorización del pipeline
Un pipeline de scraping en producción necesita observabilidad. Las métricas mínimas que monitorizamos en Cymit incluían: tasa de éxito por proveedor, número de referencias actualizadas por ejecución, tiempo de ejecución por crawler, y alertas cuando un proveedor cambiaba su estructura de HTML (lo que rompía los selectores). Con alertas en Telegram vía un bot Python sencillo, el equipo recibía notificación en menos de cinco minutos cuando cualquier crawler dejaba de funcionar, sin tener que revisar logs manualmente.
Conclusión
Un sistema de scraping masivo bien diseñado no es un conjunto de scripts: es una arquitectura de datos con capas claramente separadas, garantías de idempotencia, gestión de escala y observabilidad. La diferencia entre un script que funciona el primer día y un pipeline que sigue funcionando dos años después es exactamente esa: la arquitectura. En Cymit, este sistema convirtió dos millones de referencias de un problema operativo en una ventaja competitiva que contribuyó al exit al Grupo PALEX. El mismo planteamiento es aplicable a cualquier proyecto que necesite alimentar Odoo con datos del mundo exterior a escala.