-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcrawl.py
More file actions
184 lines (159 loc) · 7 KB
/
Copy pathcrawl.py
File metadata and controls
184 lines (159 loc) · 7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
from urllib.parse import urlsplit, urljoin, urlparse
from bs4 import BeautifulSoup
import asyncio
import aiohttp
def normalize_url(url):
    """Reduce a URL to a scheme-less ``host/path`` key used for de-duplication.

    The whole URL is lower-cased and any leading/trailing ``/`` characters are
    removed before it is split; the query string and fragment are discarded.

    Args:
        url (str): The URL to normalize.

    Returns:
        str: The network location immediately followed by the path.
    """
    cleaned = url.lower().strip('/')
    parts = urlsplit(cleaned)
    return parts.netloc + parts.path
def get_h1_from_html(html):
    """Return the text of the first ``<h1>`` element in *html*.

    Args:
        html (str): HTML markup to search.

    Returns:
        str: The heading's text, or an empty string when no ``<h1>`` exists.
    """
    heading = BeautifulSoup(html, 'html.parser').find('h1')
    return heading.get_text() if heading else ""
def get_first_paragraph_from_html(html):
    """Return the text of the first ``<p>`` element in *html*.

    Args:
        html (str): HTML markup to search.

    Returns:
        str: The paragraph's text, or an empty string when no ``<p>`` exists.
    """
    paragraph = BeautifulSoup(html, 'html.parser').find('p')
    if not paragraph:
        return ""
    return paragraph.get_text()
def get_urls_from_html(html, base_url):
    """Collect the targets of every ``<a href=...>`` element in *html*.

    Args:
        html (str): HTML markup to search.
        base_url (str): URL each ``href`` value is joined onto with ``urljoin``.

    Returns:
        list[str]: One joined URL per anchor, in document order
        (duplicates are kept).
    """
    document = BeautifulSoup(html, 'html.parser')
    # href=True restricts the match to anchors that actually carry an href.
    return [
        urljoin(base_url, anchor['href'])
        for anchor in document.find_all('a', href=True)
    ]
def get_images_from_html(html, base_url):
    """Collect the sources of every ``<img src=...>`` element in *html*.

    Args:
        html (str): HTML markup to search.
        base_url (str): URL each ``src`` value is joined onto with ``urljoin``.

    Returns:
        list[str]: One joined URL per image, in document order
        (duplicates are kept).
    """
    document = BeautifulSoup(html, 'html.parser')
    # src=True restricts the match to images that actually carry a src.
    return [
        urljoin(base_url, image['src'])
        for image in document.find_all('img', src=True)
    ]
def get_base_url(url):
    """Return the ``scheme://host/`` root of *url*.

    Args:
        url (str): The URL to reduce.

    Returns:
        str: The scheme and network location of *url* followed by a single
        ``/``; path, query and fragment are dropped.
    """
    parts = urlparse(url)
    return parts.scheme + "://" + parts.netloc + "/"
def extract_page_data(html, page_url):
    """Gather the heading, first paragraph, links and images of one page.

    Args:
        html (str): HTML markup of the page.
        page_url (str): Absolute URL the markup was fetched from. Relative
            ``href``/``src`` values are resolved against it.

    Returns:
        dict: A dictionary with the keys ``url``, ``heading``,
        ``first_paragraph``, ``outgoing_links`` and ``image_urls``.
    """
    # Resolve against the page's own URL rather than just scheme://host/:
    # dropping the path makes document-relative references such as
    # "next.html" or "../img/a.png" resolve to the wrong location.
    # Root-relative ("/x") and absolute references are unaffected.
    return {
        'url': page_url,
        'heading': get_h1_from_html(html),
        'first_paragraph': get_first_paragraph_from_html(html),
        'outgoing_links': get_urls_from_html(html, page_url),
        'image_urls': get_images_from_html(html, page_url),
    }
