#!/usr/bin/env python3
"""
Implement API-based scraping for Pokemon Discovery
"""

import json
import re
import sys
from datetime import datetime
from typing import Any, Dict, List, Optional
from urllib.parse import urljoin

try:
    import requests
except ImportError:
    # Defer the failure to scraper construction so the offline helpers
    # (e.g. ``filter_pokemon_products``) stay importable without requests.
    requests = None


# Timeout, in seconds, applied to every HTTP request made by the scraper.
REQUEST_TIMEOUT = 30

# Defaults used when the caller does not pick a store / category / page size.
DEFAULT_STORE_NBR = 17506
DEFAULT_CATEGORY_ID = 723960
DEFAULT_PAGE_SIZE = 48

# Patterns tried, in order, against the category page HTML. Each one has
# exactly one capture group holding the candidate token.
_TOKEN_PATTERNS = tuple(
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r'Bearer\s+([A-Za-z0-9\-_\.]+)',
        r'"access_token":\s*"([^"]+)"',
        r'"token":\s*"([^"]+)"',
        r'authorization:\s*["\'](Bearer\s+[^"\']+)["\']',
    )
)

# The patterns above are case-insensitive and allow any whitespace after
# "Bearer", so the prefix has to be stripped with the same tolerance.
_BEARER_PREFIX = re.compile(r'^Bearer\s+', re.IGNORECASE)

# Written with an escape so a future re-encoding of this file cannot silently
# corrupt the accented keyword.
_POKEMON_KEYWORDS = ('pokemon', 'pok\u00e9mon')

# NOTE: these are matched as plain substrings, so short entries such as
# 'tin' also match inside longer words (e.g. "painting").
_TCG_KEYWORDS = ('trading card', 'tcg', 'cards', 'pack', 'tin', 'box', 'collection')


def _mentions_any(keywords, *texts):
    """Return True if any keyword is a substring of any of the texts."""
    return any(keyword in text for keyword in keywords for text in texts)


def _format_price(price):
    """Render a price field as ``$X.XX``, falling back to ``$0.00``.

    Accepts either a mapping with an ``Amount`` entry or a bare number;
    anything missing or non-numeric is rendered as ``$0.00``.
    """
    amount = price.get('Amount', 0) if isinstance(price, dict) else price
    try:
        return f"${float(amount):.2f}"
    except (TypeError, ValueError):
        return "$0.00"


