Web scraping masivo en Python para el CRM de Odoo (+2M referencias)

Arquitectura real de un sistema de extracción, normalización e ingesta masiva de datos en Odoo CRM: de las fuentes externas al lead cualificado, con más de dos millones de referencias gestionadas en producción

De dos millones de referencias dispersas a un CRM unificado

En Cymit Química gestionamos un catálogo de más de dos millones de referencias procedentes de decenas de proveedores nacionales e internacionales. La alternativa a automatizar esa gestión era un equipo dedicado a la actualización manual del catálogo: un equipo que nunca habría podido competir en velocidad con los ciclos de actualización de precios y disponibilidad que el mercado de distribución química exige. El sistema de crawling y scraping que diseñé e implementé no fue un proyecto de IT: fue una decisión de negocio que convirtió ese volumen de datos en una ventaja competitiva real frente a distribuidoras que no tenían esa automatización.

Esta guía recoge la arquitectura técnica completa de ese sistema, adaptada como referencia para cualquier proyecto que necesite alimentar el CRM o el catálogo de Odoo con datos extraídos de fuentes externas a escala.

Arquitectura general: de la fuente al CRM

Un pipeline de scraping industrial tiene cuatro capas bien diferenciadas. Mezclarlas en un único script es el error más frecuente y el que hace que los proyectos de scraping mueran a los pocos meses:

  1. Extracción: obtener el HTML o los datos en bruto de la fuente.
  2. Transformación y normalización: limpiar, estructurar y unificar el formato de los datos.
  3. Deduplicación y validación: asegurar que no se insertan registros repetidos ni datos incorrectos.
  4. Carga en Odoo: escribir en el CRM vía API con garantías de idempotencia.

Cada capa tiene sus propias herramientas, sus propios modos de fallo y sus propias estrategias de escalado. A continuación detallo cada una.

Capa 1: extracción — elegir la herramienta correcta

No existe una herramienta universal para scraping. La elección depende de lo que hace la fuente:

  • Requests + BeautifulSoup / lxml: para páginas HTML estáticas o APIs REST no documentadas que devuelven JSON. La opción más ligera y más rápida. El 60-70 % de los casos de un distribuidor B2B se resuelven aquí.
  • Scrapy: cuando el volumen es alto (miles o millones de URLs) y necesitas control fino sobre concurrencia, middlewares, pipelines y reintentos. Scrapy es un framework, no una librería: tiene curva de aprendizaje, pero la diferencia en mantenibilidad a largo plazo lo justifica.
  • Playwright (o Selenium): para páginas que requieren JavaScript para renderizar el contenido. Playwright es la opción moderna: más rápido que Selenium, mejor API asíncrona, soporte de Chromium/Firefox/WebKit y modo headless nativo.

En el proyecto de Cymit usamos los tres en paralelo, asignados por tipo de fuente. El orquestador de tareas decidía qué motor usar para cada proveedor.

Ejemplo: extracción con Requests y lxml para un catálogo HTML estático

import requests
from lxml import html
import time
import random

SESSION_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
    ),
    "Accept-Language": "es-ES,es;q=0.9",
    "Accept-Encoding": "gzip, deflate, br",
}


def fetch_product_page(url: str, session: requests.Session) -> dict | None:
    """Extrae datos de una página de producto individual."""
    try:
        response = session.get(url, headers=SESSION_HEADERS, timeout=15)
        response.raise_for_status()
    except requests.RequestException as exc:
        print(f"[ERROR] {url}: {exc}")
        return None

    tree = html.fromstring(response.content)

    # XPath adaptado al sitio objetivo
    name = tree.xpath("//h1[@class='product-title']/text()")
    ref = tree.xpath("//span[@data-field='product_ref']/text()")
    price = tree.xpath("//span[@class='price-value']/text()")
    availability = tree.xpath("//div[@class='stock-info']//text()")

    return {
        "url": url,
        "name": name[0].strip() if name else None,
        "ref": ref[0].strip() if ref else None,
        "price_raw": price[0].strip() if price else None,
        "availability": " ".join(availability).strip() if availability else None,
    }


