Python async scraper with Postgres and SQLAlchemy

Python -- Posted on June 6, 2025

                """
Async Wikipedia crawler with article-only link filter
====================================================
Crawls Wikipedia asynchronously, parsing *infobox* tables **and** restricting
the link queue to real article pages whose paths match the classic “/wiki/Title”
pattern (no `File:`, `Help:`, etc.).

Run with:
    python async_crawler_infobox.py

Dependencies (pip install ...): aiohttp, lxml, beautifulsoup4, sqlalchemy, psycopg2-binary
"""

import asyncio
import aiohttp
import json
import re
from lxml import html
from urllib.parse import urljoin, urlparse

from bs4 import BeautifulSoup

from sqlalchemy import (
    create_engine,
    Column,
    Integer,
    String,
    Text,
    ForeignKey,
    Table,
)
from sqlalchemy.orm import declarative_base, relationship, sessionmaker

# ───── Config ─────
DATABASE_URL = "postgresql+psycopg2://username:password@localhost:5432/pages"
MAX_CONCURRENCY = 10

# only follow links like /wiki/Foo (exclude namespaces containing ':' and non-article paths)
WIKI_PATH_PATTERN = re.compile(r"^/wiki/[^:]+$")
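# Illustrative examples of the filter:
#   /wiki/Web_crawler        -> followed (plain article path)
#   /wiki/File:Example.png   -> skipped (namespace prefix contains ':')
#   /w/index.php             -> skipped (not under /wiki/)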

# ───── Database Setup ─────
engine = create_engine(DATABASE_URL, echo=False)
Session = sessionmaker(bind=engine)
session = Session()
Base = declarative_base()

page_children = Table(
    "page_children",
    Base.metadata,
    Column("parent_id", Integer, ForeignKey("pages.id", ondelete="CASCADE"), primary_key=True),
    Column("child_id", Integer, ForeignKey("pages.id", ondelete="CASCADE"), primary_key=True),
)
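# Self-referential many-to-many association table: each row links a parent
# article to one of the article pages it links out to, so the crawl graph
# can be rebuilt from the database later.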


def extract_infobox(content: bytes) -> dict:
    """Return key→value pairs from the first Wikipedia infobox on the page."""
    soup = BeautifulSoup(content, "lxml")
    table = soup.select_one("table.infobox")
    if not table:
        return {}

    records = []
    for tr in table.select("tr"):
        hdr = tr.find("th", attrs={"scope": "row"})
        if hdr is None:
            continue
        key = hdr.get_text(" ", strip=True)
        td = tr.find("td")
        if not td:
            continue
        for sup in td.select("sup, .reference"):
            sup.decompose()
        val = td.get_text(" ", strip=True)
        records.append((key, val))
    return dict(records)
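# Illustrative only: for a typical article this returns something like
# {"Developer": "...", "Written in": "..."}; the keys depend entirely on the
# infobox of the page being parsed, and {} is returned when there is none.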


class Page(Base):
    __tablename__ = "pages"

    id = Column(Integer, primary_key=True)
    url = Column(String, unique=True, nullable=False)
    title = Column(String)
    description = Column(String)
    keywords = Column(String)
    infobox = Column(Text)  # JSON-encoded string

    children = relationship(
        "Page",
        secondary=page_children,
        primaryjoin=id == page_children.c.parent_id,
        secondaryjoin=id == page_children.c.child_id,
        backref="parents",
    )


Base.metadata.create_all(engine)
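# create_all() only creates tables that do not exist yet, so re-running the
# script against an existing database is safe.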

# ───── Async Crawler ─────
visited: set[str] = set()
semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
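# The semaphore caps in-flight HTTP requests at MAX_CONCURRENCY; note that the
# shared SQLAlchemy session is synchronous, so database calls briefly block the
# event loop between fetches.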


async def fetch_and_parse(session_http: aiohttp.ClientSession, url: str, base_domain: str, depth: int | None = None):
    """Fetch *url*, store metadata + infobox, then crawl its article children."""

    # stop conditions
    if url in visited or urlparse(url).netloc != base_domain:
        return
    if depth is not None and depth < 0:
        return

    visited.add(url)
    print(f"[+] Crawling: {url} (depth={'∞' if depth is None else depth})")

    # ── Fetch ──
    try:
        async with semaphore, session_http.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
            if resp.status != 200:
                return
            content = await resp.read()
    except Exception as e:
        print(f"[-] Error fetching {url}: {e}")
        return

    tree = html.fromstring(content)

    # ── Extract metadata ──
    title_text = tree.findtext(".//title") or ""
    meta_desc = tree.xpath("//meta[@name='description']/@content")
    meta_keywords = tree.xpath("//meta[@name='keywords']/@content")

    # ── Extract infobox ──
    infobox_data = extract_infobox(content)

    page_obj = session.query(Page).filter_by(url=url).first()
    if not page_obj:
        page_obj = Page(url=url)
        session.add(page_obj)

    page_obj.title = title_text.strip()
    page_obj.description = meta_desc[0].strip() if meta_desc else ""
    page_obj.keywords = meta_keywords[0].strip() if meta_keywords else ""
    page_obj.infobox = json.dumps(infobox_data, ensure_ascii=False) if infobox_data else None

    session.commit()

    # ── Recurse ──
    tasks = []
    for el, attr, link, _ in tree.iterlinks():
        if el.tag != "a" or attr != "href":
            continue
        child_url = urljoin(url, link).split("#")[0].rstrip("/")

        # early rejections
        if child_url.startswith(("mailto:", "javascript:")) or child_url in visited:
            continue
        if urlparse(child_url).netloc != base_domain:
            continue
        if not WIKI_PATH_PATTERN.match(urlparse(child_url).path):
            continue  # skip non-article paths

        child_page = session.query(Page).filter_by(url=child_url).first()
        if not child_page:
            child_page = Page(url=child_url)
            session.add(child_page)
            session.commit()

        if child_page not in page_obj.children:
            page_obj.children.append(child_page)

        next_depth = None if depth is None else depth - 1
        tasks.append(fetch_and_parse(session_http, child_url, base_domain, next_depth))

    session.commit()
    if tasks:
        await asyncio.gather(*tasks)


async def crawl(start_url: str, depth: int | None = None):
    """Entry point: crawl starting from *start_url* down to *depth*."""
    base_domain = urlparse(start_url).netloc
    async with aiohttp.ClientSession(headers={"User-Agent": "Mozilla/5.0"}) as session_http:
        await fetch_and_parse(session_http, start_url, base_domain, depth)
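
# Example invocation (hypothetical seed URL and depth):
#   asyncio.run(crawl("https://en.wikipedia.org/wiki/Web_crawler", depth=2))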


# ───── Export to JSON ─────

def export_to_json(filename: str = "pages_export.json"):
    rows = []
    for p in session.query(Page).all():
        rows.append(
            {
                "url": p.url,
                "title": p.title,
                "description": p.description,
                "keywords": p.keywords,
                "infobox": json.loads(p.infobox) if p.infobox else {},
                "children": [c.url for c in p.children],
            }
        )
    with open(filename, "w", encoding="utf-8") as fh:
        json.dump(rows, fh, ensure_ascii=False, indent=2)
    print(f"[✓] Exported to {filename}")


# ───── Main Entrypoint ─────
if __name__ == "__main__":
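    # NOTE: the seed below is left empty -- set it to a full article URL before
    # running. depth=None means the crawl is unbounded; pass an int to limit it.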
    seed = ""
    asyncio.run(crawl(seed, depth=None))
    export_to_json()
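
Once a crawl has finished, the stored link graph can be inspected straight from the database. Below is a minimal sketch, assuming the script above is saved as async_crawler_infobox.py (so its Page model and Session factory are importable) and the Postgres instance from DATABASE_URL is reachable; the five-row limit and the printed fields are just illustrative choices.

import json

# Reuse the model and session factory defined in the crawler script.
from async_crawler_infobox import Page, Session

session = Session()

# Show the five most recently stored pages, their outgoing-link counts,
# and the first few infobox keys that were captured.
for page in session.query(Page).order_by(Page.id.desc()).limit(5):
    infobox = json.loads(page.infobox) if page.infobox else {}
    print(page.title, "->", len(page.children), "children,", list(infobox)[:3])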