mirror of
https://github.com/mkdocs/mkdocs.git
synced 2026-03-27 09:58:31 +07:00
Restore the functionality of watching files through symlinks
Watchdog doesn't support it directly but it happened to work everywhere except Windows This is achieved by walking through the target directory, finding any symlinks in it, and watching their target too (+recursive walking of those too). Important note: only symlinks that exist during startup will be taken into account, new ones aren't added. Fixes #2425 Closes #2426
This commit is contained in:
@@ -4,6 +4,7 @@ import logging
|
||||
import mimetypes
|
||||
import os
|
||||
import os.path
|
||||
import pathlib
|
||||
import re
|
||||
import socketserver
|
||||
import threading
|
||||
@@ -76,8 +77,10 @@ class LiveReloadServer(socketserver.ThreadingMixIn, wsgiref.simple_server.WSGISe
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
def callback(event):
|
||||
if event.is_directory:
|
||||
def callback(event, allowed_path=None):
|
||||
if isinstance(event, watchdog.events.DirCreatedEvent):
|
||||
return
|
||||
if allowed_path is not None and event.src_path != allowed_path:
|
||||
return
|
||||
# Text editors always cause a "file close" event in addition to "modified" when saving
|
||||
# a file. Some editors also have "swap" functionality that keeps writing into another
|
||||
@@ -91,9 +94,43 @@ class LiveReloadServer(socketserver.ThreadingMixIn, wsgiref.simple_server.WSGISe
|
||||
self._to_rebuild[func] = True
|
||||
self._rebuild_cond.notify_all()
|
||||
|
||||
handler = watchdog.events.FileSystemEventHandler()
|
||||
handler.on_any_event = callback
|
||||
self.observer.schedule(handler, path, recursive=recursive)
|
||||
dir_handler = watchdog.events.FileSystemEventHandler()
|
||||
dir_handler.on_any_event = callback
|
||||
|
||||
seen = set()
|
||||
|
||||
def schedule(path):
|
||||
seen.add(path)
|
||||
if os.path.isfile(path):
|
||||
# Watchdog doesn't support watching files, so watch its directory and filter by path
|
||||
handler = watchdog.events.FileSystemEventHandler()
|
||||
handler.on_any_event = lambda event: callback(event, allowed_path=path)
|
||||
|
||||
parent = os.path.dirname(path)
|
||||
log.debug(f"Watching file '{path}' through directory '{parent}'")
|
||||
self.observer.schedule(handler, parent)
|
||||
else:
|
||||
log.debug(f"Watching directory '{path}'")
|
||||
self.observer.schedule(dir_handler, path, recursive=recursive)
|
||||
|
||||
schedule(os.path.realpath(path))
|
||||
|
||||
def watch_symlink_targets(path_obj): # path is os.DirEntry or pathlib.Path
|
||||
if path_obj.is_symlink():
|
||||
# The extra `readlink` is needed due to https://bugs.python.org/issue9949
|
||||
target = os.path.realpath(os.readlink(os.fspath(path_obj)))
|
||||
if target in seen or not os.path.exists(target):
|
||||
return
|
||||
schedule(target)
|
||||
|
||||
path_obj = pathlib.Path(target)
|
||||
|
||||
if path_obj.is_dir() and recursive:
|
||||
with os.scandir(os.fspath(path_obj)) as scan:
|
||||
for entry in scan:
|
||||
watch_symlink_targets(entry)
|
||||
|
||||
watch_symlink_targets(pathlib.Path(path))
|
||||
|
||||
def serve(self):
|
||||
self.observer.start()
|
||||
|
||||
@@ -70,8 +70,7 @@ def tempdir(files=None, **kw):
|
||||
"""
|
||||
files = {f: '' for f in files} if isinstance(files, (list, tuple)) else files or {}
|
||||
|
||||
if 'prefix' not in kw:
|
||||
kw['prefix'] = 'mkdocs_test-'
|
||||
kw['prefix'] = 'mkdocs_test-' + kw.get('prefix', '')
|
||||
|
||||
def decorator(fn):
|
||||
@wraps(fn)
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
import contextlib
|
||||
import email
|
||||
import io
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import unittest
|
||||
@@ -36,6 +37,7 @@ def testing_server(root, builder=lambda: None, mount_path="/"):
|
||||
port=0,
|
||||
root=root,
|
||||
mount_path=mount_path,
|
||||
build_delay=0.1,
|
||||
bind_and_activate=False,
|
||||
)
|
||||
server.setup_environ()
|
||||
@@ -75,28 +77,40 @@ class BuildTests(unittest.TestCase):
|
||||
self.assertEqual(headers["_status"], "200 OK")
|
||||
self.assertEqual(headers.get("content-length"), str(len(output)))
|
||||
|
||||
@tempdir({"foo.docs": "a"})
|
||||
@tempdir({"docs/foo.docs": "docs1", "mkdocs.yml": "yml1"})
|
||||
@tempdir({"foo.site": "original"})
|
||||
def test_basic_rebuild(self, site_dir, docs_dir):
|
||||
def test_basic_rebuild(self, site_dir, origin_dir):
|
||||
docs_dir = Path(origin_dir, "docs")
|
||||
|
||||
started_building = threading.Event()
|
||||
|
||||
def rebuild():
|
||||
started_building.set()
|
||||
content = Path(docs_dir, "foo.docs").read_text()
|
||||
Path(site_dir, "foo.site").write_text(content * 5)
|
||||
Path(site_dir, "foo.site").write_text(
|
||||
Path(docs_dir, "foo.docs").read_text() + Path(origin_dir, "mkdocs.yml").read_text()
|
||||
)
|
||||
|
||||
with testing_server(site_dir, rebuild) as server:
|
||||
server.watch(docs_dir, rebuild)
|
||||
server.watch(Path(origin_dir, "mkdocs.yml"), rebuild)
|
||||
time.sleep(0.01)
|
||||
|
||||
_, output = do_request(server, "GET /foo.site")
|
||||
self.assertEqual(output, "original")
|
||||
|
||||
Path(docs_dir, "foo.docs").write_text("b")
|
||||
Path(docs_dir, "foo.docs").write_text("docs2")
|
||||
self.assertTrue(started_building.wait(timeout=10))
|
||||
started_building.clear()
|
||||
|
||||
_, output = do_request(server, "GET /foo.site")
|
||||
self.assertEqual(output, "bbbbb")
|
||||
self.assertEqual(output, "docs2yml1")
|
||||
|
||||
Path(origin_dir, "mkdocs.yml").write_text("yml2")
|
||||
self.assertTrue(started_building.wait(timeout=10))
|
||||
started_building.clear()
|
||||
|
||||
_, output = do_request(server, "GET /foo.site")
|
||||
self.assertEqual(output, "docs2yml2")
|
||||
|
||||
@tempdir({"foo.docs": "a"})
|
||||
@tempdir({"foo.site": "original"})
|
||||
@@ -408,3 +422,99 @@ class BuildTests(unittest.TestCase):
|
||||
headers, _ = do_request(server, "GET /")
|
||||
self.assertEqual(headers["_status"], "302 Found")
|
||||
self.assertEqual(headers.get("location"), "/mount/path/")
|
||||
|
||||
@tempdir({"mkdocs.yml": "original", "mkdocs2.yml": "original"}, prefix="tmp_dir")
|
||||
@tempdir(prefix="origin_dir")
|
||||
@tempdir({"subdir/foo.md": "original"}, prefix="dest_docs_dir")
|
||||
def test_watches_direct_symlinks(self, dest_docs_dir, origin_dir, tmp_dir):
|
||||
try:
|
||||
Path(origin_dir, "docs").symlink_to(dest_docs_dir, target_is_directory=True)
|
||||
Path(origin_dir, "mkdocs.yml").symlink_to(Path(tmp_dir, "mkdocs.yml"))
|
||||
except NotImplementedError: # PyPy on Windows
|
||||
self.skipTest("Creating symlinks not supported")
|
||||
|
||||
started_building = threading.Event()
|
||||
|
||||
def wait_for_build():
|
||||
result = started_building.wait(timeout=10)
|
||||
started_building.clear()
|
||||
with self.assertLogs("mkdocs.livereload"):
|
||||
do_request(server, "GET /")
|
||||
return result
|
||||
|
||||
with testing_server(tmp_dir, started_building.set) as server:
|
||||
server.watch(Path(origin_dir, "docs"))
|
||||
server.watch(Path(origin_dir, "mkdocs.yml"))
|
||||
time.sleep(0.01)
|
||||
|
||||
Path(tmp_dir, "mkdocs.yml").write_text("edited")
|
||||
self.assertTrue(wait_for_build())
|
||||
|
||||
Path(dest_docs_dir, "subdir", "foo.md").write_text("edited")
|
||||
self.assertTrue(wait_for_build())
|
||||
|
||||
Path(origin_dir, "unrelated.md").write_text("foo")
|
||||
self.assertFalse(started_building.wait(timeout=0.2))
|
||||
|
||||
@tempdir(["file_dest_1.md", "file_dest_2.md", "file_dest_unused.md"], prefix="tmp_dir")
|
||||
@tempdir(["file_under.md"], prefix="dir_to_link_to")
|
||||
@tempdir()
|
||||
def test_watches_through_symlinks(self, docs_dir, dir_to_link_to, tmp_dir):
|
||||
try:
|
||||
Path(docs_dir, "link1.md").symlink_to(Path(tmp_dir, "file_dest_1.md"))
|
||||
Path(docs_dir, "linked_dir").symlink_to(dir_to_link_to, target_is_directory=True)
|
||||
|
||||
Path(dir_to_link_to, "sublink.md").symlink_to(Path(tmp_dir, "file_dest_2.md"))
|
||||
except NotImplementedError: # PyPy on Windows
|
||||
self.skipTest("Creating symlinks not supported")
|
||||
|
||||
started_building = threading.Event()
|
||||
|
||||
def wait_for_build():
|
||||
result = started_building.wait(timeout=10)
|
||||
started_building.clear()
|
||||
with self.assertLogs("mkdocs.livereload"):
|
||||
do_request(server, "GET /")
|
||||
return result
|
||||
|
||||
with testing_server(docs_dir, started_building.set) as server:
|
||||
server.watch(docs_dir)
|
||||
time.sleep(0.01)
|
||||
|
||||
Path(tmp_dir, "file_dest_1.md").write_text("edited")
|
||||
self.assertTrue(wait_for_build())
|
||||
|
||||
Path(dir_to_link_to, "file_under.md").write_text("edited")
|
||||
self.assertTrue(wait_for_build())
|
||||
|
||||
Path(tmp_dir, "file_dest_2.md").write_text("edited")
|
||||
self.assertTrue(wait_for_build())
|
||||
|
||||
Path(docs_dir, "link1.md").unlink()
|
||||
self.assertTrue(wait_for_build())
|
||||
|
||||
Path(tmp_dir, "file_dest_unused.md").write_text("edited")
|
||||
self.assertFalse(started_building.wait(timeout=0.2))
|
||||
|
||||
@tempdir()
|
||||
def test_watch_with_broken_symlinks(self, docs_dir):
|
||||
Path(docs_dir, "subdir").mkdir()
|
||||
|
||||
try:
|
||||
if sys.platform != "win32":
|
||||
Path(docs_dir, "subdir", "circular").symlink_to(Path(docs_dir))
|
||||
Path(docs_dir, "self_link").symlink_to(Path(docs_dir, "self_link"))
|
||||
|
||||
Path(docs_dir, "broken_1").symlink_to(Path(docs_dir, "oh no"))
|
||||
Path(docs_dir, "broken_2").symlink_to(Path(docs_dir, "oh no"), target_is_directory=True)
|
||||
Path(docs_dir, "broken_3").symlink_to(Path(docs_dir, "broken_2"))
|
||||
except NotImplementedError: # PyPy on Windows
|
||||
self.skipTest("Creating symlinks not supported")
|
||||
|
||||
started_building = threading.Event()
|
||||
with testing_server(docs_dir, started_building.set) as server:
|
||||
server.watch(docs_dir)
|
||||
time.sleep(0.01)
|
||||
|
||||
Path(docs_dir, "subdir", "test").write_text("test")
|
||||
self.assertTrue(started_building.wait(timeout=10))
|
||||
|
||||
Reference in New Issue
Block a user