def crawl_catalog(base_url: str, page_count: int) -> list[dict]:
    """Recorre las páginas de listado y extrae URLs de producto."""
    session = requests.Session()
    product_data = []

    for page in range(1, page_count + 1):
        url = f"{base_url}?page={page}"
        resp = session.get(url, headers=SESSION_HEADERS, timeout=15)
        tree = html.fromstring(resp.content)
        links = tree.xpath("//a[@class='product-link']/@href")

        for link in links:
            full_url = f"https://proveedor.example.com{link}"
            data = fetch_product_page(full_url, session)
            if data:
                product_data.append(data)

        # Rate limiting: pausa aleatoria entre páginas
        time.sleep(random.uniform(1.5, 3.5))

    return product_data

Scrapy para volumen: spider de catálogo distribuidor

Cuando el catálogo supera las decenas de miles de páginas, la gestión manual de la concurrencia, los reintentos y las colas se vuelve inmanejable. Scrapy resuelve esto con su arquitectura de pipelines y middlewares:

import scrapy
from scrapy.http import Response


class CatalogSpider(scrapy.Spider):
    name = "catalog_spider"
    allowed_domains = ["proveedor.example.com"]
    start_urls = ["https://proveedor.example.com/catalogo/"]

    custom_settings = {
        "CONCURRENT_REQUESTS": 4,          # peticiones simultáneas
        "DOWNLOAD_DELAY": 2,               # delay mínimo entre peticiones
        "RANDOMIZE_DOWNLOAD_DELAY": True,  # jitter
        "AUTOTHROTTLE_ENABLED": True,      # ajuste automático de velocidad
        "AUTOTHROTTLE_TARGET_CONCURRENCY": 2.0,
        "RETRY_TIMES": 3,
        "RETRY_HTTP_CODES": [429, 500, 502, 503, 504],
        "ITEM_PIPELINES": {
            "catalog_scraper.pipelines.NormalizePipeline": 100,
            "catalog_scraper.pipelines.DedupPipeline": 200,
            "catalog_scraper.pipelines.OdooIngestPipeline": 300,
        },
        "ROTATING_PROXY_LIST_PATH": "/etc/scraper/proxies.txt",
    }

    def parse(self, response: Response):
        # Página de listado: seguir links de producto
        for href in response.css("a.product-link::attr(href)").getall():
            yield response.follow(href, callback=self.parse_product)

        # Paginación
        next_page = response.css("a.pagination-next::attr(href)").get()
        if next_page:
            yield response.follow(next_page, callback=self.parse)

    def parse_product(self, response: Response):
        yield {
            "source_url": response.url,
            "name": response.css("h1.product-title::text").get("").strip(),
            "ref_supplier": response.css("[data-field=product_ref]::text").get("").strip(),
            "price_raw": response.css(".price-value::text").get("").strip(),
            "description": response.css(".product-description").get(""),
            "availability": response.css(".stock-info::text").get("").strip(),
        }

Playwright para páginas con JavaScript

Algunos portales de proveedores renderizan el precio y la disponibilidad vía JS. Requests no ve esos datos; Playwright sí:

import asyncio
from playwright.async_api import async_playwright