class DollarGeneralAPIScaper:
    """Scrape Pokemon TCG products from the Dollar General search API.

    Typical use is ``DollarGeneralAPIScaper().scrape_pokemon_products()``,
    which obtains a token, queries the category search endpoint, filters the
    results and saves them to a timestamped JSON file.
    """

    def __init__(self):
        """Create the HTTP session and install browser-like headers.

        Raises:
            ImportError: if the ``requests`` package is not installed.
        """
        if requests is None:
            raise ImportError(
                "The 'requests' package is required: pip install requests"
            )

        self.base_url = "https://www.dollargeneral.com"
        self.api_base = "https://dggo.dollargeneral.com"
        self.session = requests.Session()

        # Headers that mimic a real browser session.
        # Accept-Encoding is deliberately NOT set here: requests/urllib3
        # advertise only the encodings they can decode. Hard-coding 'br'
        # yields undecodable bodies when no brotli package is installed.
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:148.0) Gecko/20100101 Firefox/148.0',
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'en-US,en;q=0.9',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'cross-site',
        }

        self.session.headers.update(self.headers)
        self.auth_token = None

    def get_auth_token(self) -> Optional[str]:
        """Try multiple methods to get authentication token.

        The page-scan and token-endpoint strategies are tried in order; the
        first non-empty token is stored on ``self.auth_token`` and returned.
        The CSRF endpoint is then probed for diagnostics only.

        Returns:
            The token string, or ``None`` when no strategy produced one.
        """
        print("🔑 Attempting to get authentication token...")

        for strategy in (self._token_from_page, self._token_from_endpoint):
            token = strategy()
            if token:
                self.auth_token = token
                return token

        self._probe_csrf_token()

        print(" ❌ Could not obtain authentication token")
        return None

    def _token_from_page(self) -> Optional[str]:
        """Method 1: look for a token embedded in the Pokemon category page."""
        try:
            print(" - Visiting main Pokemon page...")
            pokemon_url = f"{self.base_url}/c/toys/pokemon?q=&soldAtStore=true"
            response = self.session.get(pokemon_url, timeout=REQUEST_TIMEOUT)
            if response.status_code != 200:
                return None

            for pattern in _TOKEN_PATTERNS:
                match = pattern.search(response.text)
                if match:
                    # Remove a leading 'Bearer ' prefix if the pattern kept it.
                    token = _BEARER_PREFIX.sub('', match.group(1))
                    print(f" ✅ Found token via pattern: {token[:50]}...")
                    return token
        except requests.RequestException as e:
            print(f" ❌ Main page method failed: {e}")
        return None

    def _token_from_endpoint(self) -> Optional[str]:
        """Method 2: ask the user-token endpoint for an ``access_token``."""
        try:
            print(" - Trying token endpoint...")
            token_url = f"{self.base_url}/bin/omni/userTokens"
            response = self.session.get(token_url, timeout=REQUEST_TIMEOUT)
            if response.status_code != 200:
                return None

            try:
                data = response.json()
            except ValueError as e:
                # Report instead of silently swallowing a malformed body.
                print(f" ⚠️ Token endpoint returned non-JSON body: {e}")
                return None

            token = data.get('access_token') if isinstance(data, dict) else None
            if isinstance(token, str) and token:
                print(f" ✅ Got token from endpoint: {token[:50]}...")
                return token
        except requests.RequestException as e:
            print(f" ❌ Token endpoint failed: {e}")
        return None

    def _probe_csrf_token(self) -> None:
        """Method 3: probe the CSRF endpoint and report what it returns.

        The CSRF token is only printed; it is never stored or returned
        because it might not be the right token for the API.
        """
        try:
            print(" - Trying CSRF token...")
            csrf_url = f"{self.base_url}/libs/granite/csrf/token.json"
            response = self.session.get(csrf_url, timeout=REQUEST_TIMEOUT)
            if response.status_code == 200:
                data = response.json()
                if isinstance(data, dict) and 'token' in data:
                    print(f" ⚠️ Got CSRF token (may not work for API): {str(data)[:100]}...")
        except (requests.RequestException, ValueError) as e:
            print(f" ❌ CSRF method failed: {e}")

    def search_products_api(self, store_nbr=DEFAULT_STORE_NBR, category_id=DEFAULT_CATEGORY_ID,
                            include_out_of_stock=True, page_size=DEFAULT_PAGE_SIZE):
        """Search for products using the API endpoint.

        Args:
            store_nbr: Store number sent as ``StoreNbr``.
            category_id: Category identifier sent as ``Id``.
            include_out_of_stock: When True the ``inStock`` filter is
                disabled so out-of-stock items are returned as well.
            page_size: Number of items requested in the single page fetched.

        Returns:
            The list found under ``ItemList.Items`` in the response, or an
            empty list on any failure (no token, HTTP error, bad body).
        """
        print("🔍 Searching products via API...")
        print(f" Store: {store_nbr}, Category: {category_id}")

        if not self.auth_token:
            print(" ❌ No authentication token available")
            return []

        endpoint = f"{self.api_base}/omni/api/v2/category/search/provider"

        # Headers for API request
        api_headers = self.headers.copy()
        api_headers.update({
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {self.auth_token}',
            'Referer': f'{self.base_url}/',
            'Origin': self.base_url,
        })

        # Request payload based on HAR analysis
        payload = {
            "StoreNbr": store_nbr,
            "SearchTerm": None,
            "PageSize": page_size,
            "PageStartRecordIndex": 0,
            "Filters": {
                "category": [],
                "brand": [],
                "dgDelivery": False,
                "dgPickUp": False,
                "dgShipTohome": False,
                "soldAtStore": True,
                "inStock": not include_out_of_stock,  # False = include out of stock
                "onlyActivatedDeals": False,
            },
            "IncludeSponsored": True,
            "IncludeShipToHome": True,
            "IncludeDeals": True,
            "offerSourceType": 0,
            "Id": category_id,
            "IncludeProducts": False,
            "DoNotSave": False,
            "OptOut": False,
            "SearchType": 1,
        }

        try:
            print(f" POST {endpoint}")
            response = self.session.post(
                endpoint, headers=api_headers, json=payload, timeout=REQUEST_TIMEOUT
            )
        except requests.RequestException as e:
            print(f" ❌ Request failed: {e}")
            return []

        print(f" Status: {response.status_code}")
        print(f" Response size: {len(response.text)} characters")

        if response.status_code == 401:
            print(" ❌ Authentication failed - token expired or invalid")
            return []
        if response.status_code != 200:
            print(f" ❌ API error: {response.status_code}")
            print(f" Response: {response.text[:200]}...")
            return []
        if not response.text:
            print(" ⚠️ Empty response (token may be expired)")
            return []

        try:
            data = response.json()
        except ValueError as e:
            print(f" ❌ JSON parsing error: {e}")
            print(f" Response preview: {response.text[:200]}...")
            return []

        # Tolerate a null or missing ItemList / Items instead of raising.
        item_list = data.get('ItemList') if isinstance(data, dict) else None
        items = item_list.get('Items') if isinstance(item_list, dict) else None
        if not isinstance(items, list):
            items = []

        print(f" ✅ Found {len(items)} total items")
        return items

    def filter_pokemon_products(self, items: Optional[List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
        """Filter for Pokemon TCG products.

        An item is kept when its title or description mentions a Pokemon
        keyword and a trading-card keyword (case-insensitive substrings).
        Null or missing fields are treated as empty rather than raising.

        Args:
            items: Raw item dicts as returned by ``search_products_api``.

        Returns:
            One normalised product dict per matching item, in input order.
        """
        pokemon_products = []

        for item in items or []:
            if not isinstance(item, dict):
                continue

            # ``or ''`` covers both a missing key and an explicit JSON null.
            title = (item.get('Title') or '').lower()
            description = (item.get('Description') or '').lower()

            has_pokemon = _mentions_any(_POKEMON_KEYWORDS, title, description)
            has_tcg = _mentions_any(_TCG_KEYWORDS, title, description)
            if not (has_pokemon and has_tcg):
                continue

            inventory = item.get('Inventory')
            in_stock = isinstance(inventory, dict) and bool(inventory.get('InStock'))

            product = {
                'title': item.get('Title'),
                'sku': item.get('ItemNbr'),
                'upc': item.get('UPC'),
                'price': _format_price(item.get('Price', {})),
                'url': urljoin(self.base_url, item.get('ProductUrl') or ''),
                'stock': 'In Stock' if in_stock else 'Out of Stock',
                'image_url': item.get('ImageURL'),
                'description': item.get('Description') or '',
                'brand': item.get('Brand') or '',
            }

            pokemon_products.append(product)
            print(f" 🎯 Found: {product['title']}")
            print(f" SKU: {product['sku']}, Price: {product['price']}")
            print(f" Stock: {product['stock']}")

        return pokemon_products

    def scrape_pokemon_products(self, store_nbr=DEFAULT_STORE_NBR, category_id=DEFAULT_CATEGORY_ID):
        """Main scraping method.

        Obtains a token, searches the given store/category, filters for
        Pokemon TCG products and, when any are found, writes them to
        ``pokemon_tcg_api_scrape_<timestamp>.json`` in the working directory.

        Args:
            store_nbr: Store number forwarded to ``search_products_api``.
            category_id: Category id forwarded to ``search_products_api``.

        Returns:
            The list of filtered product dicts (empty on any failure).
        """
        print("Pokemon Discovery - API-based Scraping")
        print("=" * 60)

        # Get authentication token
        if not self.get_auth_token():
            print("❌ Authentication failed - cannot access API")
            print()
            print("💡 Alternative approaches:")
            print(" 1. Use browser automation with proper session")
            print(" 2. Extract products manually from individual pages")
            print(" 3. Use the working individual product scraper")
            return []

        print()

        # Search for products
        all_items = self.search_products_api(store_nbr=store_nbr, category_id=category_id)
        if not all_items:
            print("❌ No items returned from API")
            return []

        print()

        # Filter for Pokemon products
        pokemon_products = self.filter_pokemon_products(all_items)

        print()
        print(f"🎉 SUCCESS! Found {len(pokemon_products)} Pokemon TCG products")

        if pokemon_products:
            # Save results
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            filename = f'pokemon_tcg_api_scrape_{timestamp}.json'

            # Explicit encoding keeps the output independent of the locale.
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(pokemon_products, f, indent=2)

            print(f"💾 Saved to: {filename}")

            # Show summary
            print()
            print("📋 Product Summary:")
            for i, product in enumerate(pokemon_products, 1):
                print(f" {i}. {product['title']}")
                print(f" SKU: {product['sku']} | Price: {product['price']} | {product['stock']}")

        return pokemon_products


# Correctly spelled alias; the original (misspelled) name is kept so that
# existing callers continue to work.
DollarGeneralAPIScraper = DollarGeneralAPIScaper


def main():
    """Run the scraper and print the follow-up instructions."""
    scraper = DollarGeneralAPIScaper()
    products = scraper.scrape_pokemon_products()

    print()
    if products:
        print("🚀 Ready for PDF generation!")
        print("Run: python pdf_generator.py pokemon_tcg_api_scrape_[timestamp].json")
    else:
        print("📝 Note: Individual product scraping still works perfectly!")
        print("The issue is authentication for bulk API access.")


if __name__ == "__main__":
    main()