#!/usr/bin/env python3
"""
Checks URLs from a CSV file (e.g. .pdf files on justice.gov) by swapping the
file extension with common media formats (video, image, audio, archive,
document) and saves any files that respond successfully.

Usage:
    python run.py urls.csv
    python run.py              (prompts for CSV path)

CSV Format:
    Single column with URLs, one per line. Header row is optional.

Requirements (pip install):
    requests
"""

import sys
import os
import re
import csv
import requests
from urllib.parse import urlparse

MEDIA_EXTENSIONS = {
    "Video": [
        ".mp4", ".avi", ".mkv", ".mov", ".wmv", ".flv", ".webm", ".mpeg",
        ".mpg", ".3gp", ".ogv", ".ts", ".m4v", ".vob", ".asf", ".divx",
        ".rm", ".rmvb", ".swf", ".m2ts", ".mts", ".f4v", ".gifv", ".dv",
        ".mxf", ".roq", ".nsv", ".amv", ".m2v", ".svi", ".3g2", ".mpe",
        ".yuv", ".wmv9",
    ],
    "Image": [
        ".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".tif", ".webp",
        ".svg", ".ico", ".heic", ".heif", ".avif", ".raw", ".cr2", ".nef",
        ".arw", ".dng", ".psd", ".ai", ".eps", ".jfif", ".jp2", ".jpx",
    ],
    "Audio": [
        ".mp3", ".wav", ".flac", ".aac", ".ogg", ".wma", ".m4a", ".aiff",
        ".aif", ".opus", ".alac", ".ape", ".mid", ".midi", ".ac3", ".amr",
        ".au", ".ra", ".mka",
    ],
    "Archive": [
        ".zip", ".rar", ".7z", ".tar", ".gz", ".tar.gz", ".tgz", ".bz2",
        ".xz", ".cab", ".iso", ".dmg",
    ],
    "Document": [
        ".doc", ".docx", ".xls", ".xlsx", ".ppt", ".pptx", ".rtf", ".odt",
        ".ods", ".odp", ".txt", ".csv", ".epub", ".mobi", ".tex", ".pages",
        ".numbers", ".key",
    ],
}

OUTPUT_DIR = "files"


def pass_age_verification(session: requests.Session, base_url: str):
    """
    Handles the justice.gov age verification gate.
    Sets the required cookie to bypass the age verification.
    """
    # justice.gov uses a cookie named 'justiceGovAgeVerified' for age verification.
    # Setting this cookie allows direct access to age-gated content.
    print("[*] Setting age verification cookie...")
    session.cookies.set('justiceGovAgeVerified', 'true', domain='www.justice.gov', path='/')
    print("[+] Age verification cookie set.")


def strip_extension(url: str) -> str:
    """Remove the existing file extension from the URL."""
    return re.sub(r"\.[a-zA-Z0-9]+$", "", url)


def check_url(session: requests.Session, url: str) -> requests.Response | None:
    """HEAD then GET a URL. Returns the GET response if it looks like a real file."""
    try:
        head = session.head(url, timeout=15, allow_redirects=True)
        if head.status_code != 200:
            return None
        content_type = head.headers.get("Content-Type", "").lower()
        # Skip HTML pages (error pages, redirects to age gate, etc.)
        if "text/html" in content_type:
            return None
        # Looks promising - do a full GET
        resp = session.get(url, timeout=60, stream=True)
        if resp.status_code == 200 and "text/html" not in resp.headers.get("Content-Type", "").lower():
            return resp
    except requests.RequestException:
        pass
    return None


def read_urls_from_csv(csv_path: str) -> list[str]:
    """Read URLs from a CSV file. Expects URLs in the first column."""
    urls = []
    with open(csv_path, "r", encoding="utf-8") as f:
        reader = csv.reader(f)
        for row in reader:
            if not row:
                continue
            url = row[0].strip()
            # Skip empty rows and potential header rows
            if url and url.lower() not in ("url", "urls", "link", "links"):
                urls.append(url)
    return urls


def process_url(session: requests.Session, original_url: str,
                all_extensions: list[tuple[str, str]]) -> int:
    """Process a single URL and return the number of files found."""
    base_url_no_ext = strip_extension(original_url)
    print(f"\n[*] Base URL: {base_url_no_ext}")
    print(f"[*] Testing {len(all_extensions)} file extensions across {len(MEDIA_EXTENSIONS)} categories...\n")

    found = 0
    current_category = None

    for category, ext in all_extensions:
        if category != current_category:
            current_category = category
            print(f"\n -- {category} --")

        test_url = base_url_no_ext + ext
        label = ext.ljust(10)
        print(f" {label} - checking...", end="", flush=True)

        resp = check_url(session, test_url)
        if resp:
            # Save into a category subfolder
            cat_dir = os.path.join(OUTPUT_DIR, category.lower())
            os.makedirs(cat_dir, exist_ok=True)

            parsed = urlparse(test_url)
            filename = os.path.basename(parsed.path)
            if not filename:
                filename = f"file_{found}{ext}"
            filepath = os.path.join(cat_dir, filename)

            with open(filepath, "wb") as f:
                for chunk in resp.iter_content(chunk_size=1024 * 256):
                    f.write(chunk)

            size_mb = os.path.getsize(filepath) / (1024 * 1024)
            content_type = resp.headers.get("Content-Type", "unknown")
            print(f" FOUND! Saved -> {filepath} ({size_mb:.2f} MB, {content_type})")
            found += 1
            print("\n [*] File found, skipping remaining checks for this URL.")
            break
        else:
            print(" --")

    return found


def main():
    # Get CSV path from command line or prompt
    if len(sys.argv) > 1:
        csv_path = sys.argv[1]
    else:
        csv_path = input("Enter CSV file path: ").strip()

    if not csv_path:
        print("No CSV path provided. Exiting.")
        sys.exit(1)

    if not os.path.exists(csv_path):
        print(f"File not found: {csv_path}")
        sys.exit(1)

    # Read URLs from CSV
    urls = read_urls_from_csv(csv_path)
    if not urls:
        print("No URLs found in CSV. Exiting.")
        sys.exit(1)

    print(f"[*] Loaded {len(urls)} URL(s) from {csv_path}")

    os.makedirs(OUTPUT_DIR, exist_ok=True)

    session = requests.Session()
    session.headers.update({
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                      "AppleWebKit/537.36 (KHTML, like Gecko) "
                      "Chrome/120.0.0.0 Safari/537.36",
    })

    # Handle age gate
    pass_age_verification(session, urls[0])

    # Build extension list once
    all_extensions = []
    for category, exts in MEDIA_EXTENSIONS.items():
        all_extensions.extend((category, ext) for ext in exts)

    # Process each URL, skipping duplicates
    total_found = 0
    processed_bases = set()
    skipped = 0

    for i, url in enumerate(urls, 1):
        base_url = strip_extension(url)
        if base_url in processed_bases:
            print(f"\n[*] Skipping URL {i}/{len(urls)} (duplicate base URL)")
            print(f"[*] {url}")
            skipped += 1
            continue
        processed_bases.add(base_url)

        print(f"\n{'=' * 60}")
        print(f"[*] Processing URL {i}/{len(urls)}")
        print(f"[*] {url}")
        print('=' * 60)

        found = process_url(session, url, all_extensions)
        total_found += found
        print(f"\n[{'+' if found else '!'}] URL complete. {found} file(s) found.")

    print(f"\n{'=' * 60}")
    print(f"[{'+' if total_found else '!'}] All done. {total_found} total file(s) saved to ./{OUTPUT_DIR}/")
    print(f"[*] Processed {len(urls) - skipped} URL(s), skipped {skipped} duplicate(s)")
    print('=' * 60)


if __name__ == "__main__":
    main()
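

# Illustrative walk-through (comments only; the URL below is a made-up placeholder,
# not a real justice.gov file):
#
#   urls.csv
#   --------
#   url
#   https://www.justice.gov/d9/files/example-exhibit-1.pdf
#
# For that row, strip_extension() drops the ".pdf", and the script then probes
#   https://www.justice.gov/d9/files/example-exhibit-1.mp4
#   https://www.justice.gov/d9/files/example-exhibit-1.avi
#   ...and so on through every extension in MEDIA_EXTENSIONS, stopping at the
# first non-HTML 200 response and saving it under ./files/<category>/.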