async def scrape_dynamic_page(url: str) -> dict:
    async with async_playwright() as pw:
        browser = await pw.chromium.launch(headless=True)
        context = await browser.new_context(
            user_agent=(
                "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
                "Chrome/124.0.0.0 Safari/537.36"
            )
        )
        page = await context.new_page()

        # Interceptar peticiones de API internas del portal
        api_responses = []

        async def handle_response(response):
            if "/api/product" in response.url:
                try:
                    data = await response.json()
                    api_responses.append(data)
                except Exception:
                    pass

        page.on("response", handle_response)

        await page.goto(url, wait_until="networkidle", timeout=30_000)

        # Esperar a que el precio esté visible en el DOM
        await page.wait_for_selector(".price-value", timeout=10_000)

        name = await page.text_content("h1.product-title")
        price = await page.text_content(".price-value")

        await browser.close()

        return {
            "name": name.strip() if name else None,
            "price_raw": price.strip() if price else None,
            "api_data": api_responses[0] if api_responses else None,
        }

Gestión de escala y anti-bloqueo

Un scraper que funciona con cien peticiones falla con un millón. La diferencia entre un sistema de scraping artesanal y uno industrial está en cómo gestiona los rechazos, los rate limits y la detección.

Rate limiting y jitter

La regla de oro es no parecer un robot. Los robots hacen peticiones a intervalos exactos; los humanos no. Añadir un componente aleatorio al delay entre peticiones (jitter) reduce drásticamente la tasa de detección. En Scrapy, RANDOMIZE_DOWNLOAD_DELAY = True lo hace automáticamente. En scripts con Requests, time.sleep(random.uniform(1.5, 4.0)) entre peticiones es el mínimo.

Rotación de proxies y User-Agents

Para volúmenes de millones de peticiones, una sola IP será bloqueada inevitablemente. La solución es un pool de proxies residenciales o datacenter rotativos. En Scrapy, scrapy-rotating-proxies gestiona el pool automáticamente, marcando los proxies bloqueados y redistribuyendo el tráfico entre los activos:

# settings.py de Scrapy
DOWNLOADER_MIDDLEWARES = {
    "rotating_proxies.middlewares.RotatingProxyMiddleware": 610,
    "rotating_proxies.middlewares.BanDetectionMiddleware": 620,
}
ROTATING_PROXY_LIST_PATH = "/etc/scraper/proxies.txt"
ROTATING_PROXY_PAGE_RETRY_TIMES = 5

# proxies.txt: una por línea
# http://user:pass@proxy1.example.com:8080
# http://user:pass@proxy2.example.com:8080

La rotación de User-Agents complementa la de proxies. El middleware scrapy-fake-useragent extrae listas actualizadas de User-Agents reales de navegadores para cada petición.

Colas y workers distribuidos con Celery + Redis

Para un catálogo de dos millones de referencias que se actualiza periódicamente, un único proceso secuencial tarda días. La solución es distribuir el trabajo en workers paralelos con una cola de tareas. Celery con Redis como broker es el estándar de facto en Python:

from celery import Celery

app = Celery(
    "scraper",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    task_acks_late=True,          # no marcar como completada hasta que se procese
    worker_prefetch_multiplier=1, # un task por worker a la vez
    task_routes={
        "scraper.tasks.scrape_product": {"queue": "scraping"},
        "scraper.tasks.ingest_odoo": {"queue": "odoo_ingest"},
    },
)


@app.task(bind=True, max_retries=3, default_retry_delay=60)
def scrape_product(self, url: str, supplier_id: int) -> dict:
    try:
        data = fetch_product_page(url, requests.Session())
        if data:
            ingest_odoo.delay(data, supplier_id)
        return data
    except Exception as exc:
        raise self.retry(exc=exc)

Con 8 workers dedicados al scraping, el sistema de Cymit procesaba decenas de miles de referencias por hora en las actualizaciones masivas periódicas.

Capa 2: normalización de datos

Los datos extraídos de distintos proveedores llegan en formatos heterogéneos: precios con símbolo de moneda y coma decimal, disponibilidad como «En stock» / «Out of stock» / «Disponible en 3-5 días» / «S/D», referencias con o sin prefijo de proveedor. El pipeline de normalización debe convertir todo esto a un esquema unificado antes de tocar la base de datos de Odoo.

import re
from decimal import Decimal, InvalidOperation


