Python async scrapper with postgres and sqlalchemy
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 | """
Async Wikipedia crawler with article-only link filter
====================================================
Script by parsing *infobox* tables **and** restricting the
link queue to real article pages whose paths match the classic “/wiki/Title”
pattern (no `File:`, `Help:`, etc.).
Run with:
python async_crawler_infobox.py
Dependencies (pip install ...): aiohttp, lxml, beautifulsoup4, sqlalchemy, psycopg2-binary
"""
import asyncio
import aiohttp
import json
import re
from lxml import html
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup
from sqlalchemy import (
create_engine,
Column,
Integer,
String,
Text,
ForeignKey,
Table,
)
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
# ───── Config ─────
DATABASE_URL = "postgresql+psycopg2://username:password@localhost:5432/pages"
MAX_CONCURRENCY = 10
# only follow links like /wiki/Foo (exclude namespaces containing ':' and non-article paths)
WIKI_PATH_PATTERN = re.compile(r"^/wiki/[^:]+$")
# ───── Database Setup ─────
engine = create_engine(DATABASE_URL, echo=False)
Session = sessionmaker(bind=engine)
session = Session()
Base = declarative_base()
page_children = Table(
"page_children",
Base.metadata,
Column("parent_id", Integer, ForeignKey("pages.id", ondelete="CASCADE"), primary_key=True),
Column("child_id", Integer, ForeignKey("pages.id", ondelete="CASCADE"), primary_key=True),
)
def extract_infobox(content: bytes) -> dict:
"""Return key→value pairs from the first Wikipedia infobox on the page."""
soup = BeautifulSoup(content, "lxml")
table = soup.select_one("table.infobox")
if not table:
return {}
records = []
for tr in table.select("tr"):
hdr = tr.find("th", attrs={"scope": "row"})
if hdr is None:
continue
key = hdr.get_text(" ", strip=True)
td = tr.find("td")
if not td:
continue
for sup in td.select("sup, .reference"):
sup.decompose()
val = td.get_text(" ", strip=True)
records.append((key, val))
return dict(records)
class Page(Base):
__tablename__ = "pages"
id = Column(Integer, primary_key=True)
url = Column(String, unique=True, nullable=False)
title = Column(String)
description = Column(String)
keywords = Column(String)
infobox = Column(Text) # JSON-encoded string
children = relationship(
"Page",
secondary=page_children,
primaryjoin=id == page_children.c.parent_id,
secondaryjoin=id == page_children.c.child_id,
backref="parents",
)
Base.metadata.create_all(engine)
# ───── Async Crawler ─────
visited: set[str] = set()
semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
async def fetch_and_parse(session_http: aiohttp.ClientSession, url: str, base_domain: str, depth: int | None = None):
"""Fetch *url*, store metadata + infobox, then crawl its article children."""
# stop conditions
if url in visited or urlparse(url).netloc != base_domain:
return
if depth is not None and depth < 0:
return
visited.add(url)
print(f"[+] Crawling: {url} (depth={'∞' if depth is None else depth})")
# ── Fetch ──
try:
async with semaphore, session_http.get(url, timeout=10) as resp:
if resp.status != 200:
return
content = await resp.read()
except Exception as e:
print(f"[-] Error fetching {url}: {e}")
return
tree = html.fromstring(content)
# ── Extract metadata ──
title_text = tree.findtext(".//title") or ""
meta_desc = tree.xpath("//meta[@name='description']/@content")
meta_keywords = tree.xpath("//meta[@name='keywords']/@content")
# ── Extract infobox ──
infobox_data = extract_infobox(content)
page_obj = session.query(Page).filter_by(url=url).first()
if not page_obj:
page_obj = Page(url=url)
session.add(page_obj)
page_obj.title = title_text.strip()
page_obj.description = meta_desc[0].strip() if meta_desc else ""
page_obj.keywords = meta_keywords[0].strip() if meta_keywords else ""
page_obj.infobox = json.dumps(infobox_data, ensure_ascii=False) if infobox_data else None
session.commit()
# ── Recurse ──
tasks = []
for el, attr, link, _ in tree.iterlinks():
if el.tag != "a" or attr != "href":
continue
child_url = urljoin(url, link).split("#")[0].rstrip("/")
# early rejections
if child_url.startswith(("mailto:", "javascript:")) or child_url in visited:
continue
if urlparse(child_url).netloc != base_domain:
continue
if not WIKI_PATH_PATTERN.match(urlparse(child_url).path):
continue # skip non-article paths
child_page = session.query(Page).filter_by(url=child_url).first()
if not child_page:
child_page = Page(url=child_url)
session.add(child_page)
session.commit()
if child_page not in page_obj.children:
page_obj.children.append(child_page)
next_depth = None if depth is None else depth - 1
tasks.append(fetch_and_parse(session_http, child_url, base_domain, next_depth))
session.commit()
if tasks:
await asyncio.gather(*tasks)
async def crawl(start_url: str, depth: int | None = None):
"""Entry point: crawl starting from *start_url* down to *depth*."""
base_domain = urlparse(start_url).netloc
async with aiohttp.ClientSession(headers={"User-Agent": "Mozilla/5.0"}) as session_http:
await fetch_and_parse(session_http, start_url, base_domain, depth)
# ───── Export to JSON ─────
def export_to_json(filename: str = "pages_export.json"):
rows = []
for p in session.query(Page).all():
rows.append(
{
"url": p.url,
"title": p.title,
"description": p.description,
"keywords": p.keywords,
"infobox": json.loads(p.infobox) if p.infobox else {},
"children": [c.url for c in p.children],
}
)
with open(filename, "w", encoding="utf-8") as fh:
json.dump(rows, fh, ensure_ascii=False, indent=2)
print(f"[✓] Exported to {filename}")
# ───── Main Entrypoint ─────
if __name__ == "__main__":
seed = ""
asyncio.run(crawl(seed, depth=None))
export_to_json()
|