#!/usr/bin/env python3
"""
Pokemon Discovery (disco.py)
Scrapes Pokemon TCG pack & tin products from Dollar General and generates a PDF catalog.

Usage:
    python disco.py                       # Full run: scrape + generate PDF
    python disco.py --scrape-only         # Just scrape, output JSON
    python disco.py --pdf-only FILE.json  # Just generate PDF from existing JSON
"""
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import requests
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from urllib.parse import urljoin, quote
|
|
|
|
import barcode
|
|
from barcode.writer import ImageWriter
|
|
from bs4 import BeautifulSoup
|
|
from PIL import Image, ImageDraw, ImageFont
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Configuration
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Browser capture that product discovery reads, and the storefront root URL.
HAR_FILE: str = "www.dollargeneral.com_Archive [26-03-21 15-14-28].har"
BASE_URL: str = "https://www.dollargeneral.com"

# All generated artifacts live under a single output tree.
OUTPUT_DIR: Path = Path("catalog_output")
IMAGES_DIR: Path = OUTPUT_DIR.joinpath("images")
BARCODES_DIR: Path = OUTPUT_DIR.joinpath("barcodes")

# Browser-like headers passed to the script's outgoing HTTP requests.
HEADERS: dict[str, str] = {
    "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:148.0) Gecko/20100101 Firefox/148.0",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9",
}

# Keywords that identify card packs and tins (case-insensitive)
CARD_TIN_KEYWORDS: list[str] = ["pack", "tin", "booster", "card game", "tcg"]
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Step 1 — Product Discovery (from HAR file API responses)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def extract_products_from_har(har_path: str) -> list[dict]:
    """Parse a HAR file and extract all products from the search API responses.

    Only POST requests to the category search endpoint are considered. Each
    response body is decoded as JSON and the entries under
    ``ItemList -> Items`` are collected, de-duplicated by UPC (the first
    occurrence wins).

    Args:
        har_path: Path to the HAR capture on disk.

    Returns:
        The raw API item dicts, one per unique UPC, in first-seen order.

    Raises:
        OSError: If the HAR file cannot be opened.
        json.JSONDecodeError: If the HAR file itself is not valid JSON.
        KeyError: If the HAR lacks the ``log.entries`` / ``request`` structure.
    """
    # Function-scope import, matching the file's existing style for one-off
    # stdlib helpers.
    import base64

    print(f"📦 Reading HAR file: {har_path}")

    with open(har_path, "r", encoding="utf-8") as f:
        har = json.load(f)

    api_url = "https://dggo.dollargeneral.com/omni/api/v2/category/search/provider"
    unique: dict[str, dict] = {}

    for entry in har["log"]["entries"]:
        req = entry["request"]
        if req["url"] != api_url or req["method"] != "POST":
            continue

        content = entry["response"].get("content") or {}
        text = content.get("text") or ""
        if not text:
            continue

        # HAR exporters may store bodies base64-encoded and flag that in
        # content.encoding; decode before treating the text as JSON.
        if content.get("encoding") == "base64":
            try:
                text = base64.b64decode(text).decode("utf-8")
            except ValueError:  # covers binascii.Error and UnicodeDecodeError
                continue

        try:
            data = json.loads(text)
        except json.JSONDecodeError:
            continue

        # Tolerate unexpected payload shapes instead of crashing the whole run.
        if not isinstance(data, dict):
            continue
        item_list = data.get("ItemList")
        items = item_list.get("Items") if isinstance(item_list, dict) else None

        for item in items or []:
            if not isinstance(item, dict):
                continue
            raw_upc = item.get("UPC")
            # A null UPC must not be stored under the literal key "None".
            upc = "" if raw_upc is None else str(raw_upc)
            if upc and upc not in unique:
                unique[upc] = item

    print(f" Found {len(unique)} unique products in HAR data")
    return list(unique.values())
|
|
|
|
|
|
def rootsv_to_sku(rootsv: str) -> str:
    """Convert rootSV like '0419363_1' to SKU like '41936301'.

    The rootSV base (minus leading zeros) + '01' gives the DG item number.
    The '_N' suffix is a variant/image index, not part of the SKU.

    Args:
        rootsv: The raw ``rootSV`` value; falsy values are accepted.

    Returns:
        The derived SKU, or ``""`` when ``rootsv`` is empty or its base holds
        no significant digits.
    """
    if not rootsv:
        return ""
    base = str(rootsv).partition("_")[0].strip().lstrip("0")
    # An empty or all-zero base would otherwise yield the bogus SKU "01".
    # Returning "" keeps "no SKU" detectable by truthiness checks.
    return f"{base}01" if base else ""