AVAILABILITY_MAP = {
    "en stock": "available",
    "disponible": "available",
    "out of stock": "out_of_stock",
    "no disponible": "out_of_stock",
    "agotado": "out_of_stock",
    "s/d": "out_of_stock",
    # Textos con plazo de entrega
    r"disponible en \d+-\d+ días": "on_demand",
    r"\d+ semanas": "on_demand",
}


def normalize_price(raw: str) -> Decimal | None:
    """Convierte '12,50 €', '$12.50', '12.50EUR' a Decimal."""
    if not raw:
        return None
    cleaned = re.sub(r"[^\d.,]", "", raw).strip()
    # Normalizar separadores europeos (1.234,56 -> 1234.56)
    if "," in cleaned and "." in cleaned:
        cleaned = cleaned.replace(".", "").replace(",", ".")
    elif "," in cleaned:
        cleaned = cleaned.replace(",", ".")
    try:
        return Decimal(cleaned)
    except InvalidOperation:
        return None


def normalize_availability(raw: str) -> str:
    """Normaliza textos de disponibilidad a valores de enum."""
    normalized = raw.lower().strip()
    for pattern, status in AVAILABILITY_MAP.items():
        if re.search(pattern, normalized):
            return status
    return "unknown"


def normalize_product(raw: dict, supplier_code: str) -> dict:
    """Aplica todas las normalizaciones y genera el external_id canónico."""
    ref = (raw.get("ref_supplier") or "").strip().upper()
    return {
        "external_id": f"{supplier_code}_{ref}",  # clave de idempotencia
        "name": (raw.get("name") or "").strip(),
        "default_code": ref,
        "list_price": normalize_price(raw.get("price_raw", "")),
        "availability": normalize_availability(raw.get("availability", "")),
        "description": (raw.get("description") or "").strip(),
        "source_url": raw.get("source_url"),
    }

Capa 3: deduplicación

Con dos millones de referencias y múltiples proveedores, la misma referencia puede aparecer en distintas fuentes bajo nombres ligeramente distintos. La estrategia de deduplicación tiene dos niveles:

  • Dedup exacto por external_id: el identificador canónico {supplier_code}_{ref} garantiza que la misma referencia del mismo proveedor nunca se inserta dos veces.
  • Dedup fuzzy entre proveedores: para detectar la misma referencia vendida por dos distribuidores distintos bajo nombres diferentes, se usan técnicas de comparación de strings como rapidfuzz sobre el nombre normalizado y el CAS number cuando está disponible.
from rapidfuzz import fuzz


def find_duplicate_candidate(
    product: dict,
    existing_refs: list[dict],
    threshold: int = 90
) -> dict | None:
    """Busca duplicado fuzzy entre referencias ya existentes."""
    name = product["name"].lower()
    for existing in existing_refs:
        score = fuzz.token_sort_ratio(name, existing["name"].lower())
        if score >= threshold:
            return existing
    return None

Capa 4: ingesta en Odoo vía API

Odoo expone dos interfaces de API: XML-RPC (clásica, disponible desde Odoo 6) y JSON-RPC (más moderna, mismas capacidades). Ambas permiten autenticación, búsqueda, creación y actualización de registros de cualquier modelo del ERP. Para ingesta masiva desde un scraper externo, JSON-RPC con create_or_write via external_id es la combinación correcta.

Conexión y autenticación

import xmlrpc.client