"""_summary_
This class manages the shared state for my crawler
"""
class AsyncCrawler:
    """Concurrent single-domain crawler holding the state shared by all tasks.

    Use it as an async context manager so the aiohttp session is opened
    before crawling and closed afterwards::

        async with AsyncCrawler(url, 5, 100) as crawler:
            pages = await crawler.crawl()
    """

    def __init__(self, base_url, max_concurrency, max_pages):
        """Set up the shared crawl state.

        Args:
            base_url (str): The starting URL; only pages whose network
                location equals this URL's are crawled.
            max_concurrency (int): Upper bound on simultaneous page fetches.
            max_pages (int): Upper bound on the number of pages recorded.
        """
        self.base_url = base_url
        self.max_concurrency = max_concurrency
        self.max_pages = max_pages
        # Network location of the start URL, used for the same-site check.
        self.base_domain = urlparse(self.base_url).netloc
        # Page data keyed by normalized URL. A key maps to None from the
        # moment the page is claimed until (and unless) its fetch succeeds.
        self.page_data = {}
        # Guards page_data and the stop flag.
        self.lock = asyncio.Lock()
        # Limits how many fetches run at once.
        self.semaphore = asyncio.Semaphore(self.max_concurrency)
        # aiohttp.ClientSession, created in __aenter__.
        self.session = None
        # Set once max_pages has been reached.
        self.should_stop = False
        # Every child task that is still being awaited, so they can all be
        # cancelled when the page limit is hit.
        self.all_tasks = set()

    async def __aenter__(self):
        """Open the HTTP session and return the crawler itself."""
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Close the HTTP session if one was opened."""
        if self.session is not None:
            await self.session.close()
            self.session = None

    async def add_page_visit(self, normalized_url):
        """Claim *normalized_url* for crawling.

        Args:
            normalized_url (str): De-duplication key of the page.

        Returns:
            bool: True when the caller should crawl the page; False when the
            crawl is stopping, the page limit is reached, or the page has
            already been claimed.
        """
        async with self.lock:
            if self.should_stop:
                return False
            if len(self.page_data) >= self.max_pages:
                self.should_stop = True
                print("Reached maximum number of pages to crawl.")
                # Iterate over a snapshot so the set cannot change under us.
                for task in list(self.all_tasks):
                    task.cancel()
                return False
            if normalized_url in self.page_data:
                return False
            # Placeholder: marks the page as claimed before it is fetched.
            self.page_data[normalized_url] = None
            return True

    async def get_html(self, url):
        """Fetch *url* and return its body when it is an HTML page.

        Args:
            url (str): The URL to request.

        Returns:
            The response text, or None when the status is 400 or above, the
            content type is not HTML, or the request raised an exception.
        """
        try:
            async with self.session.get(url, headers={"User-Agent": "BootCrawler/1.0"}) as response:
                if response.status >= 400:
                    print(f"Error: HTTP {response.status} for {url}")
                    return None
                # Media types are case-insensitive, so compare in lower case.
                content_type = response.headers.get('content-type', "").lower()
                if 'text/html' not in content_type:
                    print(f"Error: Non-HTML content for {url}")
                    return None
                return await response.text()
        except Exception as e:
            # Deliberately broad: one failing page must not abort the crawl.
            print(f"Error fetching {url}: {e}")
            return None

    async def crawl_page(self, current_url):
        """Crawl *current_url* and, recursively, the same-site pages it links to.

        Args:
            current_url (str): Absolute URL of the page to crawl.
        """
        if self.should_stop:
            return
        if urlparse(current_url).netloc != self.base_domain:
            return
        normalized_url = normalize_url(current_url)
        if not await self.add_page_visit(normalized_url):
            return

        # Hold the semaphore only while fetching and parsing. It is released
        # before the child tasks are awaited; otherwise a parent would keep a
        # slot while its children wait for one, which can deadlock.
        async with self.semaphore:
            html = await self.get_html(current_url)
            if html is None:
                return
            # Pass the absolute URL: the normalized key has no scheme, so it
            # cannot serve as a base for resolving relative links.
            new_data = extract_page_data(html, current_url)
            async with self.lock:
                self.page_data[normalized_url] = new_data
            # Relative links are relative to the page they appear on.
            new_urls = get_urls_from_html(html, current_url)

        if self.should_stop:
            return

        background_tasks = set()
        # dict.fromkeys drops repeated links while keeping document order.
        for url in dict.fromkeys(new_urls):
            task = asyncio.create_task(self.crawl_page(url))
            background_tasks.add(task)
            self.all_tasks.add(task)
        if not background_tasks:
            return
        try:
            # return_exceptions=True keeps one cancelled or failed child
            # from tearing down its siblings.
            await asyncio.gather(*background_tasks, return_exceptions=True)
        finally:
            self.all_tasks.difference_update(background_tasks)

    async def crawl(self, max_con=None, max_pages=None):
        """Crawl the site starting at ``base_url``.

        Args:
            max_con (int, optional): Overrides the concurrency limit given to
                the constructor. Applied before crawling starts.
            max_pages (int, optional): Overrides the page limit given to the
                constructor. Applied before crawling starts.

        Returns:
            dict: ``page_data``, keyed by normalized URL. Pages that were
            claimed but whose fetch failed keep the value None.
        """
        # The limits must be in place before the first page is requested;
        # assigning them after the crawl would have no effect.
        if max_pages is not None:
            self.max_pages = max_pages
        if max_con is not None and max_con != self.max_concurrency:
            self.max_concurrency = max_con
            self.semaphore = asyncio.Semaphore(max_con)
        await self.crawl_page(self.base_url)
        return self.page_data
async def crawl_site_async(base_url, max_con, max_pages):
    """Crawl a site with a temporary ``AsyncCrawler`` and return its page data.

    Args:
        base_url (str): The starting URL.
        max_con (int): Concurrency limit passed to the crawler.
        max_pages (int): Page limit passed to the crawler.

    Returns:
        The value returned by ``AsyncCrawler.crawl``.
    """
    site_crawler = AsyncCrawler(base_url, max_con, max_pages)
    # The context manager opens the crawler's HTTP session and closes it
    # again once the crawl has finished.
    async with site_crawler as crawler:
        return await crawler.crawl(max_con, max_pages)