-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathvideo_scraper.py
More file actions
475 lines (397 loc) · 19.6 KB
/
video_scraper.py
File metadata and controls
475 lines (397 loc) · 19.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
import csv
import json
import os
import re
import time
from datetime import datetime
from pathlib import Path
from urllib.parse import urljoin, urlparse, unquote

import requests
from bs4 import BeautifulSoup
class VideoScraper:
    """Crawl web pages and collect links to video files and streams.

    Starting from user-supplied URLs, each page is downloaded, its raw HTML
    is archived, and video URLs are extracted from tags, attributes, inline
    scripts and the raw page text. Same-domain links are followed up to
    ``max_depth`` levels. Results are written as JSON, text, CSV and a
    summary file inside ``output_dir``.
    """

    # Default location of the URL list loaded when the user types 'file'.
    DEFAULT_SOURCES_FILE = "E:\\Users\\Admin\\OneDrive\\Desktop\\Sources.txt"
    # Upper bound on same-domain links followed from any single page.
    MAX_LINKS_PER_PAGE = 20

    # "example.com/..." style input that lacks a scheme.
    _BARE_DOMAIN_RE = re.compile(r'^[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}')
    # Markdown links of the form [text](url).
    _MARKDOWN_LINK_RE = re.compile(r'\[.*?\]\((https?://[^\)]+)\)')

    def __init__(self, max_depth=2, output_dir=None, sources_file=None):
        """Set up crawl state and create the output directories.

        Args:
            max_depth: How many link levels below the start URLs to follow.
            output_dir: Directory for results. Defaults to a new
                ``video_links_<timestamp>`` directory in the working directory.
            sources_file: Path of the URL list used by ``read_sources_file``.
                Defaults to ``DEFAULT_SOURCES_FILE``.
        """
        self.max_depth = max_depth
        self.visited_urls = set()
        self.found_videos = []
        self.sources_file = (
            sources_file if sources_file is not None else self.DEFAULT_SOURCES_FILE
        )

        # Create unique output directory with timestamp unless one was given
        if output_dir is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            output_dir = f"video_links_{timestamp}"
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.html_dir = self.output_dir / "html_source"
        self.html_dir.mkdir(parents=True, exist_ok=True)
        self.file_counter = 0

        # Headers to mimic a real browser
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }

        # Video extensions to search for
        self.video_extensions = {'.mp4', '.mov', '.avi', '.mkv', '.wmv', '.flv', '.m3u8', '.webm', '.mpg', '.mpeg'}

        # Common video URL patterns - expanded for better detection
        self.video_patterns = [
            # Direct video file URLs
            r'https?://[^\s<>"{}|\\^`\[\]]+\.(?:mp4|mov|avi|mkv|wmv|flv|m3u8|webm|mpg|mpeg|3gp|ogv)',
            # Video URLs with query parameters
            r'https?://[^\s<>"{}|\\^`\[\]]+\.(?:mp4|mov|avi|mkv|wmv|flv|m3u8|webm)\?[^\s<>"{}|\\^`\[\]]*',
            # Common video paths
            r'https?://[^\s<>"{}|\\^`\[\]]+/video/[^\s<>"{}|\\^`\[\]]+',
            r'https?://[^\s<>"{}|\\^`\[\]]+/stream/[^\s<>"{}|\\^`\[\]]+',
            r'https?://[^\s<>"{}|\\^`\[\]]+/media/[^\s<>"{}|\\^`\[\]]+\.(?:mp4|mov|avi|mkv|wmv|flv|m3u8|webm)',
            r'https?://[^\s<>"{}|\\^`\[\]]+/content/[^\s<>"{}|\\^`\[\]]+\.(?:mp4|mov|avi|mkv|wmv|flv|m3u8|webm)',
            # M3U8 streaming
            r'https?://[^\s<>"{}|\\^`\[\]]+\.m3u8[^\s<>"{}|\\^`\[\]]*',
            # Blob URLs
            r'blob:https?://[^\s<>"{}|\\^`\[\]]+',
            # CDN patterns
            r'https?://[^\s<>"{}|\\^`\[\]]+\.cloudfront\.net/[^\s<>"{}|\\^`\[\]]+\.(?:mp4|mov|avi|mkv|wmv|flv|m3u8|webm)',
            r'https?://[^\s<>"{}|\\^`\[\]]+\.amazonaws\.com/[^\s<>"{}|\\^`\[\]]+\.(?:mp4|mov|avi|mkv|wmv|flv|m3u8|webm)',
        ]

    def get_user_urls(self):
        """Prompt for URLs on stdin and return them as a list.

        An empty line ends input once at least one URL has been entered.
        Typing ``file`` loads URLs from the sources file; any URLs typed
        before that are kept and the file's URLs are appended to them.
        """
        urls = []
        print("=" * 60)
        print("Video Link Scraper")
        print("=" * 60)
        print("\nThis tool will search for video files and streaming links:")
        print("Supported formats: .MP4, .MOV, .AVI, .MKV, .WMV, .FLV, .M3U8, etc.")
        print("\nEnter URLs to scan (one per line).")
        print("Press Enter twice when done, or type 'file' to load from Sources.txt\n")

        while True:
            url = input("URL: ").strip()

            if url.lower() == 'file':
                # Keep what was typed so far instead of silently dropping it.
                return urls + self.read_sources_file()

            if not url:
                if urls:
                    break
                print("Please enter at least one URL.")
                continue

            # Clean and validate URL
            if not url.startswith(('http://', 'https://')):
                if not self._BARE_DOMAIN_RE.match(url):
                    print(f"Invalid URL format: {url}")
                    continue
                url = 'https://' + url

            urls.append(url)
            print(f"Added: {url}")

        return urls

    def read_sources_file(self, sources_file=None):
        """Read URLs from a sources file.

        Blank lines and lines starting with ``#`` are skipped. A line may be
        a markdown link ``[text](url)``, a full http(s) URL, or a bare domain
        (which gets ``https://`` prepended). Other lines are ignored.

        Args:
            sources_file: Path to read. Defaults to ``self.sources_file``.

        Returns:
            The list of URLs found; empty if the file does not exist.
        """
        if sources_file is None:
            # getattr keeps this usable on instances built without __init__.
            sources_file = getattr(self, 'sources_file', self.DEFAULT_SOURCES_FILE)

        urls = []
        try:
            with open(sources_file, 'r', encoding='utf-8') as f:
                for raw_line in f:
                    line = raw_line.strip()
                    if not line or line.startswith('#'):
                        continue
                    # Extract URL from markdown-style links [text](url)
                    markdown_match = self._MARKDOWN_LINK_RE.search(line)
                    if markdown_match:
                        urls.append(markdown_match.group(1))
                    elif line.startswith(('http://', 'https://')):
                        urls.append(line)
                    elif self._BARE_DOMAIN_RE.match(line):
                        urls.append('https://' + line)
        except FileNotFoundError:
            print(f"Sources file not found: {sources_file}")
        return urls

    def clean_filename(self, text):
        """Return ``text`` made safe for use as a filename (max 100 chars)."""
        # Remove or replace invalid filename characters
        text = re.sub(r'[<>:"/\\|?*]', '_', text)
        text = re.sub(r'\s+', '_', text)
        return text[:100]  # Limit length

    def save_html_source(self, url, html_content, depth):
        """Save the raw HTML of ``url`` to a numbered file in ``html_dir``.

        Failures are reported on stdout and do not interrupt the crawl.
        """
        indent = ' ' * depth
        try:
            # Generate filename based on URL
            parsed_url = urlparse(url)
            # netloc can carry "host:port"; ':' is not allowed in Windows
            # filenames, so sanitize the domain as well as the path.
            domain = self.clean_filename(parsed_url.netloc.replace('.', '_'))
            path_part = self.clean_filename(parsed_url.path.replace('/', '_'))

            self.file_counter += 1
            filename = f"{self.file_counter:04d}_D{depth}_{domain}_{path_part}.html"
            filepath = self.html_dir / filename

            # Save HTML content
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(f"<!-- Source URL: {url} -->\n")
                f.write(f"<!-- Scraped on: {time.strftime('%Y-%m-%d %H:%M:%S')} -->\n")
                f.write(f"<!-- Depth: {depth} -->\n\n")
                f.write(html_content)

            print(f"{indent}Saved HTML source: {filename}")
        except Exception as e:
            print(f"{indent}Error saving HTML source: {str(e)}")

    def _find_pattern_matches(self, text):
        """Return every substring of ``text`` matching any video pattern."""
        matches = set()
        for pattern in self.video_patterns:
            matches.update(re.findall(pattern, text, re.IGNORECASE))
        return matches

    def _mentions_video_extension(self, text):
        """Return True if ``text`` contains any known video extension."""
        lowered = text.lower()
        return any(ext in lowered for ext in self.video_extensions)

    def extract_video_urls_from_html(self, html_content, base_url):
        """Extract video URLs from parsed HTML.

        Looks at <video>/<source> tags, <a> links, <iframe> players, inline
        scripts, element attributes, <meta> content and inline styles.

        Args:
            html_content: The page markup.
            base_url: URL used to resolve relative references.

        Returns:
            A set of candidate video URLs.
        """
        video_urls = set()

        # Search in HTML attributes
        soup = BeautifulSoup(html_content, 'html.parser')

        # Look for video tags
        for video in soup.find_all(['video', 'source']):
            # Check multiple attributes
            for attr in ['src', 'data-src', 'data-source', 'data-video-src']:
                src = video.get(attr)
                if src:
                    video_urls.add(urljoin(base_url, src))

        # Look for links to video files
        for link in soup.find_all('a', href=True):
            href = link['href']
            if self._mentions_video_extension(href):
                video_urls.add(urljoin(base_url, href))

        # Look for iframes (might contain video players)
        providers = ['youtube', 'vimeo', 'dailymotion', 'video', 'player', 'embed']
        for iframe in soup.find_all('iframe'):
            src = iframe.get('src')
            if src and any(provider in src.lower() for provider in providers):
                video_urls.add(urljoin(base_url, src))

        # Search in JavaScript content
        json_pattern = r'\{[^{}]*"(?:url|src|source|video|file|stream)"[^{}]*:[^{}]*"([^"]+\.(?:mp4|mov|avi|mkv|wmv|flv|m3u8|webm)[^"]*)"[^{}]*\}'
        for script in soup.find_all('script'):
            if not script.string:
                continue
            # Look for video URLs in JavaScript
            video_urls.update(self._find_pattern_matches(script.string))
            # Look for JSON objects containing video URLs
            for match in re.findall(json_pattern, script.string, re.IGNORECASE):
                video_urls.add(urljoin(base_url, match))

        # Search in all data attributes
        keywords = ['video', 'media', 'src', 'source', 'file', 'url']
        for element in soup.find_all(True):  # All elements
            for attr, value in element.attrs.items():
                if not isinstance(value, str):
                    continue
                # Check if attribute name suggests video
                if any(keyword in attr.lower() for keyword in keywords):
                    if self._mentions_video_extension(value):
                        video_urls.add(urljoin(base_url, value))
                # Also check attribute values for video URLs
                video_urls.update(self._find_pattern_matches(value))

        # Look for meta tags with video content
        for meta in soup.find_all('meta'):
            content = meta.get('content', '')
            if self._mentions_video_extension(content):
                video_urls.add(urljoin(base_url, content))

        # Search in style attributes for background videos
        for element in soup.find_all(style=True):
            video_urls.update(self._find_pattern_matches(element['style']))

        return video_urls

    def extract_video_urls_from_text(self, text):
        """Extract video URLs from plain text using regular expressions.

        Relative paths are returned as found; the caller resolves them
        against the page URL.
        """
        # Apply each video pattern
        video_urls = self._find_pattern_matches(text)

        # Look for URLs in quotes (single or double)
        quoted_url_pattern = r'["\']([^"\']+\.(?:mp4|mov|avi|mkv|wmv|flv|m3u8|webm)[^"\']*)["\']'
        video_urls.update(re.findall(quoted_url_pattern, text, re.IGNORECASE))

        # Look for URLs in JSON-style strings
        json_url_pattern = r'["\']\s*:\s*["\']([^"\']+\.(?:mp4|mov|avi|mkv|wmv|flv|m3u8|webm)[^"\']*)["\']'
        video_urls.update(re.findall(json_url_pattern, text, re.IGNORECASE))

        # Look for base64 encoded URLs (sometimes used for video sources)
        base64_pattern = r'data:video/[^;]+;base64,[A-Za-z0-9+/=]+'
        video_urls.update(re.findall(base64_pattern, text))

        # Also look for any URL containing a video extension
        url_pattern = r'https?://[^\s<>"{}|\\^`\[\]]+'
        for url in re.findall(url_pattern, text):
            if self._mentions_video_extension(url):
                video_urls.add(url)

        # Look for relative paths to video files
        relative_pattern = r'["\']([^"\']*\.(?:mp4|mov|avi|mkv|wmv|flv|m3u8|webm)[^"\']*)["\']'
        for match in re.findall(relative_pattern, text, re.IGNORECASE):
            if not match.startswith(('http://', 'https://', 'data:')):
                video_urls.add(match)  # Will be converted to absolute URL later

        return video_urls

    def _normalize_video_urls(self, candidates, page_url):
        """Resolve candidates against ``page_url`` and keep usable ones.

        ``data:`` and ``blob:`` URLs pass through unchanged. Everything else
        must resolve to an http(s) URL; malformed candidates are dropped.
        """
        cleaned = set()
        for candidate in candidates:
            if candidate.startswith(('data:', 'blob:')):
                cleaned.add(candidate)
                continue
            try:
                # Convert relative URLs to absolute
                if not candidate.startswith(('http://', 'https://')):
                    candidate = urljoin(page_url, candidate)
                scheme = urlparse(candidate).scheme
            except ValueError:
                # urllib rejects e.g. unbalanced IPv6 brackets; one bad
                # candidate must not abort processing of the whole page.
                continue
            if scheme in ('http', 'https'):
                cleaned.add(candidate)
        return cleaned

    def _collect_page_links(self, html_content, page_url):
        """Return the sorted, fragment-free http(s) links found on a page."""
        soup = BeautifulSoup(html_content, 'html.parser')
        links = set()
        for anchor in soup.find_all('a', href=True):
            try:
                absolute_url = urljoin(page_url, anchor['href'])
                parsed = urlparse(absolute_url)
            except ValueError:
                continue
            if parsed.scheme in ('http', 'https') and parsed.netloc:
                # "#fragment" variants point at the same document.
                links.add(absolute_url.split('#', 1)[0])
        return sorted(links)

    def _classify_video_url(self, video_url):
        """Return a short type label for ``video_url``."""
        if video_url.startswith('data:'):
            return 'base64'
        if video_url.startswith('blob:'):
            return 'blob'
        lowered = video_url.lower()
        if '.m3u8' in lowered:
            return 'streaming'
        # Sorted so the label is stable when several extensions match.
        for ext in sorted(self.video_extensions):
            if ext in lowered:
                return ext[1:]  # Remove the dot
        return 'unknown'

    def scrape_url(self, url, depth=0):
        """Scrape a single URL for video links.

        Args:
            url: Page to download.
            depth: Current crawl depth (used for output indentation and
                recorded with each video).

        Returns:
            ``None`` if ``url`` was already visited. Otherwise a dict with
            keys ``url``, ``video_count``, ``links`` (list of absolute page
            links) and ``success``. Errors are reported on stdout and
            yield ``success: False``.
        """
        try:
            # Skip if already visited
            if url in self.visited_urls:
                return None

            print(f"{' ' * depth}Scanning: {url}")
            self.visited_urls.add(url)

            response = requests.get(url, headers=self.headers, timeout=30)
            response.raise_for_status()

            # Get the content
            content = response.text

            # Save HTML source
            self.save_html_source(url, content, depth)

            # Extract video URLs from HTML, then from raw text
            # (catches things BeautifulSoup might miss)
            candidates = self.extract_video_urls_from_html(content, url)
            candidates.update(self.extract_video_urls_from_text(content))

            # Clean and validate URLs
            video_urls = self._normalize_video_urls(candidates, url)

            # Parse page for more links to follow
            page_links = self._collect_page_links(content, url)

            # Store found videos; build the lookup once per page rather
            # than rescanning found_videos for every candidate.
            known_urls = {video['url'] for video in self.found_videos}
            for video_url in sorted(video_urls):
                if video_url in known_urls:
                    continue
                known_urls.add(video_url)

                video_type = self._classify_video_url(video_url)
                self.found_videos.append({
                    'url': video_url,
                    'found_on': url,
                    'depth': depth,
                    'type': video_type,
                    'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')
                })
                print(f"{' ' * (depth + 1)}Found {video_type} video: {video_url[:100]}...")

            return {
                'url': url,
                'video_count': len(video_urls),
                'links': page_links,
                'success': True
            }

        except Exception as e:
            # Per-page boundary: report the failure and keep crawling.
            print(f"{' ' * depth}Error scanning {url}: {str(e)}")
            return {
                'url': url,
                'video_count': 0,
                'links': [],
                'success': False
            }

    def scrape_recursive(self, url, depth=0):
        """Scrape ``url`` and follow its same-domain links up to ``max_depth``."""
        if depth > self.max_depth:
            return

        # Scrape the current URL
        result = self.scrape_url(url, depth)
        if not result:
            return

        # Be respectful - add delay
        time.sleep(1)

        if depth >= self.max_depth or not result.get('success'):
            return

        # Only follow links from the same domain. Filter before applying
        # the limit so off-domain links cannot use up the quota.
        domain = urlparse(url).netloc
        same_domain_links = [
            link for link in result.get('links') or []
            if urlparse(link).netloc == domain
        ]
        for link in same_domain_links[:self.MAX_LINKS_PER_PAGE]:
            self.scrape_recursive(link, depth + 1)

    def save_results(self):
        """Write found videos as JSON, text, CSV and a summary file."""
        # Save as JSON
        json_file = self.output_dir / 'video_links.json'
        with open(json_file, 'w', encoding='utf-8') as f:
            json.dump(self.found_videos, f, indent=2)

        # Save as text file
        txt_file = self.output_dir / 'video_links.txt'
        with open(txt_file, 'w', encoding='utf-8') as f:
            f.write(f"Video Links Found - {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write("=" * 60 + "\n\n")

            # Group by source page
            by_source = {}
            for video in self.found_videos:
                by_source.setdefault(video['found_on'], []).append(video['url'])

            for source, videos in by_source.items():
                f.write(f"\nSource: {source}\n")
                f.write("-" * 40 + "\n")
                for video_url in videos:
                    f.write(f"{video_url}\n")

        # Save as CSV. csv.writer quotes and escapes fields, so URLs that
        # contain commas or double quotes still give well-formed rows.
        csv_file = self.output_dir / 'video_links.csv'
        with open(csv_file, 'w', encoding='utf-8', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(["Video URL", "Found On", "Type", "Depth", "Timestamp"])
            for video in self.found_videos:
                writer.writerow([
                    video["url"],
                    video["found_on"],
                    video.get("type", "unknown"),
                    video["depth"],
                    video["timestamp"],
                ])

        # Save summary
        summary_file = self.output_dir / 'summary.txt'
        with open(summary_file, 'w', encoding='utf-8') as f:
            f.write(f"Video Scraping Summary - {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write("=" * 60 + "\n\n")
            f.write(f"Total videos found: {len(self.found_videos)}\n")
            f.write(f"URLs scanned: {len(self.visited_urls)}\n\n")

            # Count by type
            type_counts = {}
            for video in self.found_videos:
                vtype = video.get('type', 'unknown')
                type_counts[vtype] = type_counts.get(vtype, 0) + 1

            f.write("Videos by type:\n")
            for vtype, count in sorted(type_counts.items()):
                f.write(f" {vtype}: {count}\n")

            f.write(f"\nHTML source files saved: {self.file_counter}\n")

        print(f"\nResults saved to: {self.output_dir.absolute()}")

    def run(self):
        """Main scraping process: gather URLs, crawl them, save results."""
        urls = self.get_user_urls()

        if not urls:
            print("No URLs provided.")
            return

        print(f"\nStarting video scan of {len(urls)} URLs")
        print(f"Max depth: {self.max_depth}")
        print(f"Output directory: {self.output_dir.absolute()}")
        print("-" * 60)

        for url in urls:
            self.scrape_recursive(url, depth=0)

        print("-" * 60)
        print("\nScan complete!")
        print(f"Total video links found: {len(self.found_videos)}")
        print(f"URLs scanned: {len(self.visited_urls)}")

        if self.found_videos:
            self.save_results()
        else:
            print("No video links were found.")
# Script entry point: run an interactive crawl one link level deep.
if __name__ == "__main__":
    scraper = None
    try:
        scraper = VideoScraper(max_depth=1)
        scraper.run()
    except KeyboardInterrupt:
        print("\n\nScanning interrupted by user.")
        # Keep whatever was collected before the interrupt instead of
        # throwing the partial crawl away.
        if scraper is not None and scraper.found_videos:
            scraper.save_results()
    except Exception as e:
        # Top-level boundary: report the failure rather than a traceback.
        print(f"\n\nError: {e}")