class OdooClient:
    """Cliente XML-RPC para Odoo con soporte de external_id e ingesta masiva."""

    def __init__(self, url: str, db: str, username: str, password: str):
        self.url = url
        self.db = db
        self.uid = None
        self._models = xmlrpc.client.ServerProxy(f"{url}/xmlrpc/2/object")
        self._common = xmlrpc.client.ServerProxy(f"{url}/xmlrpc/2/common")
        self.password = password
        self._authenticate(username)

    def _authenticate(self, username: str) -> None:
        self.uid = self._common.authenticate(
            self.db, username, self.password, {}
        )
        if not self.uid:
            raise ValueError(f"Autenticación fallida para usuario {username}")
        print(f"[Odoo] Autenticado como UID {self.uid}")

    def execute(self, model: str, method: str, *args, **kwargs):
        return self._models.execute_kw(
            self.db, self.uid, self.password,
            model, method, list(args), kwargs
        )

    def search_read(self, model: str, domain: list, fields: list) -> list:
        return self.execute(model, "search_read", domain, fields=fields)

    def upsert_by_external_id(
        self, model: str, external_id: str, values: dict
    ) -> int:
        """
        Crea el registro si no existe; actualiza si ya existe.
        external_id es la clave de idempotencia (formato: module.xml_id).
        """
        # Buscar si ya existe el external_id
        existing = self.execute(
            "ir.model.data", "search_read",
            [["name", "=", external_id], ["model", "=", model]],
            fields=["res_id"],
        )
        if existing:
            record_id = existing[0]["res_id"]
            self.execute(model, "write", [record_id], values)
            return record_id
        else:
            record_id = self.execute(model, "create", values)
            # Registrar el external_id
            self.execute("ir.model.data", "create", {
                "name": external_id,
                "model": model,
                "res_id": record_id,
                "module": "scraper_import",
                "noupdate": False,
            })
            return record_id

Ingesta de leads y productos a escala

Para la ingesta masiva, la operación individual de upsert_by_external_id por cada referencia es demasiado lenta. Odoo permite create en batch pasando una lista de dicts. Combinado con el patrón de carga en chunks, el rendimiento mejora drásticamente:

def ingest_products_bulk(
    client: OdooClient,
    products: list[dict],
    chunk_size: int = 100
) -> dict:
    """Ingesta en batch con upsert por external_id. Retorna stats."""
    created = updated = errors = 0

    # Pre-cargar external_ids existentes en memoria para reducir round-trips
    ext_ids_in_odoo = {
        row["name"]: row["res_id"]
        for row in client.execute(
            "ir.model.data", "search_read",
            [["module", "=", "scraper_import"], ["model", "=", "product.template"]],
            fields=["name", "res_id"],
        )
    }

    for i in range(0, len(products), chunk_size):
        chunk = products[i:i + chunk_size]
        to_create = []
        to_update = {}  # {record_id: values}

        for p in chunk:
            ext_id = p.pop("external_id")
            values = {
                "name": p["name"],
                "default_code": p["default_code"],
                "list_price": float(p["list_price"]) if p["list_price"] else 0.0,
                "description_sale": p.get("description", ""),
                "type": "product",
            }
            if ext_id in ext_ids_in_odoo:
                to_update[ext_ids_in_odoo[ext_id]] = values
            else:
                to_create.append((ext_id, values))

        # Crear los nuevos
        if to_create:
            ids = client.execute(
                "product.template", "create",
                [v for _, v in to_create]
            )
            # Registrar external_ids
            for (ext_id, _), record_id in zip(to_create, ids):
                client.execute("ir.model.data", "create", {
                    "name": ext_id, "model": "product.template",
                    "res_id": record_id, "module": "scraper_import",
                })
            created += len(to_create)

        # Actualizar los existentes
        for record_id, values in to_update.items():
            client.execute("product.template", "write", [record_id], values)
        updated += len(to_update)

        print(f"[Ingest] Chunk {i // chunk_size + 1}: +{len(to_create)} created, ~{len(to_update)} updated")

    return {"created": created, "updated": updated, "errors": errors}

Ingesta de leads en crm.lead

Para alimentar el CRM directamente con leads extraídos de directorios, formularios o agregadores, el modelo objetivo es crm.lead. El mismo patrón de external_id garantiza idempotencia: el mismo lead detectado en dos ejecuciones distintas del scraper no se duplica:

