Python async scraper with Postgres and SQLAlchemy
"""
Async Wikipedia crawler with article-only link filter
====================================================
Crawls Wikipedia asynchronously, parsing *infobox* tables **and** restricting the
link queue to real article pages whose paths match the classic “/wiki/Title”
pattern (no `File:`, `Help:`, etc.).
Run with:
    python async_crawler_infobox.py
Requires Python 3.10+ (the type hints use the `int | None` syntax).
Dependencies (pip install ...): aiohttp, lxml, beautifulsoup4, sqlalchemy, psycopg2-binary
"""
import asyncio
import aiohttp
import json
import re
from lxml import html
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup
from sqlalchemy import (
create_engine,
Column,
Integer,
String,
Text,
ForeignKey,
Table,
)
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
# ───── Config ─────
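# NOTE: placeholder credentials; replace username/password and make sure the
# "pages" database exists before running.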
DATABASE_URL = "postgresql+psycopg2://username:password@localhost:5432/pages"
MAX_CONCURRENCY = 10
# only follow links like /wiki/Foo (exclude namespaces containing ':' and non-article paths)
WIKI_PATH_PATTERN = re.compile(r"^/wiki/[^:]+$")
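# e.g. "/wiki/Ada_Lovelace" matches, while "/wiki/File:Example.jpg" and
# "/wiki/Help:Contents" are rejected because their paths contain a colon.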
# ───── Database Setup ─────
engine = create_engine(DATABASE_URL, echo=False)
Session = sessionmaker(bind=engine)
session = Session()
Base = declarative_base()
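# Self-referential many-to-many association table: each row links a parent page
# to one of the article pages it references.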
page_children = Table(
"page_children",
Base.metadata,
Column("parent_id", Integer, ForeignKey("pages.id", ondelete="CASCADE"), primary_key=True),
Column("child_id", Integer, ForeignKey("pages.id", ondelete="CASCADE"), primary_key=True),
)
def extract_infobox(content: bytes) -> dict:
"""Return key→value pairs from the first Wikipedia infobox on the page."""
soup = BeautifulSoup(content, "lxml")
table = soup.select_one("table.infobox")
if not table:
return {}
records = []
for tr in table.select("tr"):
hdr = tr.find("th", attrs={"scope": "row"})
if hdr is None:
continue
key = hdr.get_text(" ", strip=True)
td = tr.find("td")
if not td:
continue
for sup in td.select("sup, .reference"):
sup.decompose()
val = td.get_text(" ", strip=True)
records.append((key, val))
return dict(records)
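# Hypothetical example: for a biography article with an infobox, extract_infobox(html_bytes)
# might return something like {"Born": "10 December 1815", "Died": "27 November 1852"}.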
class Page(Base):
__tablename__ = "pages"
id = Column(Integer, primary_key=True)
url = Column(String, unique=True, nullable=False)
title = Column(String)
description = Column(String)
keywords = Column(String)
infobox = Column(Text) # JSON-encoded string
children = relationship(
"Page",
secondary=page_children,
primaryjoin=id == page_children.c.parent_id,
secondaryjoin=id == page_children.c.child_id,
backref="parents",
)
Base.metadata.create_all(engine)
# ───── Async Crawler ─────
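# Shared crawl state: `visited` de-duplicates URLs across all tasks, and the
# semaphore caps simultaneous HTTP requests at MAX_CONCURRENCY.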
visited: set[str] = set()
semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
async def fetch_and_parse(session_http: aiohttp.ClientSession, url: str, base_domain: str, depth: int | None = None):
"""Fetch *url*, store metadata + infobox, then crawl its article children."""
# stop conditions
if url in visited or urlparse(url).netloc != base_domain:
return
if depth is not None and depth < 0:
return
visited.add(url)
print(f"[+] Crawling: {url} (depth={'∞' if depth is None else depth})")
# ── Fetch ──
try:
        async with semaphore, session_http.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
if resp.status != 200:
return
content = await resp.read()
except Exception as e:
print(f"[-] Error fetching {url}: {e}")
return
tree = html.fromstring(content)
# ── Extract metadata ──
title_text = tree.findtext(".//title") or ""
meta_desc = tree.xpath("//meta[@name='description']/@content")
meta_keywords = tree.xpath("//meta[@name='keywords']/@content")
# ── Extract infobox ──
infobox_data = extract_infobox(content)
page_obj = session.query(Page).filter_by(url=url).first()
if not page_obj:
page_obj = Page(url=url)
session.add(page_obj)
page_obj.title = title_text.strip()
page_obj.description = meta_desc[0].strip() if meta_desc else ""
page_obj.keywords = meta_keywords[0].strip() if meta_keywords else ""
page_obj.infobox = json.dumps(infobox_data, ensure_ascii=False) if infobox_data else None
session.commit()
# ── Recurse ──
tasks = []
for el, attr, link, _ in tree.iterlinks():
if el.tag != "a" or attr != "href":
continue
child_url = urljoin(url, link).split("#")[0].rstrip("/")
# early rejections
if child_url.startswith(("mailto:", "javascript:")) or child_url in visited:
continue
if urlparse(child_url).netloc != base_domain:
continue
if not WIKI_PATH_PATTERN.match(urlparse(child_url).path):
continue # skip non-article paths
child_page = session.query(Page).filter_by(url=child_url).first()
if not child_page:
child_page = Page(url=child_url)
session.add(child_page)
session.commit()
if child_page not in page_obj.children:
page_obj.children.append(child_page)
next_depth = None if depth is None else depth - 1
tasks.append(fetch_and_parse(session_http, child_url, base_domain, next_depth))
session.commit()
if tasks:
await asyncio.gather(*tasks)
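# Design note: fetch_and_parse fans out one recursive task per accepted child link.
# HTTP concurrency is bounded by the semaphore, but the synchronous SQLAlchemy
# session runs on the event loop thread, so each database call briefly blocks it.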
async def crawl(start_url: str, depth: int | None = None):
"""Entry point: crawl starting from *start_url* down to *depth*."""
base_domain = urlparse(start_url).netloc
async with aiohttp.ClientSession(headers={"User-Agent": "Mozilla/5.0"}) as session_http:
await fetch_and_parse(session_http, start_url, base_domain, depth)
# ───── Export to JSON ─────
def export_to_json(filename: str = "pages_export.json"):
rows = []
for p in session.query(Page).all():
rows.append(
{
"url": p.url,
"title": p.title,
"description": p.description,
"keywords": p.keywords,
"infobox": json.loads(p.infobox) if p.infobox else {},
"children": [c.url for c in p.children],
}
)
with open(filename, "w", encoding="utf-8") as fh:
json.dump(rows, fh, ensure_ascii=False, indent=2)
print(f"[✓] Exported to {filename}")
# ───── Main Entrypoint ─────
if __name__ == "__main__":
    seed = ""  # set a start article URL before running, e.g. "https://en.wikipedia.org/wiki/Web_crawler"
    asyncio.run(crawl(seed, depth=None))  # depth=None means unlimited depth; pass an int (e.g. 2) to bound the crawl
export_to_json()