|
|
|
|
|
|
def build_product_url(upc: str) -> str:
    """Construct a Dollar General product page URL from a UPC.

    Args:
        upc: The product UPC; it is percent-encoded so that unexpected
            characters cannot alter the URL path.

    Returns:
        ``BASE_URL`` joined with the fixed ``/p/pokemon-product/`` path and
        the encoded UPC.
    """
    # safe="" also encodes "/", so the UPC always stays a single path segment.
    return f"{BASE_URL}/p/pokemon-product/{quote(str(upc), safe='')}"
|
|
|
|
|
|
def filter_card_and_tin_products(raw_items: list[dict]) -> list[dict]:
|
|
"""Keep only products whose description contains card/pack/tin keywords."""
|
|
filtered = []
|
|
for item in raw_items:
|
|
desc = item.get("Description", "").lower()
|
|
if any(kw in desc for kw in CARD_TIN_KEYWORDS):
|
|
filtered.append(item)
|
|
return filtered
|
|
|
|
|
|
def normalize_product(item: dict) -> dict:
    """Convert a raw API item into a clean product dict.

    Missing, null, or non-numeric ``Price`` / ``AvailableQty`` values are
    treated as zero so that one malformed item cannot abort the run.

    Args:
        item: A raw item dict from the search API response.

    Returns:
        A dict with the keys ``title``, ``sku``, ``upc``, ``price``,
        ``stock``, ``quantity``, ``image_url``, ``rating``, ``reviews`` and
        ``url``.
    """
    raw_upc = item.get("UPC")
    # Avoid turning a null UPC into the literal string "None".
    upc = "" if raw_upc is None else str(raw_upc)

    try:
        price = float(item.get("Price") or 0)
    except (TypeError, ValueError):
        price = 0.0

    try:
        qty = int(item.get("AvailableQty") or 0)
    except (TypeError, ValueError, OverflowError):
        qty = 0

    return {
        # `or` also covers a key that is present with a null value.
        "title": item.get("Description") or "Unknown Product",
        "sku": rootsv_to_sku(item.get("rootSV", "")),
        "upc": upc,
        "price": f"${price:.2f}",
        "stock": f"In Stock ({qty})" if qty > 0 else "Out of Stock",
        "quantity": qty,
        "image_url": item.get("Image", ""),
        "rating": item.get("AverageRating", 0),
        "reviews": item.get("RatingReviewCount", 0),
        "url": build_product_url(upc),
    }
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Step 2 — Enrich from product pages (get real URL slug, extra details)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def enrich_from_product_page(product: dict) -> dict:
    """Look up the real product URL and fill in a missing SKU.

    Two best-effort steps, neither of which raises on failure:

    1. Search the site for the UPC and, if a product link containing the UPC
       is found, store it as ``product["url"]``.
    2. If the product has no SKU, take the item number embedded in the image
       URL (pattern ``dg-<digits>-``).

    Args:
        product: A normalized product dict; must contain ``"upc"``.

    Returns:
        The same ``product`` dict, updated in place.
    """
    upc = product["upc"]

    # Without a UPC there is nothing to search for or match links against.
    if upc:
        try:
            # `params` lets requests URL-encode the query value.
            resp = requests.get(
                f"{BASE_URL}/search",
                params={"q": upc},
                headers=HEADERS,
                timeout=15,
            )
            if resp.status_code == 200:
                soup = BeautifulSoup(resp.text, "html.parser")
                # Look for the canonical product link
                links = soup.select(f'a[href*="/p/"][href*="{upc}"]')
                if links:
                    href = links[0].get("href", "")
                    product["url"] = urljoin(BASE_URL, href)
        except Exception as e:
            # Still best-effort, but report the failure instead of hiding it.
            print(f" ⚠ Product page lookup failed for {upc}: {e}")

    # The image URL contains the DG item number: dg-XXXXXXXX-1
    img_url = product.get("image_url") or ""
    match = re.search(r"dg-(\d+)-", img_url)
    if match and not product.get("sku"):
        product["sku"] = match.group(1)

    return product
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Step 3 — Download images & generate barcodes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def download_image(url: str, dest: Path) -> Path | None:
|
|
"""Download image from URL, convert to PNG for LaTeX compatibility."""
|
|
if not url:
|
|
return None
|
|
try:
|
|
resp = requests.get(url, headers=HEADERS, timeout=15)
|
|
resp.raise_for_status()
|
|
# Convert to PNG regardless of source format (handles WebP, etc.)
|
|
from io import BytesIO
|
|
img = Image.open(BytesIO(resp.content)).convert("RGB")
|
|
png_dest = dest.with_suffix(".png")
|
|
img.save(png_dest, "PNG")
|
|
return png_dest
|
|
except Exception as e:
|
|
print(f" ⚠ Image download failed: {e}")
|
|
return None
|
|
|
|
|
|
def make_placeholder(dest: Path, text: str = "No Image") -> Path:
    """Write a 300x300 grey image with ``text`` centered on it.

    Args:
        dest: Where to save the image.
        text: Caption drawn in the middle of the canvas.

    Returns:
        ``dest``, unchanged.
    """
    side = 300
    canvas = Image.new("RGB", (side, side), "#e0e0e0")
    pen = ImageDraw.Draw(canvas)

    # Use DejaVu Sans when it is installed; otherwise use Pillow's default font.
    try:
        face = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 20)
    except Exception:
        face = ImageFont.load_default()

    # Measure the rendered text so it can be centered on the canvas.
    left, top, right, bottom = pen.textbbox((0, 0), text, font=face)
    text_w = right - left
    text_h = bottom - top
    origin = ((side - text_w) / 2, (side - text_h) / 2)

    pen.text(origin, text, fill="#888", font=face)
    canvas.save(dest)
    return dest
