# /// script # requires-python = ">=3.14" # dependencies = ["falcon>=4.0", "uvicorn[standard]", "typer", "argon2-cffi", "lxml"] # /// """RFC 4918 compliant WebDAV server using Falcon ASGI.""" from __future__ import annotations import asyncio import base64 import hashlib import datetime import html as _html import mimetypes import os import pathlib import re import shutil import time import urllib.parse import uuid from lxml import etree from lxml.builder import ElementMaker from dataclasses import dataclass, field from email.utils import formatdate import falcon import argon2 import argon2.exceptions import falcon.asgi import typer import uvicorn # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- D = "{DAV:}" E = ElementMaker(namespace="DAV:", nsmap={"D": "DAV:"}) _SECURE_PARSER = etree.XMLParser(resolve_entities=False, no_network=True) WRITE_METHODS = frozenset( {"PUT", "DELETE", "MKCOL", "COPY", "MOVE", "PROPPATCH", "LOCK", "UNLOCK"} ) ALL_METHODS = ( "GET, HEAD, OPTIONS, PUT, DELETE, MKCOL, COPY, MOVE, " "PROPFIND, PROPPATCH, LOCK, UNLOCK" ) LOCK_DEFAULT_TIMEOUT = 600 # seconds MAX_UPLOAD_BYTES = 8 * 1024 * 1024 * 1024 # 8 GiB hard ceiling per PUT MAX_LOCKS = 2048 # per-server lock table ceiling MAX_PROPS_PER_RESOURCE = 256 # dead property ceiling per resource MAX_PROP_VALUE_BYTES = 65_536 # 64 KiB per property value # --------------------------------------------------------------------------- # Data classes # --------------------------------------------------------------------------- @dataclass class LockInfo: token: str path: str owner_xml: str depth: str scope: str # "exclusive" or "shared" timeout: float # absolute time.time() when lock expires # --------------------------------------------------------------------------- # LockManager # --------------------------------------------------------------------------- class LockManager: def __init__(self) -> None: self._locks: dict[str, LockInfo] = {} # token -> LockInfo def cleanup(self) -> None: now = time.time() expired = [t for t, lk in self._locks.items() if lk.timeout <= now] for t in expired: del self._locks[t] def acquire( self, path: str, owner_xml: str, depth: str, scope: str, timeout_secs: int = LOCK_DEFAULT_TIMEOUT, ) -> LockInfo | None: self.cleanup() if len(self._locks) >= MAX_LOCKS: return None # server-wide lock table full # Check for conflicting locks for lk in self._locks.values(): if self._conflicts(path, depth, lk): if scope == "exclusive" or lk.scope == "exclusive": return None token = f"urn:uuid:{uuid.uuid4()}" lock = LockInfo( token=token, path=path, owner_xml=owner_xml, depth=depth, scope=scope, timeout=time.time() + timeout_secs, ) self._locks[token] = lock return lock def refresh( self, token: str, timeout_secs: int = LOCK_DEFAULT_TIMEOUT ) -> LockInfo | None: self.cleanup() lock = self._locks.get(token) if lock: lock.timeout = time.time() + timeout_secs return lock def release(self, token: str) -> bool: return self._locks.pop(token, None) is not None def get_locks(self, path: str) -> list[LockInfo]: self.cleanup() result = [] for lk in self._locks.values(): if lk.path == path: result.append(lk) elif lk.depth == "infinity" and path.startswith(lk.path.rstrip("/") + "/"): result.append(lk) return result def check_locked(self, path: str, if_header: str | None) -> bool: """Return True if the operation is allowed (lock-wise).""" locks = self.get_locks(path) if not locks: return True if not if_header: return False # Extract tokens from If header: () or ( [etag]) tokens = re.findall(r"<([^>]+)>", if_header) for lk in locks: if lk.scope == "exclusive" and lk.token not in tokens: return False return True def remove_for_path(self, path: str) -> None: to_del = [ t for t, lk in self._locks.items() if lk.path == path or lk.path.startswith(path.rstrip("/") + "/") ] for t in to_del: del self._locks[t] def move_locks(self, src: str, dst: str) -> None: src_prefix = src.rstrip("/") + "/" updates: list[tuple[str, str]] = [] for lk in self._locks.values(): if lk.path == src: updates.append((lk.token, dst)) elif lk.path.startswith(src_prefix): new_path = dst.rstrip("/") + "/" + lk.path[len(src_prefix) :] updates.append((lk.token, new_path)) for token, new_path in updates: self._locks[token].path = new_path @staticmethod def _conflicts(path: str, depth: str, existing: LockInfo) -> bool: ep = existing.path if ep == path: return True if existing.depth == "infinity" and path.startswith(ep.rstrip("/") + "/"): return True if depth == "infinity" and ep.startswith(path.rstrip("/") + "/"): return True return False # --------------------------------------------------------------------------- # PropertyManager (dead properties, in-memory only) # --------------------------------------------------------------------------- @dataclass class PropertyManager: _props: dict[str, dict[str, str]] = field(default_factory=dict) def set_prop(self, path: str, key: str, value: str) -> None: bucket = self._props.setdefault(path, {}) if len(value) > MAX_PROP_VALUE_BYTES: raise falcon.HTTPRequestEntityTooLarge( description="Property value exceeds size limit" ) if key not in bucket and len(bucket) >= MAX_PROPS_PER_RESOURCE: raise falcon.HTTPInsufficientStorage( description="Too many properties on this resource" ) bucket[key] = value def remove_prop(self, path: str, key: str) -> bool: if path in self._props and key in self._props[path]: del self._props[path][key] return True return False def get_all(self, path: str) -> dict[str, str]: return dict(self._props.get(path, {})) def delete(self, path: str) -> None: prefix = path.rstrip("/") + "/" to_del = [p for p in self._props if p == path or p.startswith(prefix)] for p in to_del: del self._props[p] def move(self, src: str, dst: str) -> None: src_prefix = src.rstrip("/") + "/" moves: list[tuple[str, str]] = [] for p in list(self._props): if p == src: moves.append((p, dst)) elif p.startswith(src_prefix): moves.append((p, dst.rstrip("/") + "/" + p[len(src_prefix) :])) for old, new in moves: self._props[new] = self._props.pop(old) # --------------------------------------------------------------------------- # XML helpers # --------------------------------------------------------------------------- def _content_type(fspath: pathlib.Path) -> str: if fspath.is_dir(): return "httpd/unix-directory" return mimetypes.guess_type(fspath.name)[0] or "application/octet-stream" def _etag(st: os.stat_result) -> str: raw = f"{st.st_ino}-{st.st_mtime_ns}-{st.st_size}" return f'"{hashlib.md5(raw.encode()).hexdigest()}"' def _http_date(ts: float) -> str: return formatdate(ts, usegmt=True) def _iso_date(ts: float) -> str: return datetime.datetime.fromtimestamp(ts, datetime.UTC).strftime( "%Y-%m-%dT%H:%M:%SZ" ) def _supported_lock_element() -> etree._Element: return E.supportedlock( *[ E.lockentry( E.lockscope(getattr(E, scope_name)()), E.locktype(E.write()), ) for scope_name in ("exclusive", "shared") ] ) def _lock_discovery_element(locks: list[LockInfo]) -> etree._Element: ld = E.lockdiscovery() for lk in locks: owner_el = E.owner() if lk.owner_xml: try: owner_el.append(etree.fromstring(lk.owner_xml, parser=_SECURE_PARSER)) except etree.XMLSyntaxError: owner_el.text = lk.owner_xml remaining = max(0, int(lk.timeout - time.time())) ld.append( E.activelock( E.locktype(E.write()), E.lockscope(getattr(E, lk.scope)()), E.depth(lk.depth), owner_el, E.timeout(f"Second-{remaining}"), E.locktoken(E.href(lk.token)), E.lockroot(E.href(lk.path)), ) ) return ld def _build_prop_response( href: str, props_ok: dict[str, etree._Element | str | None], props_404: list[str], ) -> etree._Element: response = E.response(E.href(href)) if props_ok: prop = E.prop() for name, val in props_ok.items(): if isinstance(val, etree._Element): prop.append(val) elif val is not None: child = etree.SubElement(prop, f"{D}{name}") child.text = str(val) else: etree.SubElement(prop, f"{D}{name}") response.append(E.propstat(prop, E.status("HTTP/1.1 200 OK"))) if props_404: prop = E.prop() for name in props_404: etree.SubElement(prop, f"{D}{name}") response.append(E.propstat(prop, E.status("HTTP/1.1 404 Not Found"))) return response def _build_multistatus(responses: list[etree._Element]) -> bytes: return etree.tostring( E.multistatus(*responses), xml_declaration=True, encoding="utf-8", ) def _parse_propfind(body: bytes) -> tuple[str, set[str] | None]: """Return (mode, requested_props). mode is one of "allprop", "propname", or "prop". For allprop/propname, requested_props is None. For prop, requested_props is a set of local tag names. """ if not body or not body.strip(): return "allprop", None try: root = etree.fromstring(body, parser=_SECURE_PARSER) except etree.XMLSyntaxError: return "allprop", None if root.find(f"{D}allprop") is not None: return "allprop", None if root.find(f"{D}propname") is not None: return "propname", None prop_el = root.find(f"{D}prop") if prop_el is not None: names = set() for child in prop_el: # Strip namespace tag = child.tag if tag.startswith(f"{D}"): names.add(tag[len(D) :]) else: names.add(tag) return "prop", names return "allprop", None def _parse_timeout(header: str | None) -> int: if not header: return LOCK_DEFAULT_TIMEOUT for part in header.split(","): part = part.strip() if part.lower().startswith("second-"): try: return min(int(part[7:]), 86400) except ValueError: pass return LOCK_DEFAULT_TIMEOUT def _parse_lockinfo(body: bytes) -> tuple[str, str]: """Return (scope, owner_xml).""" scope = "exclusive" owner_xml = "" if not body or not body.strip(): return scope, owner_xml try: root = etree.fromstring(body, parser=_SECURE_PARSER) except etree.XMLSyntaxError: return scope, owner_xml scope_el = root.find(f"{D}lockscope") if scope_el is not None: if scope_el.find(f"{D}shared") is not None: scope = "shared" owner_el = root.find(f"{D}owner") if owner_el is not None: children = list(owner_el) if children: owner_xml = etree.tostring(children[0], encoding="unicode") elif owner_el.text: owner_xml = owner_el.text return scope, owner_xml def _info_page( req: falcon.asgi.Request, mode: str, basedir: pathlib.Path, ) -> tuple[str, str]: """Return (content_type, body) for a human/agent-facing info response. Negotiates on Accept: text/html → rendered HTML, else plain markdown. """ host = req.get_header("Host") or "localhost" scheme = ( "https" if req.forwarded_prefix and req.forwarded_prefix.startswith("https") else "http" ) base_url = f"{scheme}://{host}/" md = f"""\ # WebDAV Server **Access mode:** {mode} **Base URL:** {base_url} **Serving:** {basedir} ## Connecting ### macOS Finder: Go → Connect to Server → `{base_url}` ### Linux ```sh # Mount (davfs2) sudo apt install davfs2 sudo mkdir /mnt/webdav sudo mount -t davfs {base_url} /mnt/webdav # GUI: Nautilus address bar davs://{host}/ # GUI: Dolphin address bar webdavs://{host}/ # CLI cadaver {base_url} ``` ### Windows ``` # Map Network Drive → Folder: {base_url} # Or via command line: net use Z: {base_url} ``` Note: if Windows refuses to connect, set registry key `HKLM\\SYSTEM\\CurrentControlSet\\Services\\WebClient\\Parameters\\BasicAuthLevel = 2` and restart the WebClient service. ### Cross-platform (rclone) ```sh rclone copy {base_url} ./local-copy --webdav-vendor other ``` Cyberduck and WinSCP also support WebDAV via the GUI. ### Agents / programmatic access ```sh # List directory curl -X PROPFIND -H "Depth: 1" {base_url} # Download a file curl {base_url}path/to/file # Upload a file (requires auth if server has credentials) curl -T localfile {base_url}path/to/file # Create a directory curl -X MKCOL {base_url}newdir/ ``` ## Supported Methods `GET` `HEAD` `PUT` `DELETE` `MKCOL` `COPY` `MOVE` `PROPFIND` `PROPPATCH` `LOCK` `UNLOCK` `OPTIONS` ## Protocol RFC 4918 WebDAV — DAV compliance classes 1, 2, 3. Use `PROPFIND` with `Depth: 1` to list collections. Use `PROPFIND` with `Depth: 0` to inspect a single resource. """ accept = req.get_header("Accept") or "" if "text/html" in accept: # Escape all user-controlled values before embedding in HTML. h_url = _html.escape(base_url, quote=True) h_host = _html.escape(host, quote=True) h_mode = _html.escape(mode) h_basedir = _html.escape(str(basedir)) page = f"""\ WebDAV Server