def ingest_lead(client: OdooClient, lead_data: dict) -> int:
    """Crea o actualiza un lead en el CRM de Odoo."""
    ext_id = lead_data["external_id"]  # ej: "directorio_empresa_123"
    values = {
        "name": lead_data["company_name"],
        "partner_name": lead_data.get("contact_name", ""),
        "email_from": lead_data.get("email", ""),
        "phone": lead_data.get("phone", ""),
        "website": lead_data.get("website", ""),
        "street": lead_data.get("address", ""),
        "city": lead_data.get("city", ""),
        "country_id": 69,  # ID de España en Odoo (res.country)
        "type": "lead",
        "source_id": lead_data.get("odoo_source_id"),  # utm.source
        "description": lead_data.get("raw_description", ""),
    }
    return client.upsert_by_external_id("crm.lead", ext_id, values)

Aspectos legales y éticos del scraping

Ninguna guía técnica de scraping está completa sin este apartado, especialmente en el contexto europeo. Estos son los puntos que evalúo en cada proyecto antes de escribir una sola línea de código de extracción:

  • Términos de uso de la fuente: muchos sitios prohíben explícitamente el scraping automatizado en sus ToS. No respetar esto expone a responsabilidad civil y posibles acciones legales.
  • Datos personales y RGPD: si los datos extraídos incluyen información de personas físicas (nombres, emails, teléfonos), aplica el RGPD. Necesitas base legal para el tratamiento y debes garantizar los derechos ARCO. Esto no es negociable en el mercado español.
  • Robots.txt: aunque no tiene fuerza legal directa en España, ignorar robots.txt puede agravar la posición en un litigio y es una señal de mala fe. Respetarlo es la práctica ética mínima.
  • Contenido con copyright: copiar y almacenar textos completos de productos puede implicar infracción de derechos de autor. Extraer datos estructurados (precio, disponibilidad, referencia) está en una zona gris más favorable que copiar descripciones literales.
  • Carga sobre el servidor objetivo: un scraper sin rate limiting puede causar una degradación del servicio en el sitio objetivo, lo que en casos extremos puede calificarse como DoS negligente.

En Cymit, la extracción se realizaba de catálogos de proveedores con los que teníamos acuerdos comerciales que incluían acceso a la información de producto. Esa relación contractual previa es la forma más segura de operar a escala.

Orquestación y monitorización del pipeline

Un pipeline de scraping en producción necesita observabilidad. Las métricas mínimas que monitorizamos en Cymit incluían: tasa de éxito por proveedor, número de referencias actualizadas por ejecución, tiempo de ejecución por crawler, y alertas cuando un proveedor cambiaba su estructura de HTML (lo que rompía los selectores). Con alertas en Telegram vía un bot Python sencillo, el equipo recibía notificación en menos de cinco minutos cuando cualquier crawler dejaba de funcionar, sin tener que revisar logs manualmente.

Conclusión

Un sistema de scraping masivo bien diseñado no es un conjunto de scripts: es una arquitectura de datos con capas claramente separadas, garantías de idempotencia, gestión de escala y observabilidad. La diferencia entre un script que funciona el primer día y un pipeline que sigue funcionando dos años después es exactamente esa: la arquitectura. En Cymit, este sistema convirtió dos millones de referencias de un problema operativo en una ventaja competitiva que contribuyó al exit al Grupo PALEX. El mismo planteamiento es aplicable a cualquier proyecto que necesite alimentar Odoo con datos del mundo exterior a escala.

¿Necesitas automatizar la captación de datos en tu Odoo?

Solicitar auditoría técnica gratuita

Seguridad en Odoo: hardening del servidor y Nginx para producción
Guía técnica completa para reducir la superficie de ataque de Odoo: usuario no-root, firewall, fail2ban, TLS, HSTS, rate limiting, headers de seguridad, ocultar el gestor de bases de datos y backups cifrados.