|
|
|
|
|
|
def generate_barcode(upc: str, dest_dir: Path) -> Path | None:
|
|
"""Generate a UPC-A barcode PNG from a UPC number. Returns path to the .png file."""
|
|
digits = re.sub(r"\D", "", upc)
|
|
if not digits:
|
|
return None
|
|
# UPC-A: pass first 11 digits, library auto-calculates the 12th (check digit)
|
|
# A full UPC is 12 digits where the 12th is already the check digit
|
|
digits = digits[:11].zfill(11)
|
|
try:
|
|
upc_cls = barcode.get_barcode_class("upca")
|
|
bc = upc_cls(digits, writer=ImageWriter())
|
|
# barcode lib appends .png automatically
|
|
out = dest_dir / f"barcode_{upc}"
|
|
saved = bc.save(
|
|
str(out),
|
|
options={
|
|
"module_width": 0.3,
|
|
"module_height": 15.0,
|
|
"quiet_zone": 6.5,
|
|
"font_size": 10,
|
|
"text_distance": 5.0,
|
|
},
|
|
)
|
|
return Path(saved)
|
|
except Exception as e:
|
|
print(f" ⚠ Barcode generation failed for {upc}: {e}")
|
|
return None
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Step 4 — Generate PDF via pdflatex/xelatex
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Single-pass character map: because every character is translated exactly
# once, the backslashes and braces introduced by a replacement are never
# escaped a second time.
_LATEX_ESCAPES = str.maketrans({
    "\\": r"\textbackslash{}",
    "&": r"\&",
    "%": r"\%",
    "$": r"\$",
    "#": r"\#",
    "_": r"\_",
    "{": r"\{",
    "}": r"\}",
    "~": r"\textasciitilde{}",
    "^": r"\textasciicircum{}",
    "é": r"\'e",
})


def _latex_escape(text) -> str:
    """Return ``text`` with LaTeX special characters escaped for body text."""
    return str(text).translate(_LATEX_ESCAPES)


