This repository was archived by the owner on Dec 18, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathingestor.py
More file actions
164 lines (118 loc) · 5.34 KB
/
ingestor.py
File metadata and controls
164 lines (118 loc) · 5.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
"""
cardStatX - Football Card Data Ingestion System
Author: Samuel Stockstrom
License: CC BY-NC 4.0 (https://creativecommons.org/licenses/by-nc/4.0/)
This work is licensed under a Creative Commons Attribution-NonCommercial 4.0 International License.
"""
from logging_setup import setup_logging
from database import CardDatabase
from typing import Optional
import constants
import asyncio
import aiohttp
import logging
logger = logging.getLogger('async_ingestor')
class AsyncCardIngestor:
    """Search eBay for every card known to the database and store the listings.

    Intended to be used as an async context manager so that the HTTP session
    is created before the first request and closed afterwards::

        async with AsyncCardIngestor(db) as ingestor:
            await ingestor.process_all_cards()
    """

    # Endpoint and fixed query values sent with every search request.
    SEARCH_URL = "https://api.ebay.com/buy/browse/v1/item_summary/search"
    CATEGORY_ID = "261328"
    RESULT_LIMIT = "200"

    def __init__(self, db: "CardDatabase"):
        """Keep a reference to *db*; the HTTP session is opened in ``__aenter__``."""
        self.db = db
        self.session = None

    async def __aenter__(self):
        """Open the aiohttp session and return this ingestor."""
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the aiohttp session if one was opened."""
        if self.session:
            await self.session.close()

    async def search_ebay(self, keyword: str) -> Optional[dict]:
        """Search eBay API asynchronously.

        Args:
            keyword: Free-text search term (the card name).

        Returns:
            The decoded JSON response on HTTP 200, otherwise ``None``.
            Request/decoding errors are logged and also yield ``None``.
        """
        if self.session is None:
            # Without this guard the failure would surface as an opaque
            # AttributeError on ``None.get``.
            logger.error(
                f"Error searching eBay for '{keyword}': "
                "no open HTTP session (use 'async with AsyncCardIngestor(...)')"
            )
            return None

        # Pass the query as ``params`` so the keyword is percent-encoded;
        # interpolating it into the URL breaks on '&', '#', '+' and similar.
        params = {
            'q': keyword,
            'category_ids': self.CATEGORY_ID,
            'limit': self.RESULT_LIMIT,
        }
        headers = {
            'X-EBAY-C-MARKETPLACE-ID': 'EBAY_US',
            'Authorization': f'Bearer {constants.OAUTH_TOKEN}'
        }

        try:
            async with self.session.get(
                self.SEARCH_URL, params=params, headers=headers
            ) as response:
                if response.status != 200:
                    logger.error(f"eBay search failed for '{keyword}': {response.status}")
                    return None

                data = await response.json()
                if int(data.get('total', 0)) == 0:
                    logger.warning(f"Search for '{keyword}' returned 0 results")
                return data
        except Exception as e:
            # Boundary handler: one failed search must not abort the whole run.
            logger.error(f"Error searching eBay for '{keyword}': {e}")
            return None

    def filter_items(self, data: dict) -> Optional[dict]:
        """Filter eBay search results.

        Keeps only items whose id contains ``"|0"``, that are priced in USD
        and that carry every field needed for storage. Items that are missing
        a field or have a non-numeric price are skipped individually instead
        of aborting the whole result set.

        Args:
            data: Decoded search response, or ``None``.

        Returns:
            ``{item_id: (title, condition, price, creation_date)}`` or ``None``
            when nothing usable remains.
        """
        if not data or int(data.get('total', 0)) == 0:
            return None

        items = {}
        # ``or []`` also covers an explicit ``"itemSummaries": null``.
        for item in data.get('itemSummaries') or []:
            try:
                item_id = item['itemId']
                # NOTE(review): "|0" presumably marks listings without a
                # variation suffix -- confirm against the eBay item id format.
                if "|0" not in item_id:
                    continue

                price_info = item['price']
                if price_info['currency'] != 'USD':
                    continue

                condition = item['condition'] + ':' + item['conditionId']
                items[item_id] = (
                    item['title'],
                    condition,
                    float(price_info['value']),
                    item['itemCreationDate'],
                )
            except (KeyError, TypeError, ValueError):
                # Malformed item: drop it, keep the rest of the results.
                continue

        return items if items else None

    async def process_card(self, card_id: str, card_name: str) -> int:
        """Process a single card - search eBay and store results.

        Args:
            card_id: Identifier passed through to ``db.add_listing``.
            card_name: Search keyword sent to eBay.

        Returns:
            Number of listings for which ``db.add_listing`` reported success;
            ``0`` when nothing was found or an error occurred.
        """
        try:
            search_data = await self.search_ebay(card_name)
            filtered_items = self.filter_items(search_data)
            if not filtered_items:
                return 0

            listings_added = 0
            for listing_id, (title, condition, price, listing_date) in filtered_items.items():
                success = await self.db.add_listing(
                    listing_id, card_id, title, condition, price, listing_date
                )
                if success:
                    listings_added += 1

            logger.info(f"Processed {card_name} ({card_id}) - added {listings_added} listings")
            return listings_added
        except Exception as e:
            # Isolate failures per card so the remaining cards still run.
            logger.error(f"Error processing card {card_name}: {e}")
            return 0

    async def process_all_cards(self, concurrency_limit: int = 5):
        """Process all cards with controlled concurrency.

        Args:
            concurrency_limit: Maximum number of cards processed at once;
                must be at least 1.

        Raises:
            ValueError: If ``concurrency_limit`` is smaller than 1.
        """
        cards = await self.db.get_all_cards()
        if not cards:
            logger.warning("No cards found in database")
            return

        # Semaphore(0) would never admit any task and hang the run forever.
        if concurrency_limit < 1:
            raise ValueError("concurrency_limit must be at least 1")

        logger.info(f"Starting to process {len(cards)} cards with concurrency limit {concurrency_limit}")
        semaphore = asyncio.Semaphore(concurrency_limit)

        async def process_with_semaphore(card_id: str, card_name: str):
            async with semaphore:
                result = await self.process_card(card_id, card_name)
                # Hold the slot for one more second to throttle request rate.
                await asyncio.sleep(1)
                return result

        tasks = [
            process_with_semaphore(card_id, card_name)
            for card_id, card_name in cards.items()
        ]

        total_listings = 0
        completed = 0
        for task in asyncio.as_completed(tasks):
            total_listings += await task
            completed += 1
            if completed % 10 == 0:
                logger.info(f"Progress: {completed}/{len(cards)} cards processed")

        logger.info(f"Completed processing all cards - total {total_listings} listings added")
async def main():
    """Configure logging, initialize the database and run one ingestion pass."""
    setup_logging()

    database = CardDatabase()
    await database.initialize()

    # The context manager opens the HTTP session and closes it on exit.
    async with AsyncCardIngestor(database) as card_ingestor:
        await card_ingestor.process_all_cards(concurrency_limit=3)


# Start the ingestion only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())