WebDAV Server

Access mode{h_mode}
Base URL{h_url}
Serving{h_basedir}

Connecting

macOS

Finder: Go → Connect to Server{h_url}

Linux

# Mount (davfs2)
sudo apt install davfs2
sudo mkdir /mnt/webdav
sudo mount -t davfs {h_url} /mnt/webdav

# GUI: Nautilus address bar → davs://{h_host}/
# GUI: Dolphin address bar  → webdavs://{h_host}/

# CLI
cadaver {h_url}

Windows

Map Network Drive → Folder: {h_url}
Or: net use Z: {h_url}

If Windows refuses to connect: set HKLM\\SYSTEM\\CurrentControlSet\\Services\\WebClient\\Parameters\\BasicAuthLevel = 2 and restart the WebClient service.

Cross-platform (rclone)

rclone copy {h_url} ./local-copy --webdav-vendor other

Cyberduck and WinSCP also support WebDAV via GUI.

Agents / programmatic access

# List directory
curl -X PROPFIND -H "Depth: 1" {h_url}

# Download a file
curl {h_url}path/to/file

# Upload (requires auth if credentials are configured)
curl -T localfile {h_url}path/to/file

# Create a directory
curl -X MKCOL {h_url}newdir/

Supported Methods

GET  HEAD  PUT  DELETE  MKCOL  COPY  MOVE
PROPFIND  PROPPATCH  LOCK  UNLOCK  OPTIONS