def _compile_latex(tex_file: Path, pdf_file: Path) -> bool:
    """Compile ``tex_file`` with the first LaTeX engine that produces ``pdf_file``."""
    for engine in ["pdflatex", "xelatex"]:
        try:
            # Output is captured as bytes: it is not inspected, and decoding
            # it as text could fail on non-UTF-8 engine output.
            result = subprocess.run(
                [engine, "-interaction=nonstopmode",
                 f"-output-directory={OUTPUT_DIR}", str(tex_file)],
                capture_output=True, timeout=120,
            )
        except FileNotFoundError:
            print(f" ⚠ {engine} is not installed or not on PATH")
            continue
        except (OSError, subprocess.SubprocessError) as e:
            print(f" ⚠ {engine} could not be run: {e}")
            continue

        # The engine may exit non-zero on recoverable errors and still emit a
        # usable PDF, so success is judged by the output file itself.
        if pdf_file.exists() and pdf_file.stat().st_size > 1000:
            # Clean up LaTeX temp files
            for ext in [".aux", ".log", ".out"]:
                pdf_file.with_suffix(ext).unlink(missing_ok=True)
            return True

        print(
            f" ⚠ {engine} exited with code {result.returncode} without a usable PDF"
            f" (see {pdf_file.with_suffix('.log')})"
        )
    return False


