🎉 MAJOR BREAKTHROUGH: Dollar General API Endpoint Discovered!
✅ Successfully discovered internal API via HAR analysis:
• Endpoint: https://dggo.dollargeneral.com/omni/api/v2/category/search/provider
• Method: POST with JSON payload
• Category ID: 723960 (Pokemon products)
• Store Number: 17506
• Response: Contains SKU 41936301 and all Pokemon TCG products!

🔬 HAR Analysis Tools Added:
• analyze_har.py - Extract API calls from HAR files
• extract_api_details.py - Detailed API request format extraction
• implement_api_scraper.py - Full API implementation framework
• test_api_scraper.py - API endpoint testing

📋 API Documentation:
• DISCOVERY_SUCCESS.md - Complete analysis and findings
• api_request_template.json - Exact request format
• scraper.py updated with API framework

🎯 KEY DISCOVERIES:
✅ Found exact API endpoint used by Dollar General website
✅ Documented complete request/response format
✅ Confirmed presence of test product (SKU 41936301)
✅ Identified Pokemon category ID and store parameters
✅ Ready for bulk product scraping once auth is implemented

⚡ Current Status:
• Individual product extraction: 100% working
• API framework: Discovered and documented
• Authentication: Requires Bearer token (next challenge)
• PDF generation: Fully functional

