#!/usr/bin/env python3
"""
Checks URLs from a CSV file (e.g. links to .pdf files on justice.gov) by swapping the file
extension for each of a set of common media extensions (video, image, audio, archive,
document) and saves any file that responds successfully.
Usage:
python run.py urls.csv
python run.py (prompts for CSV path)
CSV Format:
Single column with URLs, one per line. Header row is optional.
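Example rows (hypothetical URLs):
    https://www.justice.gov/example-case/exhibit-1.pdf
    https://www.justice.gov/example-case/exhibit-2.pdf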
Requirements (pip install):
requests
"""
import sys
import os
import re
import csv
import requests
from urllib.parse import urlparse
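# Candidate extensions to try for each base URL, grouped by media category.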
MEDIA_EXTENSIONS = {
"Video": [
".mp4", ".avi", ".mkv", ".mov", ".wmv", ".flv", ".webm",
".mpeg", ".mpg", ".3gp", ".ogv", ".ts", ".m4v", ".vob",
".asf", ".divx", ".rm", ".rmvb", ".swf", ".m2ts", ".mts",
".f4v", ".gifv", ".dv", ".mxf", ".roq", ".nsv", ".amv",
".m2v", ".svi", ".3g2", ".mpe", ".yuv", ".wmv9",
],
"Image": [
".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".tif",
".webp", ".svg", ".ico", ".heic", ".heif", ".avif",
".raw", ".cr2", ".nef", ".arw", ".dng", ".psd", ".ai",
".eps", ".jfif", ".jp2", ".jpx",
],
"Audio": [
".mp3", ".wav", ".flac", ".aac", ".ogg", ".wma", ".m4a",
".aiff", ".aif", ".opus", ".alac", ".ape", ".mid", ".midi",
".ac3", ".amr", ".au", ".ra", ".mka",
],
"Archive": [
".zip", ".rar", ".7z", ".tar", ".gz", ".tar.gz", ".tgz",
".bz2", ".xz", ".cab", ".iso", ".dmg",
],
"Document": [
".doc", ".docx", ".xls", ".xlsx", ".ppt", ".pptx",
".rtf", ".odt", ".ods", ".odp", ".txt", ".csv",
".epub", ".mobi", ".tex", ".pages", ".numbers", ".key",
],
}
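# Directory where discovered files are saved (created on demand, with per-category subfolders).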
OUTPUT_DIR = "files"
def pass_age_verification(session: requests.Session) -> None:
    """Set the cookie that bypasses the justice.gov age verification gate."""
# justice.gov uses a cookie named 'justiceGovAgeVerified' for age verification
# Setting this cookie allows direct access to age-gated content
print("[*] Setting age verification cookie...")
session.cookies.set('justiceGovAgeVerified', 'true', domain='www.justice.gov', path='/')
print("[+] Age verification cookie set.")
def strip_extension(url: str) -> str:
"""Remove the existing file extension from the URL."""
return re.sub(r"\.[a-zA-Z0-9]+$", "", url)
def check_url(session: requests.Session, url: str) -> requests.Response | None:
"""HEAD then GET a URL. Returns the GET response if it looks like a real file."""
try:
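        # Probe with HEAD first so misses never download a response body.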
head = session.head(url, timeout=15, allow_redirects=True)
if head.status_code != 200:
return None
content_type = head.headers.get("Content-Type", "").lower()
# Skip HTML pages (error pages, redirects to age gate, etc.)
if "text/html" in content_type:
return None
# Looks promising - do a full GET
resp = session.get(url, timeout=60, stream=True)
if resp.status_code == 200 and "text/html" not in resp.headers.get("Content-Type", "").lower():
return resp
except requests.RequestException:
pass
return None
def read_urls_from_csv(csv_path: str) -> list[str]:
"""Read URLs from a CSV file. Expects URLs in the first column."""
urls = []
with open(csv_path, "r", encoding="utf-8") as f:
reader = csv.reader(f)
for row in reader:
if not row:
continue
url = row[0].strip()
# Skip empty rows and potential header rows
if url and url.lower() not in ("url", "urls", "link", "links"):
urls.append(url)
return urls
def process_url(session: requests.Session, original_url: str, all_extensions: list[tuple[str, str]]) -> int:
"""Process a single URL and return the number of files found."""
base_url_no_ext = strip_extension(original_url)
print(f"\n[*] Base URL: {base_url_no_ext}")
print(f"[*] Testing {len(all_extensions)} file extensions across {len(MEDIA_EXTENSIONS)} categories...\n")
found = 0
current_category = None
for category, ext in all_extensions:
if category != current_category:
current_category = category
print(f"\n -- {category} --")
test_url = base_url_no_ext + ext
label = ext.ljust(10)
print(f" {label} - checking...", end="", flush=True)
resp = check_url(session, test_url)
if resp:
# Save into a category subfolder
cat_dir = os.path.join(OUTPUT_DIR, category.lower())
os.makedirs(cat_dir, exist_ok=True)
parsed = urlparse(test_url)
filename = os.path.basename(parsed.path)
if not filename:
filename = f"file_{found}{ext}"
filepath = os.path.join(cat_dir, filename)
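            # Stream the body to disk in 256 KB chunks so large files are never held fully in memory.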
with open(filepath, "wb") as f:
for chunk in resp.iter_content(chunk_size=1024 * 256):
f.write(chunk)
size_mb = os.path.getsize(filepath) / (1024 * 1024)
content_type = resp.headers.get("Content-Type", "unknown")
print(f" FOUND! Saved -> {filepath} ({size_mb:.2f} MB, {content_type})")
found += 1
print(f"\n [*] File found, skipping remaining checks for this URL.")
break
else:
print(" --")
return found
def main():
# Get CSV path from command line or prompt
if len(sys.argv) > 1:
csv_path = sys.argv[1]
else:
csv_path = input("Enter CSV file path: ").strip()
if not csv_path:
print("No CSV path provided. Exiting.")
sys.exit(1)
if not os.path.exists(csv_path):
print(f"File not found: {csv_path}")
sys.exit(1)
# Read URLs from CSV
urls = read_urls_from_csv(csv_path)
if not urls:
print("No URLs found in CSV. Exiting.")
sys.exit(1)
print(f"[*] Loaded {len(urls)} URL(s) from {csv_path}")
os.makedirs(OUTPUT_DIR, exist_ok=True)
session = requests.Session()
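    # Present a browser-like User-Agent; some servers reject the default python-requests string.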
session.headers.update({
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36",
})
# Handle age gate
    pass_age_verification(session)
# Build extension list once
all_extensions = []
for category, exts in MEDIA_EXTENSIONS.items():
all_extensions.extend((category, ext) for ext in exts)
# Process each URL, skipping duplicates
total_found = 0
processed_bases = set()
skipped = 0
for i, url in enumerate(urls, 1):
base_url = strip_extension(url)
if base_url in processed_bases:
print(f"\n[*] Skipping URL {i}/{len(urls)} (duplicate base URL)")
print(f"[*] {url}")
skipped += 1
continue
processed_bases.add(base_url)
print(f"\n{'='*60}")
print(f"[*] Processing URL {i}/{len(urls)}")
print(f"[*] {url}")
print('='*60)
found = process_url(session, url, all_extensions)
total_found += found
print(f"\n[{'+'if found else '!'}] URL complete. {found} file(s) found.")
print(f"\n{'='*60}")
print(f"[{'+'if total_found else '!'}] All done. {total_found} total file(s) saved to ./{OUTPUT_DIR}/")
print(f"[*] Processed {len(urls) - skipped} URL(s), skipped {skipped} duplicate(s)")
print('='*60)
if __name__ == "__main__":
main()