def generate_catalog_pdf(products: list[dict]) -> Path | None:
    """Build a LaTeX catalog and compile it to PDF with pdflatex/xelatex.

    The document starts with a manifest page (title block plus a table of all
    products) followed by one page per product, laid out top to bottom as:

        Title                    <- bold, large
        Stock          Price     <- stock left, price right
        SKU            UPC       <- small monospace
        [ Product image ]        <- centered, boxed
        [ UPC-A barcode ]        <- centered, boxed, pushed to page bottom

    Product images are downloaded (with a placeholder on failure) and
    barcodes are generated as a side effect, under ``IMAGES_DIR`` and
    ``BARCODES_DIR``.

    Args:
        products: Normalized product dicts with the keys ``title``, ``sku``,
            ``upc``, ``price`` and ``stock``; ``quantity`` and ``image_url``
            are optional.

    Returns:
        Path to the generated PDF, or ``None`` if no LaTeX engine produced
        one (the ``.tex`` source is kept in ``OUTPUT_DIR`` either way).
    """
    # Make the function usable on its own, not only after main() has set up
    # the output tree.
    for d in (OUTPUT_DIR, IMAGES_DIR, BARCODES_DIR):
        d.mkdir(parents=True, exist_ok=True)

    # One clock reading, so the label and the file name cannot disagree.
    now = datetime.now()
    timestamp_label = now.strftime("%Y-%m-%d %H:%M:%S")
    timestamp_file = now.strftime("%Y%m%d_%H%M%S")

    # Build LaTeX document directly for precise layout control
    latex_lines = [
        r"\documentclass[11pt,letterpaper]{article}",
        r"\usepackage[margin=0.75in]{geometry}",
        r"\usepackage{graphicx}",
        r"\usepackage{fancybox}",
        r"\usepackage{xcolor}",
        r"\usepackage{parskip}",
        r"\usepackage[utf8]{inputenc}",
        r"\usepackage[T1]{fontenc}",
        r"\usepackage{lmodern}",
        r"\usepackage{hyperref}",
        r"\pagestyle{empty}",
        r"\begin{document}",
        "",
        # Manifest page
        r"\begin{center}",
        r"{\Huge\bfseries Pokemon TCG Product Catalog}\\[0.4cm]",
        r"{\Large Dollar General}\\[0.2cm]",
        r"{\large Generated: " + timestamp_label + r"}\\[0.2cm]",
        r"{\large " + str(len(products)) + r" Cards \& Tins}",
        r"\end{center}",
        r"\vspace{0.8cm}",
        r"\begin{tabular}{r l l r r}",
        r"\hline",
        r"\textbf{\#} & \textbf{Product} & \textbf{SKU} & \textbf{Price} & \textbf{Stock} \\",
        r"\hline",
    ]
    for i, prod in enumerate(products, 1):
        # Truncate before escaping so an escape sequence is never cut in half.
        row_title = _latex_escape(prod["title"][:50])
        row_sku = _latex_escape(prod["sku"])
        row_price = _latex_escape(prod["price"])
        qty = prod.get("quantity", 0)
        stock_short = str(qty) if qty else "---"
        latex_lines.append(
            f"{i} & {row_title} & \\texttt{{{row_sku}}} & {row_price} & {stock_short} \\\\"
        )
    latex_lines += [
        r"\hline",
        r"\end{tabular}",
        r"\newpage",
        "",
    ]

    for i, prod in enumerate(products, 1):
        title = prod["title"]
        sku = prod["sku"]
        upc = prod["upc"]

        # Download product image
        img_dest = IMAGES_DIR / f"product_{i}_{sku}.jpg"
        img_path = download_image(prod.get("image_url"), img_dest)
        if not img_path:
            img_path = make_placeholder(
                IMAGES_DIR / f"product_{i}_{sku}_placeholder.png", title[:30]
            )

        # Generate barcode from UPC (not SKU)
        bc_path = generate_barcode(upc, BARCODES_DIR)

        # Escape LaTeX special characters in every text field
        safe_title = _latex_escape(title)
        safe_stock = _latex_escape(prod["stock"])
        safe_price = _latex_escape(prod["price"])
        safe_sku = _latex_escape(sku)
        safe_upc = _latex_escape(upc)

        # Absolute paths for LaTeX; forward slashes so that Windows path
        # separators are not read as TeX control sequences.
        abs_img = img_path.resolve().as_posix()
        abs_bc = bc_path.resolve().as_posix() if bc_path else None

        latex_lines += [
            # Name — bold, large
            r"{\Large\bfseries " + safe_title + r"}",
            "",
            r"\vspace{0.15cm}",
            "",
            # Stock and price
            r"{\large " + safe_stock + r" \hfill " + safe_price + r"}",
            "",
            r"\vspace{0.1cm}",
            "",
            # SKU and UPC
            r"{\small SKU: \texttt{" + safe_sku + r"} \hfill UPC: \texttt{" + safe_upc + r"}}",
            "",
            r"\vspace{0.3cm}",
            "",
            r"\begin{center}",
            # Product image — large, centered, with border
            r"\fbox{\includegraphics[width=0.7\textwidth,height=0.40\textheight,keepaspectratio]{"
            + abs_img
            + r"}}",
            r"\end{center}",
            "",
            r"\vfill",
            "",
        ]

        # Barcode — centered, bordered, pushed to bottom
        if abs_bc:
            latex_lines += [
                r"\begin{center}",
                r"\fbox{\includegraphics[width=0.55\textwidth]{"
                + abs_bc
                + r"}}",
                r"\end{center}",
                "",
            ]

        # Page break between products (not after last)
        if i < len(products):
            latex_lines.append(r"\newpage")
            latex_lines.append("")

        print(f" ✅ [{i}/{len(products)}] {title}")

    latex_lines.append(r"\end{document}")

    # Write .tex file
    tex_file = OUTPUT_DIR / f"pokemon_catalog_{timestamp_file}.tex"
    tex_file.write_text("\n".join(latex_lines), encoding="utf-8")
    print(f"\n📝 LaTeX source: {tex_file}")

    # Compile to PDF with a LaTeX engine directly (pandoc strips images from raw .tex)
    pdf_file = OUTPUT_DIR / f"pokemon_catalog_{timestamp_file}.pdf"
    if _compile_latex(tex_file, pdf_file):
        print(
            f"📄 PDF generated: {pdf_file} ({pdf_file.stat().st_size // 1024} KB)"
        )
        return pdf_file

    print(f"⚠ PDF generation failed. LaTeX source available at: {tex_file}")
    return None
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def main():
    """Command-line entry point.

    Modes, selected from ``sys.argv``:

    * ``--pdf-only FILE.json``: load products from ``FILE.json`` and only
      generate the PDF catalog.
    * ``--scrape-only``: extract, filter and normalize products from the HAR
      file, write them to a timestamped JSON file, then stop.
    * no flag: do the scrape steps and then generate the PDF catalog.

    Exits with status 1 when the input file is missing or unreadable, or when
    no card/tin products are found.
    """
    args = sys.argv[1:]

    # Handle --pdf-only mode
    if "--pdf-only" in args:
        idx = args.index("--pdf-only")
        json_file = args[idx + 1] if idx + 1 < len(args) else None
        if not json_file or not Path(json_file).exists():
            print(f"Usage: {sys.argv[0]} --pdf-only <products.json>")
            sys.exit(1)
        try:
            # The JSON is written as UTF-8 with ensure_ascii=False, so it must
            # be read as UTF-8 regardless of the platform default encoding.
            products = json.loads(Path(json_file).read_text(encoding="utf-8"))
        except (OSError, ValueError) as e:  # ValueError covers JSON/Unicode decode errors
            print(f"❌ Could not read {json_file}: {e}")
            sys.exit(1)
        for d in [OUTPUT_DIR, IMAGES_DIR, BARCODES_DIR]:
            d.mkdir(parents=True, exist_ok=True)
        print(f"\n🖨️ Generating PDF from {json_file} ({len(products)} products)...")
        generate_catalog_pdf(products)
        return

    scrape_only = "--scrape-only" in args

    # --- Banner ---
    timestamp_file = datetime.now().strftime("%Y%m%d_%H%M%S")
    print("=" * 60)
    print(" 🔍 Pokemon Discovery (pokemon-disco)")
    print(" Dollar General — Pokemon TCG Cards & Tins")
    print(f" {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print("=" * 60)

    # --- Step 1: Extract from HAR ---
    if not Path(HAR_FILE).exists():
        print(f"\n❌ HAR file not found: {HAR_FILE}")
        print(" Capture a HAR file from the Pokemon page in your browser")
        print(" and place it in the project directory.")
        sys.exit(1)

    raw_items = extract_products_from_har(HAR_FILE)

    # --- Step 2: Filter for Cards & Tins ---
    print("\n🎯 Filtering for card packs and tins...")
    card_tin_items = filter_card_and_tin_products(raw_items)
    print(f" {len(card_tin_items)} of {len(raw_items)} products match (pack/tin/booster/tcg)")

    if not card_tin_items:
        print("❌ No card or tin products found.")
        sys.exit(1)

    # Show what was filtered out. The filter returns the same dict objects,
    # so an identity set avoids comparing every pair of dicts.
    kept_ids = {id(item) for item in card_tin_items}
    excluded = [item for item in raw_items if id(item) not in kept_ids]
    if excluded:
        print(f"\n Excluded {len(excluded)} non-card/tin products:")
        for item in excluded:
            print(f" ✗ {item.get('Description', '?')}")

    # --- Step 3: Normalize ---
    print(f"\n📋 Processing {len(card_tin_items)} products...")
    products = [normalize_product(item) for item in card_tin_items]

    # Print summary table
    print()
    print(f" {'#':<3} {'Title':<55} {'SKU':<12} {'Price':<8} {'Stock'}")
    print(f" {'—'*3} {'—'*55} {'—'*12} {'—'*8} {'—'*15}")
    for i, p in enumerate(products, 1):
        title = p['title'][:53]
        print(f" {i:<3} {title:<55} {p['sku']:<12} {p['price']:<8} {p['stock']}")

    # --- Step 4: Save JSON ---
    json_file = f"pokemon_tcg_products_{timestamp_file}.json"
    # Explicit UTF-8: ensure_ascii=False emits raw non-ASCII characters that
    # a platform default encoding may be unable to represent.
    Path(json_file).write_text(
        json.dumps(products, indent=2, ensure_ascii=False), encoding="utf-8"
    )
    print(f"\n💾 Product data: {json_file}")

    if scrape_only:
        print("\n✅ Scrape complete (--scrape-only). Run with --pdf-only to generate catalog.")
        return

    # --- Step 5: Generate PDF ---
    for d in [OUTPUT_DIR, IMAGES_DIR, BARCODES_DIR]:
        d.mkdir(parents=True, exist_ok=True)

    print("\n🖨️ Generating PDF catalog...")
    pdf_path = generate_catalog_pdf(products)

    # --- Done ---
    print("\n" + "=" * 60)
    if pdf_path:
        print(" ✅ COMPLETE!")
        print(f" 📄 PDF Catalog: {pdf_path}")
        print(f" 💾 Product JSON: {json_file}")
        print(f" 🏷️ Barcodes: {BARCODES_DIR}/")
        print(f" 🖼️ Images: {IMAGES_DIR}/")
    else:
        # The intermediate artifact is a LaTeX (.tex) file, not markdown.
        print(f" ⚠ PDF generation failed — LaTeX source available in {OUTPUT_DIR}/")
        print(f" 💾 Product JSON: {json_file}")
    print("=" * 60)
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|