RFC 4918 WebDAV — DAV compliance classes 1, 2, 3.

""" return "text/html; charset=utf-8", page return "text/markdown; charset=utf-8", md def _lock_response_body(lock: LockInfo) -> bytes: return etree.tostring( E.prop(_lock_discovery_element([lock])), xml_declaration=True, encoding="utf-8", ) # --------------------------------------------------------------------------- # Middleware # --------------------------------------------------------------------------- class AuthMiddleware: """Unified auth middleware covering all four access modes. | credentials | allow_anonymous_read | reads | writes | |-------------|----------------------|------------|-----------------| | set | False | auth req'd | auth req'd | | set | True | open | auth req'd | | not set | True | open | 403 (no creds) | | not set | False | — | refused at CLI | """ _ph = argon2.PasswordHasher() def __init__( self, username: str | None, password: str | None, allow_anonymous_read: bool, ) -> None: self._has_creds = username is not None self._anon_read = allow_anonymous_read # Bind username into the secret so a single verify call checks both. # Neither username nor password is kept in memory after this point. if username is not None and password is not None: self._hash: str | None = self._ph.hash(f"{username}:{password}") else: self._hash = None async def process_request( self, req: falcon.asgi.Request, resp: falcon.asgi.Response ) -> None: is_write = req.method in WRITE_METHODS # Writes with no credentials configured → always forbidden if is_write and not self._has_creds: raise falcon.HTTPForbidden( description="Server has no write credentials configured" ) # Reads with anonymous access allowed → no auth needed if not is_write and self._anon_read: return # All remaining requests require valid credentials auth = req.get_header("Authorization") if not auth or not auth.startswith("Basic "): resp.set_header("WWW-Authenticate", 'Basic realm="WebDAV"') raise falcon.HTTPUnauthorized(description="Authentication required") try: decoded = base64.b64decode(auth[6:]).decode("utf-8", errors="replace") except Exception: raise falcon.HTTPUnauthorized(description="Invalid credentials") supplied_user, _, supplied_pw = decoded.partition(":") # Single verify call covers both username and password. # Run in a thread — Argon2 is intentionally CPU-intensive and would # block the event loop for every authenticated request. try: await asyncio.to_thread( self._ph.verify, self._hash, f"{supplied_user}:{supplied_pw}" ) except ( argon2.exceptions.VerifyMismatchError, argon2.exceptions.VerificationError, ): resp.set_header("WWW-Authenticate", 'Basic realm="WebDAV"') raise falcon.HTTPUnauthorized(description="Invalid credentials") # --------------------------------------------------------------------------- # WebDAV resource # --------------------------------------------------------------------------- class WebDAVResource: def __init__( self, basedir: pathlib.Path, lock_mgr: LockManager, prop_mgr: PropertyManager, mode: str = "unknown", ) -> None: self.basedir = basedir self.lock_mgr = lock_mgr self.prop_mgr = prop_mgr self.mode = mode # -- helpers ----------------------------------------------------------- def _resolve(self, path: str) -> pathlib.Path: decoded = urllib.parse.unquote(path) resolved = (self.basedir / decoded).resolve() if not resolved.is_relative_to(self.basedir): raise falcon.HTTPForbidden(description="Path traversal denied") return resolved def _relpath(self, fspath: pathlib.Path) -> str: rel = fspath.relative_to(self.basedir).as_posix() return "/" if rel == "." else "/" + rel def _href(self, fspath: pathlib.Path) -> str: p = urllib.parse.quote(self._relpath(fspath), safe="/") if fspath.is_dir() and not p.endswith("/"): p += "/" return p def _dav_path(self, fspath: pathlib.Path) -> str: return self._relpath(fspath) def _get_props( self, fspath: pathlib.Path, st: os.stat_result, mode: str, requested: set[str] | None, ) -> tuple[dict[str, etree._Element | str | None], list[str]]: is_dir = fspath.is_dir() etag = _etag(st) live: dict[str, etree._Element | str | None] = {} live["resourcetype"] = E.resourcetype(E.collection()) if is_dir else E.resourcetype() live["getlastmodified"] = _http_date(st.st_mtime) live["creationdate"] = _iso_date(st.st_ctime) live["displayname"] = fspath.name or "/" live["getetag"] = etag live["getcontenttype"] = _content_type(fspath) if not is_dir: live["getcontentlength"] = str(st.st_size) live["getcontentlanguage"] = None # not set live["supportedlock"] = _supported_lock_element() locks = self.lock_mgr.get_locks(self._dav_path(fspath)) live["lockdiscovery"] = _lock_discovery_element(locks) # Dead properties dead = self.prop_mgr.get_all(self._dav_path(fspath)) if mode == "propname": return {k: None for k in (*live, *dead)}, [] if mode == "allprop": return {**live, **dead}, [] # mode == "prop" props_ok: dict[str, etree._Element | str | None] = {} props_404: list[str] = [] if requested: for name in requested: if name in live: props_ok[name] = live[name] elif name in dead: props_ok[name] = dead[name] else: props_404.append(name) return props_ok, props_404 def _parse_destination(self, req: falcon.asgi.Request) -> pathlib.Path: dest = req.get_header("Destination") if not dest: raise falcon.HTTPBadRequest(description="Missing Destination header") return self._resolve(urllib.parse.urlparse(dest).path.lstrip("/")) @staticmethod def _set_lock_response( resp: falcon.asgi.Response, lock: LockInfo, status: str ) -> None: resp.status = status resp.content_type = "application/xml; charset=utf-8" resp.set_header("Lock-Token", f"<{lock.token}>") resp.data = _lock_response_body(lock) def _check_lock(self, req: falcon.asgi.Request, fspath: pathlib.Path) -> None: dav_path = self._dav_path(fspath) if_header = req.get_header("If") if not self.lock_mgr.check_locked(dav_path, if_header): raise falcon.HTTPLocked() # -- OPTIONS ----------------------------------------------------------- async def on_options( self, req: falcon.asgi.Request, resp: falcon.asgi.Response, **kwargs: str ) -> None: resp.status = falcon.HTTP_200 resp.set_header("DAV", "1, 2, 3") resp.set_header("Allow", ALL_METHODS) resp.set_header("MS-Author-Via", "DAV") resp.content_length = 0 # -- HEAD -------------------------------------------------------------- # -- HEAD / GET -------------------------------------------------------- async def on_head( self, req: falcon.asgi.Request, resp: falcon.asgi.Response, **kwargs: str ) -> None: await self.on_get(req, resp, **kwargs) async def on_get( self, req: falcon.asgi.Request, resp: falcon.asgi.Response, **kwargs: str ) -> None: fspath = self._resolve(kwargs.get("path", "")) if not fspath.exists(): raise falcon.HTTPNotFound() if fspath.is_dir(): ctype, body = _info_page(req, self.mode, self.basedir) resp.set_header("Vary", "Accept") resp.content_type = ctype resp.text = body return st = await asyncio.to_thread(os.stat, fspath) etag = _etag(st) resp.set_header("ETag", etag) resp.set_header("Last-Modified", _http_date(st.st_mtime)) resp.set_header("Accept-Ranges", "bytes") resp.content_type = _content_type(fspath) # Conditional request support inm = req.get_header("If-None-Match") if inm and etag in [t.strip() for t in inm.split(",")]: resp.status = falcon.HTTP_304 return data = await asyncio.to_thread(fspath.read_bytes) # Range request support range_header = req.get_header("Range") if range_header and range_header.startswith("bytes="): total = len(data) try: range_spec = range_header[6:] start_s, _, end_s = range_spec.partition("-") if start_s: start = int(start_s) end = int(end_s) if end_s else total - 1 else: # suffix range: -500 means last 500 bytes suffix = int(end_s) start = max(0, total - suffix) end = total - 1 except ValueError: resp.status = "416 Range Not Satisfiable" resp.set_header("Content-Range", f"bytes */{total}") return if start < 0 or start >= total or end >= total or start > end: resp.status = "416 Range Not Satisfiable" resp.set_header("Content-Range", f"bytes */{total}") return resp.status = "206 Partial Content" resp.set_header("Content-Range", f"bytes {start}-{end}/{total}") resp.data = data[start : end + 1] resp.content_length = end - start + 1 else: resp.data = data resp.content_length = len(data) # -- PUT --------------------------------------------------------------- async def on_put( self, req: falcon.asgi.Request, resp: falcon.asgi.Response, **kwargs: str ) -> None: fspath = self._resolve(kwargs.get("path", "")) self._check_lock(req, fspath) parent = fspath.parent if not parent.exists(): raise falcon.HTTPConflict(description="Parent collection does not exist") if fspath.is_dir(): raise falcon.HTTPConflict(description="Cannot PUT to a collection") # Conditional request im = req.get_header("If-Match") if im: if fspath.exists(): st = await asyncio.to_thread(os.stat, fspath) etag = _etag(st) if etag not in [t.strip() for t in im.split(",")]: raise falcon.HTTPPreconditionFailed() else: raise falcon.HTTPPreconditionFailed() cl = req.content_length if cl is not None and cl > MAX_UPLOAD_BYTES: raise falcon.HTTPRequestEntityTooLarge( description=f"Upload exceeds {MAX_UPLOAD_BYTES // (1024**3)} GiB limit" ) existed = fspath.exists() body = await req.bounded_stream.read(MAX_UPLOAD_BYTES + 1) if len(body) > MAX_UPLOAD_BYTES: raise falcon.HTTPRequestEntityTooLarge( description=f"Upload exceeds {MAX_UPLOAD_BYTES // (1024**3)} GiB limit" ) await asyncio.to_thread(fspath.write_bytes, body) st = await asyncio.to_thread(os.stat, fspath) resp.set_header("ETag", _etag(st)) resp.status = falcon.HTTP_204 if existed else falcon.HTTP_201 # -- DELETE ------------------------------------------------------------ async def on_delete( self, req: falcon.asgi.Request, resp: falcon.asgi.Response, **kwargs: str ) -> None: fspath = self._resolve(kwargs.get("path", "")) if not fspath.exists(): raise falcon.HTTPNotFound() self._check_lock(req, fspath) dav_path = self._dav_path(fspath) if fspath.is_dir(): await asyncio.to_thread(shutil.rmtree, fspath) else: await asyncio.to_thread(fspath.unlink) self.lock_mgr.remove_for_path(dav_path) self.prop_mgr.delete(dav_path) resp.status = falcon.HTTP_204 # -- MKCOL ------------------------------------------------------------- async def on_mkcol( self, req: falcon.asgi.Request, resp: falcon.asgi.Response, **kwargs: str ) -> None: fspath = self._resolve(kwargs.get("path", "")) # MKCOL must not have a body body = await req.bounded_stream.read() if body and body.strip(): raise falcon.HTTPUnsupportedMediaType() if fspath.exists(): raise falcon.HTTPMethodNotAllowed(ALL_METHODS.split(", ")) if not fspath.parent.exists(): raise falcon.HTTPConflict(description="Parent collection does not exist") await asyncio.to_thread(fspath.mkdir) resp.status = falcon.HTTP_201 # -- COPY / MOVE shared preamble ---------------------------------------- def _check_copy_move( self, req: falcon.asgi.Request, fspath: pathlib.Path ) -> tuple[pathlib.Path, bool]: """Validate and return (destination, existed_before).""" dst = self._parse_destination(req) if fspath.resolve() == dst.resolve(): raise falcon.HTTPForbidden( description="Source and destination are the same" ) overwrite = req.get_header("Overwrite") or "T" existed = dst.exists() if existed and overwrite.upper() == "F": raise falcon.HTTPPreconditionFailed() return dst, existed # -- COPY -------------------------------------------------------------- async def on_copy( self, req: falcon.asgi.Request, resp: falcon.asgi.Response, **kwargs: str ) -> None: fspath = self._resolve(kwargs.get("path", "")) if not fspath.exists(): raise falcon.HTTPNotFound() dst, existed = self._check_copy_move(req, fspath) depth = req.get_header("Depth") or "infinity" if fspath.is_dir(): if existed: await asyncio.to_thread(shutil.rmtree, dst) if depth == "0": await asyncio.to_thread(dst.mkdir, parents=False, exist_ok=True) else: await asyncio.to_thread( shutil.copytree, fspath, dst, dirs_exist_ok=True ) else: if existed and dst.is_dir(): await asyncio.to_thread(shutil.rmtree, dst) await asyncio.to_thread(shutil.copy2, fspath, dst) resp.status = falcon.HTTP_204 if existed else falcon.HTTP_201 # -- MOVE -------------------------------------------------------------- async def on_move( self, req: falcon.asgi.Request, resp: falcon.asgi.Response, **kwargs: str ) -> None: fspath = self._resolve(kwargs.get("path", "")) if not fspath.exists(): raise falcon.HTTPNotFound() self._check_lock(req, fspath) dst, existed = self._check_copy_move(req, fspath) if not dst.parent.exists(): raise falcon.HTTPConflict(description="Destination parent does not exist") if existed: if dst.is_dir(): await asyncio.to_thread(shutil.rmtree, dst) else: await asyncio.to_thread(dst.unlink) await asyncio.to_thread(shutil.move, str(fspath), str(dst)) src_dav = self._dav_path(fspath) dst_dav = self._dav_path(dst) self.lock_mgr.move_locks(src_dav, dst_dav) self.prop_mgr.move(src_dav, dst_dav) resp.status = falcon.HTTP_204 if existed else falcon.HTTP_201 # -- PROPFIND ---------------------------------------------------------- async def on_propfind( self, req: falcon.asgi.Request, resp: falcon.asgi.Response, **kwargs: str ) -> None: fspath = self._resolve(kwargs.get("path", "")) if not fspath.exists(): raise falcon.HTTPNotFound() depth = req.get_header("Depth") or "1" if depth == "infinity": raise falcon.HTTPForbidden(description="Depth: infinity not supported") body = await req.bounded_stream.read() mode, requested = _parse_propfind(body) resources: list[pathlib.Path] = [fspath] if depth == "1" and fspath.is_dir(): children = await asyncio.to_thread( lambda: sorted(fspath.iterdir(), key=lambda p: p.name) ) resources.extend(children) responses: list[etree._Element] = [] for res in resources: try: st = await asyncio.to_thread(os.stat, res) except OSError: continue href = self._href(res) props_ok, props_404 = self._get_props(res, st, mode, requested) responses.append(_build_prop_response(href, props_ok, props_404)) resp.status = "207 Multi-Status" resp.content_type = "application/xml; charset=utf-8" resp.data = _build_multistatus(responses) # -- PROPPATCH --------------------------------------------------------- async def on_proppatch( self, req: falcon.asgi.Request, resp: falcon.asgi.Response, **kwargs: str ) -> None: fspath = self._resolve(kwargs.get("path", "")) if not fspath.exists(): raise falcon.HTTPNotFound() self._check_lock(req, fspath) body = await req.bounded_stream.read() if not body: raise falcon.HTTPBadRequest(description="Empty PROPPATCH body") try: root = etree.fromstring(body, parser=_SECURE_PARSER) except etree.XMLSyntaxError: raise falcon.HTTPBadRequest(description="Invalid XML") dav_path = self._dav_path(fspath) props_ok: dict[str, etree._Element | str | None] = {} for set_el in root.findall(f"{D}set"): prop_el = set_el.find(f"{D}prop") if prop_el is not None: for child in prop_el: key = child.tag value = ( etree.tostring(child, encoding="unicode") if len(child) else (child.text or "") ) self.prop_mgr.set_prop(dav_path, key, value) props_ok[ child.tag.split("}")[-1] if "}" in child.tag else child.tag ] = None for remove_el in root.findall(f"{D}remove"): prop_el = remove_el.find(f"{D}prop") if prop_el is not None: for child in prop_el: key = child.tag self.prop_mgr.remove_prop(dav_path, key) props_ok[ child.tag.split("}")[-1] if "}" in child.tag else child.tag ] = None href = self._href(fspath) response = _build_prop_response(href, props_ok, []) resp.status = "207 Multi-Status" resp.content_type = "application/xml; charset=utf-8" resp.data = _build_multistatus([response]) # -- LOCK -------------------------------------------------------------- async def on_lock( self, req: falcon.asgi.Request, resp: falcon.asgi.Response, **kwargs: str ) -> None: fspath = self._resolve(kwargs.get("path", "")) dav_path = self._dav_path(fspath) depth = req.get_header("Depth") or "infinity" timeout_secs = _parse_timeout(req.get_header("Timeout")) # Lock refresh if_header = req.get_header("If") if if_header: tokens = re.findall(r"<([^>]+)>", if_header) for token in tokens: lock = self.lock_mgr.refresh(token, timeout_secs) if lock: self._set_lock_response(resp, lock, falcon.HTTP_200) return raise falcon.HTTPPreconditionFailed() body = await req.bounded_stream.read() scope, owner_xml = _parse_lockinfo(body) # Create the resource if it doesn't exist (lock-null resource) created = False if not fspath.exists(): if not fspath.parent.exists(): raise falcon.HTTPConflict( description="Parent collection does not exist" ) await asyncio.to_thread(fspath.write_bytes, b"") created = True lock = self.lock_mgr.acquire(dav_path, owner_xml, depth, scope, timeout_secs) if not lock: raise falcon.HTTPLocked() self._set_lock_response( resp, lock, falcon.HTTP_201 if created else falcon.HTTP_200 ) # -- UNLOCK ------------------------------------------------------------ async def on_unlock( self, req: falcon.asgi.Request, resp: falcon.asgi.Response, **kwargs: str ) -> None: token_header = req.get_header("Lock-Token") if not token_header: raise falcon.HTTPBadRequest(description="Missing Lock-Token header") # Strip angle brackets token = token_header.strip("<>") if self.lock_mgr.release(token): resp.status = falcon.HTTP_204 else: raise falcon.HTTPConflict(description="Lock token not found") # --------------------------------------------------------------------------- # App factory # --------------------------------------------------------------------------- def _error_serializer( req: falcon.asgi.Request, resp: falcon.asgi.Response, exception: falcon.HTTPError, ) -> None: """Return plain-text error bodies instead of JSON.""" resp.content_type = "text/plain; charset=utf-8" title = exception.status or "Error" desc = exception.description or "" body = f"{title}\n{desc}" if desc else str(title) resp.text = body def create_app( basedir: pathlib.Path, username: str | None = None, password: str | None = None, allow_anonymous_read: bool = False, ) -> falcon.asgi.App: if username and allow_anonymous_read: mode = "public reads, authenticated writes" elif username: mode = "all access requires auth" else: mode = "public read-only" lock_mgr = LockManager() prop_mgr = PropertyManager() middleware: list = [AuthMiddleware(username, password, allow_anonymous_read)] app = falcon.asgi.App(middleware=middleware) app.set_error_serializer(_error_serializer) resource = WebDAVResource(basedir, lock_mgr, prop_mgr, mode) app.add_route("/{path:path}", resource) return app # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- cli = typer.Typer(add_completion=False) def _print_usage() -> None: lines = [ "Usage: webdav.py --basedir DIR [OPTIONS]", "", " RFC 4918 WebDAV server. Serves a directory over HTTP/WebDAV.", " Secure by default: at least one of --allow-anonymous-read or", " --username/--password must be provided.", "", "Options:", " --basedir TEXT Directory to serve (required)", " --port INT Port to listen on [default: 8080]", " --username TEXT Username for write access", " --password TEXT Password for write access", " --allow-anonymous-read Allow unauthenticated reads", " --help Show this message and exit", "", "Access matrix:", " --username/--password only → reads and writes require auth", " --allow-anonymous-read only → reads open, writes blocked (no creds)", " both → reads open, writes require auth", "", "Examples:", " # Public read-only share", " uv run webdav.py --basedir . --allow-anonymous-read", "", " # Private (all access requires auth)", " uv run webdav.py --basedir /srv/files --username alice --password secret", "", " # Public reads, authenticated writes", " uv run webdav.py --basedir ./data --allow-anonymous-read \\", " --username alice --password secret", "", "macOS Finder:", " Finder > Go > Connect to Server > http://:/", ] typer.echo("\n".join(lines)) @cli.command() def main( basedir: str = typer.Option( None, help="Directory to serve. Must exist.", show_default=False ), port: int = typer.Option(8080, help="Port to listen on."), username: str | None = typer.Option( None, help="Username for write access. Requires --password." ), password: str | None = typer.Option( None, help="Password for write access. Requires --username." ), allow_anonymous_read: bool = typer.Option( False, "--allow-anonymous-read", help="Allow unauthenticated read access." ), ) -> None: """RFC 4918 WebDAV server. Serves a directory over HTTP/WebDAV. Secure by default: requires --username/--password and/or --allow-anonymous-read. Works with macOS Finder, Cyberduck, cadaver, and other standard WebDAV clients. """ if not basedir: _print_usage() raise typer.Exit(0) # Credentials must be paired if bool(username) != bool(password): typer.echo( "Error: --username and --password must both be provided.\n" "\n" " Private server: uv run webdav.py --basedir DIR" " --username USER --password PASS\n" " Public reads: uv run webdav.py --basedir DIR" " --allow-anonymous-read", err=True, ) raise typer.Exit(1) # Must enable at least one access mode if not username and not allow_anonymous_read: typer.echo( "Error: server would be inaccessible — provide credentials and/or" " --allow-anonymous-read.\n" "\n" " Private server: uv run webdav.py --basedir DIR" " --username USER --password PASS\n" " Public reads: uv run webdav.py --basedir DIR" " --allow-anonymous-read\n" " Both: uv run webdav.py --basedir DIR" " --allow-anonymous-read --username USER --password PASS", err=True, ) raise typer.Exit(1) base = pathlib.Path(basedir).resolve() if not base.is_dir(): typer.echo( f"Error: '{base}' is not a directory.\n" "\n" f" Create it first: mkdir -p {base}\n" f" Or pick another: uv run webdav.py --basedir /tmp/dav", err=True, ) raise typer.Exit(1) app = create_app(base, username, password, allow_anonymous_read) if username and allow_anonymous_read: mode = "public reads, authenticated writes" elif username: mode = "all access requires auth" else: mode = "public read-only" typer.echo(f"WebDAV server [{mode}] on http://0.0.0.0:{port}/ serving {base}") uvicorn.run(app, host="0.0.0.0", port=port) if __name__ == "__main__": cli()