- Comprehensive scraper for Dollar General Pokemon TCG products
- Professional PDF catalog generator with UPC-A barcodes
- Robust anti-bot handling with requests + Selenium fallback
- Automatic image downloading and barcode generation
- Unix-friendly timestamped filenames
- Virtual environment support and dependency management
- Complete documentation and usage guides
329 lines
11 KiB
Python
Executable File
329 lines
11 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Pokemon Discovery - TCG Product Scraper for Dollar General
|
|
Scrapes product information and saves to JSON for PDF generation
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import time
|
|
import requests
|
|
from datetime import datetime
|
|
from urllib.parse import urljoin, urlparse
|
|
import pandas as pd
|
|
from bs4 import BeautifulSoup
|
|
|
|
# Try selenium imports (fallback for dynamic content)
|
|
try:
|
|
from selenium import webdriver
|
|
from selenium.webdriver.chrome.options import Options
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
from selenium.common.exceptions import TimeoutException
|
|
from webdriver_manager.chrome import ChromeDriverManager
|
|
SELENIUM_AVAILABLE = True
|
|
except ImportError:
|
|
SELENIUM_AVAILABLE = False
|
|
print("Selenium not available, using requests only")
|
|
|
|
class PokemonTCGScraper:
    """Scrape Pokemon TCG product data from the Dollar General web site.

    The scraper fetches the Pokemon category listing, follows every product
    link found on it, extracts a small dict of fields per product page and
    keeps the ones that look like trading-card products in ``self.products``.
    Pages are fetched with ``requests`` first; Selenium is used as a fallback
    when it is installed and the plain request fails or returns a tiny body.
    """

    def __init__(self):
        """Create the HTTP session and configure browser-like headers."""
        self.base_url = "https://www.dollargeneral.com"
        self.search_url = "https://www.dollargeneral.com/c/toys/pokemon?q=&soldAtStore=true"
        self.session = requests.Session()

        # Headers to appear more like a real browser
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }
        self.session.headers.update(self.headers)

        # Product dicts accepted by is_pokemon_tcg_product(), in scrape order.
        self.products = []

    def get_page_with_requests(self, url):
        """Return the body of ``url`` fetched with the shared session.

        Returns ``None`` (after printing the error) when the request fails
        or the server answers with an HTTP error status.
        """
        try:
            response = self.session.get(url, timeout=30)
            response.raise_for_status()
            return response.text
        except requests.RequestException as e:
            print(f"Requests failed for {url}: {e}")
            return None

    def get_page_with_selenium(self, url):
        """Return the rendered HTML of ``url`` using headless Chrome.

        Returns ``None`` when Selenium is not installed or when anything goes
        wrong while driving the browser. The browser is always shut down.
        """
        if not SELENIUM_AVAILABLE:
            return None

        options = Options()
        options.add_argument('--headless')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        options.add_argument('--disable-gpu')
        options.add_argument(f'--user-agent={self.headers["User-Agent"]}')

        driver = None
        try:
            # Selenium 4 takes the chromedriver path through a Service object;
            # passing it positionally collides with ``options`` on 4.10+.
            from selenium.webdriver.chrome.service import Service

            driver = webdriver.Chrome(
                service=Service(ChromeDriverManager().install()),
                options=options,
            )
            driver.get(url)

            # Wait for content to load
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.TAG_NAME, "body"))
            )

            # Additional wait for dynamic content
            time.sleep(3)

            return driver.page_source
        except Exception as e:
            # Best-effort fallback: any browser failure means "no page".
            print(f"Selenium failed for {url}: {e}")
            return None
        finally:
            if driver is not None:
                try:
                    driver.quit()
                except Exception as e:
                    # A failed shutdown must not mask the fetch result.
                    print(f"Selenium cleanup failed for {url}: {e}")

    def get_page_content(self, url):
        """Get page content, trying requests first, then selenium.

        A response of 1000 characters or fewer is treated as a failed fetch
        and triggers the Selenium fallback.
        """
        print(f"Fetching: {url}")

        # Try requests first
        content = self.get_page_with_requests(url)
        if content and len(content) > 1000:  # Basic content check
            return content

        # Fallback to selenium
        print("Falling back to Selenium...")
        return self.get_page_with_selenium(url)

    def extract_product_links(self, html):
        """Return absolute product-page URLs found in a listing page.

        Only hrefs containing ``/p/`` are kept. The result preserves
        first-seen order and contains no duplicates.
        """
        soup = BeautifulSoup(html, 'html.parser')

        # Common selectors for product links
        selectors = (
            'a[href*="/p/"]',
            '.product-item a',
            '.product-card a',
            '.product-link',
            '[data-testid*="product"] a',
        )

        links = []
        seen = set()  # O(1) duplicate check; ``links`` keeps the order
        for selector in selectors:
            for element in soup.select(selector):
                href = element.get('href')
                if not href or '/p/' not in href:
                    continue
                full_url = urljoin(self.base_url, href)
                if full_url not in seen:
                    seen.add(full_url)
                    links.append(full_url)

        return links

    @staticmethod
    def _first_text(soup, selectors):
        """Return the stripped text of the first element matching any selector.

        Selectors are tried in order; ``None`` means none of them matched.
        """
        for selector in selectors:
            elem = soup.select_one(selector)
            if elem:
                return elem.get_text().strip()
        return None

    @staticmethod
    def _sku_from_json_ld(raw):
        """Return the ``sku`` value from one JSON-LD script body, or ``None``.

        ``raw`` is the text of a ``<script type="application/ld+json">`` tag
        and may be ``None`` or malformed. Both a top-level object and a
        top-level list of objects are inspected; empty values are skipped.
        """
        if not raw:
            return None
        try:
            data = json.loads(raw)
        except (ValueError, TypeError):
            return None

        candidates = data if isinstance(data, list) else [data]
        for item in candidates:
            if isinstance(item, dict) and item.get('sku'):
                return item['sku']
        return None

    def extract_product_info(self, url, html):
        """Extract product information from a product page.

        Returns a dict that always contains ``url`` and, when the matching
        markup is found, ``title``, ``price``, ``sku``, ``stock`` and
        ``image_url``.
        """
        soup = BeautifulSoup(html, 'html.parser')
        product = {'url': url}

        # Extract title
        title = self._first_text(soup, (
            'h1',
            '.product-title',
            '.product-name',
            '[data-testid="product-title"]',
            '.pdp-product-name',
        ))
        if title is not None:
            product['title'] = title

        # Extract price
        price = self._first_text(soup, (
            '.price',
            '.product-price',
            '[data-testid="price"]',
            '.price-current',
            '.current-price',
        ))
        if price is not None:
            product['price'] = price

        # Extract SKU: try data attributes / dedicated elements first.
        # (The former '*[text()*="SKU"]' entry was XPath-style, not valid CSS,
        # and made the selector engine raise instead of falling through.)
        for selector in ('[data-sku]', '.sku', '.product-sku'):
            elem = soup.select_one(selector)
            if elem:
                sku = elem.get('data-sku') or elem.get_text().strip()
                # Ignore elements that only hold the bare "SKU" label.
                if sku and sku.lower() != 'sku':
                    product['sku'] = sku
                    break

        # Try JSON-LD structured data
        if 'sku' not in product:
            for script in soup.find_all('script', type='application/ld+json'):
                sku = self._sku_from_json_ld(script.string)
                if sku:
                    product['sku'] = sku
                    break  # first script with a SKU wins

        # Extract stock information
        stock_selectors = (
            '.stock',
            '.inventory',
            '.availability',
            '[data-testid="stock"]',
            '.in-stock',
            '.out-of-stock',
        )
        for selector in stock_selectors:
            stock_elem = soup.select_one(selector)
            if stock_elem:
                stock_text = stock_elem.get_text().strip().lower()
                if 'in stock' in stock_text:
                    product['stock'] = 'In Stock'
                elif 'out of stock' in stock_text:
                    product['stock'] = 'Out of Stock'
                else:
                    # Unknown wording: keep the lower-cased text as found.
                    product['stock'] = stock_text
                break

        # Extract image URL
        img_selectors = (
            '.product-image img',
            '.product-photo img',
            '.pdp-image img',
            '[data-testid="product-image"] img',
            'img[alt*="Pokemon"]',
            'img[alt*="TCG"]',
        )
        for selector in img_selectors:
            img_elem = soup.select_one(selector)
            if img_elem:
                # ``data-src`` covers lazily loaded images.
                src = img_elem.get('src') or img_elem.get('data-src')
                if src:
                    product['image_url'] = urljoin(self.base_url, src)
                    break

        return product

    def is_pokemon_tcg_product(self, product):
        """Check if product is a Pokemon TCG card pack or tin.

        The title must contain at least one franchise/card keyword and at
        least one packaging keyword (case-insensitive substring match).
        A missing or empty title is rejected.
        """
        if not product.get('title'):
            return False

        title = product['title'].lower()
        pokemon_keywords = ('pokemon', 'tcg', 'trading card', 'cards')
        tcg_keywords = ('pack', 'tin', 'box', 'booster', 'collection')

        has_pokemon = any(keyword in title for keyword in pokemon_keywords)
        has_tcg = any(keyword in title for keyword in tcg_keywords)

        return has_pokemon and has_tcg

    def scrape_products(self):
        """Scrape the listing page and every product page linked from it.

        Matching products are appended to ``self.products``, which is also
        the return value. Returns an empty list when the listing page cannot
        be fetched or contains no product links.
        """
        print(f"Starting scrape of: {self.search_url}")

        # Get search results page
        html = self.get_page_content(self.search_url)
        if not html:
            print("Failed to get search results page")
            return []

        # Extract product links
        product_links = self.extract_product_links(html)
        print(f"Found {len(product_links)} potential product links")

        if not product_links:
            print("No product links found. The page structure may have changed.")
            print("First 1000 chars of page:")
            print(html[:1000])
            return []

        # Scrape each product page
        total = len(product_links)
        for position, link in enumerate(product_links, start=1):
            print(f"Scraping product {position}/{total}: {link}")

            product_html = self.get_page_content(link)
            if not product_html:
                continue

            product = self.extract_product_info(link, product_html)

            # Filter for Pokemon TCG products
            if self.is_pokemon_tcg_product(product):
                print(f"Found Pokemon TCG product: {product.get('title', 'Unknown')}")
                self.products.append(product)
            else:
                print(f"Skipping non-TCG product: {product.get('title', 'Unknown')}")

            # Be respectful to the server
            time.sleep(1)

        return self.products

    def save_to_json(self, filename=None):
        """Save scraped products to a JSON file and return its name.

        When ``filename`` is omitted, a timestamped name of the form
        ``pokemon_tcg_products_YYYYMMDD_HHMMSS.json`` is used.
        """
        if not filename:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"pokemon_tcg_products_{timestamp}.json"

        # Explicit encoding keeps the output independent of the locale.
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(self.products, f, indent=2)

        print(f"Saved {len(self.products)} products to {filename}")
        return filename
|
|
|
|
def main():
    """Run a full scrape and report the outcome on stdout.

    On success the products are written to a timestamped JSON file; when
    nothing was scraped, the likely causes are listed instead.
    """
    scraper = PokemonTCGScraper()
    found = scraper.scrape_products()

    # Nothing scraped: explain the usual suspects and stop.
    if not found:
        print("\nNo products found. This could be due to:")
        for reason in (
            "1. No Pokemon TCG products in stock",
            "2. Website structure changes",
            "3. Anti-bot protection",
        ):
            print(reason)
        return

    output_path = scraper.save_to_json()
    print("\nScraping completed successfully!")
    print(f"Found {len(found)} Pokemon TCG products")
    print(f"Data saved to: {output_path}")
|
|
|
|
# Run the scraper only when executed as a script, not when imported.
if __name__ == "__main__":
    main()