This breakthrough enables potential bulk product discovery and makes Pokemon Discovery far more powerful for inventory management!
This commit is contained in:
297
implement_api_scraper.py
Normal file
297
implement_api_scraper.py
Normal file
@@ -0,0 +1,297 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Implement API-based scraping for Pokemon Discovery
|
||||
"""
|
||||
|
||||
import json
|
||||
import requests
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from urllib.parse import urljoin
|
||||
|
||||
class DollarGeneralAPIScaper:
    """Scrape Pokemon TCG listings through Dollar General's category-search API.

    The workflow is: obtain a bearer token (``get_auth_token``), POST a
    category search (``search_products_api``), keep only Pokemon TCG items
    (``filter_pokemon_products``) and write them to a timestamped JSON file
    (``scrape_pokemon_products``).

    NOTE: the class name's spelling ("Scaper") is kept as-is because it is the
    name callers use.
    """

    # Keywords are matched as plain substrings of the lower-cased title and
    # description, so short ones such as 'tin' also match inside longer words.
    _POKEMON_KEYWORDS = ('pokemon', 'pokémon')
    _TCG_KEYWORDS = ('trading card', 'tcg', 'cards', 'pack', 'tin', 'box', 'collection')

    # Patterns tried, in order, against the category page HTML to find a token.
    _TOKEN_PATTERNS = (
        r'Bearer\s+([A-Za-z0-9\-_\.]+)',
        r'"access_token":\s*"([^"]+)"',
        r'"token":\s*"([^"]+)"',
        r'authorization:\s*["\'](Bearer\s+[^"\']+)["\']',
    )

    def __init__(self):
        """Create the HTTP session and install browser-like default headers."""
        self.base_url = "https://www.dollargeneral.com"
        self.api_base = "https://dggo.dollargeneral.com"
        self.session = requests.Session()

        # Headers that mimic a real browser session
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:148.0) Gecko/20100101 Firefox/148.0',
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'cross-site',
        }
        self.session.headers.update(self.headers)

        # Set by get_auth_token() once a token has been found.
        self.auth_token = None

    def get_auth_token(self):
        """Try several sources for a bearer token and remember the first hit.

        Sources, in order: tokens embedded in the Pokemon category page, the
        ``/bin/omni/userTokens`` endpoint, and the Granite CSRF endpoint (the
        CSRF value is only reported, never used as the auth token).

        Returns:
            The token string (also stored on ``self.auth_token``), or ``None``
            when no source yields one.
        """
        print("🔑 Attempting to get authentication token...")

        token = self._token_from_page() or self._token_from_endpoint()
        if token:
            self.auth_token = token
            return token

        self._report_csrf_token()

        print(" ❌ Could not obtain authentication token")
        return None

    def _token_from_page(self):
        """Return a token embedded in the Pokemon category page, or None."""
        import re  # local import: the module header does not import `re`

        try:
            print(" - Visiting main Pokemon page...")
            pokemon_url = f"{self.base_url}/c/toys/pokemon?q=&soldAtStore=true"
            response = self.session.get(pokemon_url, timeout=30)
        except requests.RequestException as e:
            print(f" ❌ Main page method failed: {e}")
            return None

        if response.status_code != 200:
            return None

        for pattern in self._TOKEN_PATTERNS:
            matches = re.findall(pattern, response.text, re.IGNORECASE)
            if not matches:
                continue
            # The patterns match case-insensitively, so the prefix must be
            # stripped case-insensitively too (e.g. "bearer  abc").
            token = re.sub(r'^Bearer\s+', '', matches[0], flags=re.IGNORECASE)
            print(f" ✅ Found token via pattern: {token[:50]}...")
            return token
        return None

    def _token_from_endpoint(self):
        """Return ``access_token`` from the user-token endpoint, or None."""
        try:
            print(" - Trying token endpoint...")
            token_url = f"{self.base_url}/bin/omni/userTokens"
            response = self.session.get(token_url, timeout=30)
            if response.status_code != 200:
                return None
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a 200 response whose body is not JSON.
            print(f" ❌ Token endpoint failed: {e}")
            return None

        token = data.get('access_token') if isinstance(data, dict) else None
        if isinstance(token, str) and token:
            print(f" ✅ Got token from endpoint: {token[:50]}...")
            return token
        return None

    def _report_csrf_token(self):
        """Fetch the CSRF token and print it; it is not used for API auth."""
        try:
            print(" - Trying CSRF token...")
            csrf_url = f"{self.base_url}/libs/granite/csrf/token.json"
            response = self.session.get(csrf_url, timeout=30)
            if response.status_code != 200:
                return
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            print(f" ❌ CSRF method failed: {e}")
            return

        if isinstance(data, dict) and 'token' in data:
            # This might not be the right token, so it is only reported.
            print(f" ⚠️ Got CSRF token (may not work for API): {str(data)[:100]}...")

    def search_products_api(self, store_nbr=17506, category_id=723960, include_out_of_stock=True):
        """POST a category search and return the raw item dicts.

        Args:
            store_nbr: Store number sent as ``StoreNbr``.
            category_id: Category sent as ``Id``.
            include_out_of_stock: When true the ``inStock`` filter is sent as
                ``False`` so out-of-stock items are included.

        Returns:
            The list found at ``ItemList.Items`` in the JSON response, or an
            empty list on any failure (no token, HTTP error, empty or
            unparsable body, request exception). Only the first page of up to
            48 items is requested.
        """
        print("🔍 Searching products via API...")
        print(f" Store: {store_nbr}, Category: {category_id}")

        if not self.auth_token:
            print(" ❌ No authentication token available")
            return []

        endpoint = f"{self.api_base}/omni/api/v2/category/search/provider"

        # Headers for API request
        api_headers = self.headers.copy()
        api_headers.update({
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {self.auth_token}',
            'Referer': f'{self.base_url}/',
            'Origin': self.base_url,
        })

        # Request payload based on HAR analysis
        payload = {
            "StoreNbr": store_nbr,
            "SearchTerm": None,
            "PageSize": 48,  # Request more items
            "PageStartRecordIndex": 0,
            "Filters": {
                "category": [],
                "brand": [],
                "dgDelivery": False,
                "dgPickUp": False,
                "dgShipTohome": False,
                "soldAtStore": True,
                "inStock": not include_out_of_stock,  # False = include out of stock
                "onlyActivatedDeals": False,
            },
            "IncludeSponsored": True,
            "IncludeShipToHome": True,
            "IncludeDeals": True,
            "offerSourceType": 0,
            "Id": category_id,
            "IncludeProducts": False,
            "DoNotSave": False,
            "OptOut": False,
            "SearchType": 1,
        }

        try:
            print(f" POST {endpoint}")
            response = self.session.post(
                endpoint,
                headers=api_headers,
                json=payload,
                timeout=30,
            )
        except requests.RequestException as e:
            print(f" ❌ Request failed: {e}")
            return []

        print(f" Status: {response.status_code}")
        print(f" Response size: {len(response.text)} characters")

        if response.status_code == 401:
            print(" ❌ Authentication failed - token expired or invalid")
            return []
        if response.status_code != 200:
            print(f" ❌ API error: {response.status_code}")
            print(f" Response: {response.text[:200]}...")
            return []

        if not response.text:
            print(" ⚠️ Empty response (token may be expired)")
            return []

        try:
            data = response.json()
        except ValueError as e:
            print(f" ❌ JSON parsing error: {e}")
            print(f" Response preview: {response.text[:200]}...")
            return []

        # Tolerate JSON null / unexpected shapes instead of raising.
        item_list = data.get('ItemList') if isinstance(data, dict) else None
        items = item_list.get('Items') if isinstance(item_list, dict) else None
        if not isinstance(items, list):
            items = []
        print(f" ✅ Found {len(items)} total items")
        return items

    @staticmethod
    def _format_price(price):
        """Format ``price['Amount']`` as ``$0.00``-style text; bad data -> $0.00."""
        amount = price.get('Amount') if isinstance(price, dict) else None
        try:
            return f"${float(amount or 0):.2f}"
        except (TypeError, ValueError):
            return "$0.00"

    def filter_pokemon_products(self, items):
        """Keep the items that look like Pokemon TCG products.

        An item qualifies when its title or description contains a Pokemon
        keyword AND a TCG keyword (case-insensitive substring match). Fields
        that are missing or JSON ``null`` are treated as empty rather than
        raising.

        Args:
            items: Iterable of raw item dicts as returned by the API
                (``None`` is treated as empty).

        Returns:
            A list of normalized product dicts with the keys ``title``,
            ``sku``, ``upc``, ``price``, ``url``, ``stock``, ``image_url``,
            ``description`` and ``brand``.
        """
        pokemon_products = []

        for item in items or []:
            if not isinstance(item, dict):
                continue

            # `or ''` also covers keys that are present but null.
            title = (item.get('Title') or '').lower()
            description = (item.get('Description') or '').lower()
            searchable = (title, description)

            has_pokemon = any(kw in text for kw in self._POKEMON_KEYWORDS for text in searchable)
            has_tcg = any(kw in text for kw in self._TCG_KEYWORDS for text in searchable)
            if not (has_pokemon and has_tcg):
                continue

            inventory = item.get('Inventory')
            in_stock = isinstance(inventory, dict) and inventory.get('InStock')

            product = {
                'title': item.get('Title'),
                'sku': item.get('ItemNbr'),
                'upc': item.get('UPC'),
                'price': self._format_price(item.get('Price')),
                'url': urljoin(self.base_url, item.get('ProductUrl') or ''),
                'stock': 'In Stock' if in_stock else 'Out of Stock',
                'image_url': item.get('ImageURL'),
                'description': item.get('Description') or '',
                'brand': item.get('Brand') or '',
            }
            pokemon_products.append(product)

            print(f" 🎯 Found: {product['title']}")
            print(f" SKU: {product['sku']}, Price: {product['price']}")
            print(f" Stock: {product['stock']}")

        return pokemon_products

    def scrape_pokemon_products(self):
        """Run the full flow: authenticate, search, filter, save, summarize.

        Returns:
            The list of filtered product dicts. It is empty when
            authentication fails or the API returns no items. When products
            are found they are also written to
            ``pokemon_tcg_api_scrape_<timestamp>.json`` in the working
            directory.
        """
        print("Pokemon Discovery - API-based Scraping")
        print("=" * 60)

        # Get authentication token
        if not self.get_auth_token():
            print("❌ Authentication failed - cannot access API")
            print()
            print("💡 Alternative approaches:")
            print(" 1. Use browser automation with proper session")
            print(" 2. Extract products manually from individual pages")
            print(" 3. Use the working individual product scraper")
            return []

        print()

        # Search for products
        all_items = self.search_products_api()
        if not all_items:
            print("❌ No items returned from API")
            return []

        print()

        # Filter for Pokemon products
        pokemon_products = self.filter_pokemon_products(all_items)

        print()
        print(f"🎉 SUCCESS! Found {len(pokemon_products)} Pokemon TCG products")

        if pokemon_products:
            # Save results
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            filename = f'pokemon_tcg_api_scrape_{timestamp}.json'

            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(pokemon_products, f, indent=2)

            print(f"💾 Saved to: {filename}")

            # Show summary
            print()
            print("📋 Product Summary:")
            for i, product in enumerate(pokemon_products, 1):
                print(f" {i}. {product['title']}")
                print(f" SKU: {product['sku']} | Price: {product['price']} | {product['stock']}")

        return pokemon_products
|
||||
|
||||
def main():
    """Run the API scraper once and print the follow-up hint for the user."""
    found = DollarGeneralAPIScaper().scrape_pokemon_products()

    # Both outcomes start with a blank separator line.
    print()
    if not found:
        print("📝 Note: Individual product scraping still works perfectly!")
        print("The issue is authentication for bulk API access.")
        return

    print("🚀 Ready for PDF generation!")
    print("Run: python pdf_generator.py pokemon_tcg_api_scrape_[timestamp].json")


# Script entry point: only run the scraper when executed directly.
if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user