diff --git a/pyproject.toml b/pyproject.toml index 5b71b45..31bea23 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,7 +19,7 @@ classifiers = [ "Operating System :: OS Independent", "Development Status :: 5 - Production/Stable" ] -scripts = { syncmymoodle = "syncmymoodle.__main__:main" } +scripts = { syncmymoodle = "syncmymoodle.cli:main" } urls = { "Homepage" = "https://github.com/Romern/syncMyMoodle", "Bug Tracker" = "https://github.com/Romern/syncMyMoodle/issues" } diff --git a/syncmymoodle/__main__.py b/syncmymoodle/__main__.py index 1e9c12e..ec40e33 100755 --- a/syncmymoodle/__main__.py +++ b/syncmymoodle/__main__.py @@ -1,3204 +1,4 @@ -#!/usr/bin/env python3 - -import base64 -import getpass -import gzip -import hashlib -import hmac -import http.client -import json -import logging -import os -import re -import struct -import sys -import tempfile -import time -import urllib.parse -from argparse import ArgumentParser -from contextlib import closing -from fnmatch import fnmatchcase -from pathlib import Path -from types import ModuleType -from typing import List - -import requests -import yt_dlp -from bs4 import BeautifulSoup as bs -from tqdm import tqdm - -try: - import keyring as imported_keyring - - keyring: ModuleType | None = imported_keyring -except ImportError: - keyring = None - -YOUTUBE_ID_LENGTH = 11 -NAME_CLASH_ID_UNSET = object() -YOUTUBE_LINK_RE = re.compile( - r"(https?://(www\.)?(youtube\.com/(watch\?[a-zA-Z0-9_=&-]*v=|embed/)|youtu.be/).{11})" -) -OPENCAST_LINK_RE = re.compile( - r"https://engage\.streaming\.rwth-aachen\.de/play/[a-zA-Z0-9-]+" -) -SCIEBO_LINK_RE = re.compile(r"https://rwth-aachen\.sciebo\.de/s/[a-zA-Z0-9-]+") -MOODLE_URL = "https://moodle.rwth-aachen.de/" -RWTH_HOMEPAGE_URL = "https://www.rwth-aachen.de/" -RWTH_STATUS_URL = "https://maintenance.itc.rwth-aachen.de/ticket/status/messages" -RWTH_MOODLE_STATUS_URL = ( - "https://maintenance.itc.rwth-aachen.de/ticket/status/messages/499?locale=en" -) -RWTH_SSO_STATUS_URL = ( - 
"https://maintenance.itc.rwth-aachen.de/ticket/status/messages/462?locale=en" -) -RWTH_DISRUPTIVE_STATUS_CLASSES = { - "statuslabel_stoerung", - "statuslabel_teilstoerung", - "statuslabel_wartung", - "statuslabel_warnung", -} -COURSE_PREFIX_RE = re.compile(r"^\((?P[^()]{2})\) +(?P.+)$") -COURSE_PREFIX_HANDLING_OPTIONS = ("keep", "remove", "suffix") - -logger = logging.getLogger(__name__) - - -""" -To add TOTP functionality without adding external dependencies. -Code taken from: -https://github.com/susam/mintotp -""" - - -def hotp(key, counter, digits=6, digest="sha1"): - key = base64.b32decode(key.upper() + "=" * ((8 - len(key)) % 8)) - counter = struct.pack(">Q", counter) - mac = hmac.new(key, counter, digest).digest() - offset = mac[-1] & 0x0F - binary = struct.unpack(">L", mac[offset : offset + 4])[0] & 0x7FFFFFFF - return str(binary)[-digits:].zfill(digits) - - -def totp(key, time_step=30, digits=6, digest="sha1"): - return hotp(key, int(time.time() / time_step), digits, digest) - - -class Node: - def __init__( - self, - name, - id, - type, # noqa: A003 - keep original name for compatibility - parent, - url=None, - additional_info=None, - timemodified=None, - etag=None, - name_clash_id=NAME_CLASH_ID_UNSET, - is_downloaded=False, - ): - self.name = name - self.id = id - self.url = url - self.type = type - self.parent = parent - self.children: List[Node] = [] - # Currently only used for course_id in opencast, auth header in sciebo, - # and may be extended for other module-specific data. 
- self.additional_info = additional_info - self.timemodified = timemodified - self.etag = etag - self.name_clash_id = ( - id if name_clash_id is NAME_CLASH_ID_UNSET else name_clash_id - ) - self.is_downloaded = ( - is_downloaded # Can also be used to exclude files from being downloaded - ) - - def __repr__(self): - return f"Node(name={self.name}, id={self.id}, url={self.url}, type={self.type})" - - def add_child( - self, - name, - id, - type, - url=None, - additional_info=None, - timemodified=None, - etag=None, - name_clash_id=NAME_CLASH_ID_UNSET, - ): - if url: - url = url.replace("?forcedownload=1", "").replace( - "mod_page/content/3/", "mod_page/content/" - ) - url = url.replace("webservice/pluginfile.php", "pluginfile.php") - - # Check for duplicate urls and just ignore those nodes: - if url and any([True for c in self.children if c.url == url]): - return None - - temp = Node( - name, - id, - type, - self, - url=url, - additional_info=additional_info, - timemodified=timemodified, - etag=etag, - name_clash_id=name_clash_id, - ) - self.children.append(temp) - return temp - - def clone(self, parent=None): - clone = Node( - self.name, - self.id, - self.type, - parent, - url=self.url, - additional_info=self.additional_info, - timemodified=self.timemodified, - etag=self.etag, - name_clash_id=self.name_clash_id, - is_downloaded=self.is_downloaded, - ) - clone.children = [child.clone(clone) for child in self.children] - return clone - - def get_path(self): - ret = [] - cur = self - while cur is not None: - ret.insert(0, cur.name) - cur = cur.parent - return ret - - def go_to_path(self, target_path): - target_node = [self] - for path_child in target_path: - if path_child == "": - continue - try: - target_node.append( - [ - node_child - for node_child in target_node[-1].children - if node_child.name == path_child - ][0] - ) - except IndexError: - raise Exception("The path is not found in this root node. 
Wrong path?") - return target_node[-1] - - def _clash_suffix(self): - # Stable, distinct suffix used to disambiguate same-named siblings. - # Fall back to the URL when no name_clash_id is set (direct-link, - # embedded, and direct-content file nodes pass name_clash_id=None); - # otherwise such nodes would all hash to md5("None") and collide onto - # the same path, silently dropping all but one file. - key = self.name_clash_id if self.name_clash_id is not None else self.url - return base64.urlsafe_b64encode( - hashlib.md5(str(key).encode("utf-8")).hexdigest().encode("utf-8") - ).decode()[:10] - - def remove_children_nameclashes(self): - # Check for duplicate filenames - - unclashed_children = [] - # work on copy since deleting from the iterated list breaks stuff - copy_children = self.children.copy() - for child in copy_children: - if child not in self.children: - continue - self.children.remove(child) - unclashed_children.append(child) - if child.type == "Opencast": - siblings = [ - c - for c in self.children - if c.name == child.name and c.url != child.url - ] - if len(siblings) > 0: - # if an Opencast filename is duplicate in its directory, we append the filename as it was uploaded - tmp_name = Path(child.name).name - child.name = f"{tmp_name}_{child.url.split('/')[-1]}" - for s in siblings: - tmp_name = Path(s.name).name - s.name = f"{s.name}_{s.url.split('/')[-1]}" - self.children.remove(s) - unclashed_children.extend(siblings) - - self.children = unclashed_children - - unclashed_children = [] - copy_children = self.children.copy() - for child in copy_children: - if child not in self.children: - continue - self.children.remove(child) - unclashed_children.append(child) - siblings = [ - c - for c in self.children - if c.name == child.name - and ( - c.url != child.url - # Course prefix handling may create duplicate URL-less course - # folders. Other URL-less nodes, such as duplicate Moodle - # sections, keep the legacy behavior and merge silently. 
- or ( - child.type == "Course" - and c.type == "Course" - and c.name_clash_id != child.name_clash_id - ) - ) - ] - if len(siblings) > 0: - # if a filename is still duplicate in its directory, we rename - # it by appending a stable per-node key (works for ids and urls). - filename = Path(child.name) - child.name = ( - filename.stem + "_" + child._clash_suffix() + filename.suffix - ) - for s in siblings: - filename = Path(s.name) - s.name = filename.stem + "_" + s._clash_suffix() + filename.suffix - self.children.remove(s) - unclashed_children.extend(siblings) - - self.children = unclashed_children - - for child in self.children: - # recurse whole tree - child.remove_children_nameclashes() - - -class SyncMyMoodle: - params = {"lang": "en"} # Titles for some pages differ - block_size = 1024 - invalid_chars = '~"#%&*:<>?/\\{|}' - - def __init__(self, config): - self.config = config - self.session = None - self.session_key = None - self.wstoken = None - self.user_id = None - self.root_node = None - # Per-course caches: mapping from course directory path to cached - # course root node loaded from `.syncmymoodle_cache`. - self._course_caches = {} - # Track repeated Opencast errors so we can hint at the RWTH - # status page without spamming messages - self._opencast_error_count = 0 - self._opencast_status_hint_logged = False - # Sciebo shares often appear multiple times in Moodle pages. Cache the - # resolved node tree during one run so repeated links do not trigger - # duplicate page fetches and WebDAV PROPFIND walks. - self._sciebo_link_cache = {} - # Opencast episodes can be discovered through multiple Moodle surfaces - # in one sync run. Cache successful launches and track lookups to avoid - # repeating identical LTI/API requests. 
- self._opencast_episode_auth_cache = set() - self._opencast_track_cache = {} - - def _harden_private_file(self, path: Path, description: str) -> bool: - if not path.exists(): - return True - if path.is_symlink(): - logger.warning("Refusing to use symlinked %s file: %s", description, path) - return False - try: - path.chmod(0o600) - except OSError: - logger.warning( - "Could not restrict permissions for %s file: %s", description, path - ) - return True - - def _write_private_gzip_json(self, path: Path, payload) -> None: - path = path.expanduser() - path.parent.mkdir(parents=True, exist_ok=True) - - json_bytes = json.dumps(payload, separators=(",", ":")).encode("utf-8") - data = gzip.compress(json_bytes) - - fd, tmp_name = tempfile.mkstemp(prefix=f".{path.name}.", dir=path.parent) - tmp_path = Path(tmp_name) - try: - os.fchmod(fd, 0o600) - with os.fdopen(fd, "wb") as f: - f.write(data) - os.replace(tmp_path, path) - path.chmod(0o600) - finally: - if tmp_path.exists(): - tmp_path.unlink() - - def _read_private_gzip_json(self, path: Path, description: str): - path = path.expanduser() - if not path.exists(): - return None - if not self._harden_private_file(path, description): - return None - try: - with path.open("rb") as f: - return json.loads(gzip.decompress(f.read()).decode("utf-8")) - except (OSError, gzip.BadGzipFile, UnicodeDecodeError, json.JSONDecodeError): - logger.warning( - "Ignoring legacy or invalid %s file %s. 
Delete it if this warning repeats.", - description, - path, - ) - return None - - def _match_old_cache_child(self, old_node, child): - """Find the previous cache node corresponding to ``child``, if any.""" - if old_node is None: - return None - candidates = [ - c - for c in getattr(old_node, "children", []) - if c.name == child.name and c.type == child.type - ] - if not candidates: - return None - for candidate in candidates: - if candidate.url == child.url: - return candidate - return candidates[0] - - def _node_to_cache_data(self, node: Node, old_node: Node | None = None): - timemodified = node.timemodified - etag = node.etag - is_downloaded = node.is_downloaded - # If this file was not (re)downloaded this run but a previously - # downloaded version is still on disk, keep the previously cached version - # markers. Otherwise the cache would record Moodle's new timemodified/etag - # for a file we never actually fetched, which either skips the file - # forever or moves the on-disk copy aside as a spurious conflict on the - # next run's retry. 
- if ( - not node.is_downloaded - and old_node is not None - and getattr(old_node, "is_downloaded", False) - and self.get_sanitized_node_path(node).exists() - ): - timemodified = getattr(old_node, "timemodified", None) - etag = getattr(old_node, "etag", None) - is_downloaded = True - return { - "name": node.name, - "id": node.id, - "type": node.type, - "url": node.url, - "timemodified": timemodified, - "etag": etag, - "name_clash_id": node.name_clash_id, - "is_downloaded": is_downloaded, - "children": [ - self._node_to_cache_data( - child, self._match_old_cache_child(old_node, child) - ) - for child in node.children - ], - } - - def _node_from_cache_data(self, data, parent=None): - node = Node( - data.get("name", ""), - data.get("id"), - data.get("type", "Unknown"), - parent, - url=data.get("url"), - timemodified=data.get("timemodified"), - etag=data.get("etag"), - name_clash_id=data.get("name_clash_id", NAME_CLASH_ID_UNSET), - is_downloaded=data.get("is_downloaded", False), - ) - node.children = [ - self._node_from_cache_data(child, node) - for child in data.get("children", []) - if isinstance(child, dict) - ] - return node - - def _cookies_to_data(self): - cookies = [] - for cookie in self.session.cookies: - cookies.append( - { - "name": cookie.name, - "value": cookie.value, - "domain": cookie.domain, - "path": cookie.path, - "secure": cookie.secure, - "expires": cookie.expires, - "rest": getattr(cookie, "_rest", {}), - } - ) - return {"format": "syncmymoodle.cookies.v1", "cookies": cookies} - - def _load_cookies_from_data(self, payload): - if not isinstance(payload, dict): - return - if payload.get("format") != "syncmymoodle.cookies.v1": - logger.warning("Ignoring unsupported cookie file format") - return - - for cookie_data in payload.get("cookies", []): - if not isinstance(cookie_data, dict): - continue - if not cookie_data.get("name"): - continue - cookie = requests.cookies.create_cookie( - name=cookie_data["name"], - value=cookie_data.get("value", ""), - 
domain=cookie_data.get("domain") or "", - path=cookie_data.get("path") or "/", - secure=bool(cookie_data.get("secure")), - expires=cookie_data.get("expires"), - rest=cookie_data.get("rest") or {}, - ) - self.session.cookies.set_cookie(cookie) - - def _save_session_cookies(self, cookie_file: Path) -> None: - self._write_private_gzip_json(cookie_file, self._cookies_to_data()) - - def cache_root_node(self): - """Persist per-course caches into .syncmymoodle_cache files. - - Each course directory beneath basedir receives its own cache file - containing the course subtree, which makes caching less brittle than - a single global root cache. - """ - if not self.root_node: - return - - for semester_node in self.root_node.children: - if semester_node.type != "Semester": - continue - for course_node in semester_node.children: - if course_node.type != "Course": - continue - course_path = self.get_sanitized_node_path(course_node) - # Read the previous course cache before overwriting it, so we can - # preserve version markers for files that were not downloaded - # this run (see _node_to_cache_data). - old_course_root = self._get_course_cache_root(course_node) - course_path.mkdir(parents=True, exist_ok=True) - cache_path = course_path / ".syncmymoodle_cache" - self._write_private_gzip_json( - cache_path, - { - "format": "syncmymoodle.course-cache.v1", - "course": self._node_to_cache_data( - course_node, old_course_root - ), - }, - ) - - def _ensure_timemodified_attribute(self, node): - # Old cached root nodes might not have the timemodified attribute yet. 
- if not hasattr(node, "timemodified"): - node.timemodified = None - if not hasattr(node, "etag"): - node.etag = None - if not hasattr(node, "name_clash_id"): - node.name_clash_id = getattr(node, "id", None) - for child in getattr(node, "children", []): - self._ensure_timemodified_attribute(child) - - def _get_course_node(self, node: Node) -> Node: - """Return the enclosing course node for the given node.""" - cur = node - while cur is not None and cur.parent is not None: - if cur.type == "Course": - return cur - cur = cur.parent - raise Exception("Node is not part of a course subtree") - - def _get_course_cache_root(self, course_node: Node): - """Load and return the cached course root for the given course node.""" - course_path = self.get_sanitized_node_path(course_node) - if course_path in self._course_caches: - return self._course_caches[course_path] - - cache_path = course_path / ".syncmymoodle_cache" - if not cache_path.exists(): - return None - - payload = self._read_private_gzip_json(cache_path, "course cache") - if not isinstance(payload, dict): - return None - if payload.get("format") != "syncmymoodle.course-cache.v1": - logger.warning("Ignoring unsupported course cache format: %s", cache_path) - return None - course_data = payload.get("course") - if not isinstance(course_data, dict): - return None - - cached_course_root = self._node_from_cache_data(course_data) - self._ensure_timemodified_attribute(cached_course_root) - - self._course_caches[course_path] = cached_course_root - return cached_course_root - - def _get_old_node_for(self, node: Node): - """Return the cached node for this node from the course cache, if any.""" - try: - course_node = self._get_course_node(node) - except Exception: - return None - - cached_course_root = self._get_course_cache_root(course_node) - if cached_course_root is None: - return None - - full_path = node.get_path() - course_path = course_node.get_path() - # Compute the path segments beneath the course root - rel_segments = 
full_path[len(course_path) :] - if not rel_segments: - return cached_course_root - - try: - return cached_course_root.go_to_path(rel_segments) - except Exception: - return None - - def _get_or_add_child(self, parent_node, name, id, type): - for child in parent_node.children: - if child.name == name and child.type == type: - return child - return parent_node.add_child(name, id, type) - - def _add_moodle_file_node( - self, - parent_node, - moodle_filepath, - filename, - id, - type, - url, - timemodified=None, - name_clash_id=NAME_CLASH_ID_UNSET, - ): - target_node = parent_node - path_segments = [ - self.sanitize(segment) - for segment in str(moodle_filepath or "").strip("/").split("/") - if segment - ] - - for segment in path_segments: - target_node = self._get_or_add_child(target_node, segment, None, "Folder") - if target_node is None: - return None - - return target_node.add_child( - filename, - id, - type, - url=url, - timemodified=timemodified, - name_clash_id=name_clash_id, - ) - - def _add_moodle_content_file_node(self, parent_node, content, file_type=None): - file_url = content.get("fileurl") - if not file_url: - return None - - mimetype = content.get("mimetype") or "unknown" - filename = urllib.parse.urlsplit(file_url).path.split("/")[-1] - if not filename: - filename = content.get("filename") - return self._add_moodle_file_node( - parent_node, - "/", - filename, - file_url, - file_type or f"Linked file [{mimetype}]", - file_url, - timemodified=content.get("timemodified"), - name_clash_id=None, - ) - - def _is_direct_moodle_file_content(self, module, content): - file_url = content.get("fileurl") - if not file_url or content.get("type") != "file": - return False - - mimetype = str(content.get("mimetype") or "").split(";", 1)[0].lower() - if not mimetype or mimetype in { - "document/unknown", - "unknown", - "text/html", - "application/xhtml+xml", - }: - return False - if mimetype.startswith("text/"): - return False - - modname = module.get("modname") - if 
modname in {"resource", "pdfannotator"}: - return True - - # Page modules often expose their rendered body as index.html. Keep - # that path in the HTML scanner, but direct-add binary attachments. - if modname == "page" and content.get("filename") != "index.html": - return True - - return False - - def _scan_html_text_for_links( - self, html_text, base_url, parent_node, course_id, module_title=None - ): - if "video-js" in html_text and " bool: - """Return True if ``course_id`` is referenced by a configured entry. - - Entries are course URLs (``.../course/view.php?id=NNN``). The ``id`` - query parameter is compared exactly, so e.g. ``id=12`` does not also - match courses ``1`` or ``2``. A bare numeric id entry is also accepted. - """ - course_id = str(course_id) - for entry in entries or []: - entry = str(entry) - parsed = urllib.parse.urlparse(entry) - if course_id in urllib.parse.parse_qs(parsed.query).get("id", []): - return True - if entry.strip() == course_id: - return True - return False - - def _configured_patterns(self, *keys, course_id=None): - patterns = [] - for key in keys: - value = self.config.get(key) - if isinstance(value, dict): - patterns.extend(self._as_list(value.get("*"))) - if course_id is not None: - patterns.extend(self._as_list(value.get(str(course_id)))) - else: - patterns.extend(self._as_list(value)) - return [str(pattern) for pattern in patterns if pattern is not None] - - def _format_course_name(self, course_name): - prefix_handling = self.config.get("course_prefix_handling", "keep") - if prefix_handling == "keep": - return course_name - if prefix_handling not in COURSE_PREFIX_HANDLING_OPTIONS: - logger.warning( - "Unsupported course_prefix_handling value %r; using keep", - prefix_handling, - ) - return course_name - - match = COURSE_PREFIX_RE.match(course_name) - if not match: - return course_name - - name = match.group("course_name") - prefix = match.group("prefix") - if prefix_handling == "remove": - return name - return f"{name} 
({prefix})" - - def _matches_any_pattern(self, values, patterns): - for value in values: - if value is None: - continue - value = str(value) - for pattern in patterns: - if value == pattern or fnmatchcase(value, pattern): - return True - return False - - def _domain_matches(self, netloc, allowed_domain): - host = netloc.split("@")[-1].split(":")[0].lower() - domain = str(allowed_domain).strip().lower() - domain = urllib.parse.urlparse(domain).netloc or domain - domain = domain.split("@")[-1].split(":")[0] - if not domain: - return False - if fnmatchcase(host, domain): - return True - if domain.startswith("*."): - return host.endswith(domain[1:]) - return host == domain or host.endswith(f".{domain}") - - def _should_skip_url(self, url, context="link"): - if not url: - return False - - url = str(url).replace("&", "&") - if self._matches_any_pattern([url], self._configured_patterns("exclude_links")): - logger.info("Skipping %s %s because it matches exclude_links", context, url) - return True - - allowed_domains = self._configured_patterns("allowed_domains") - if allowed_domains: - parsed_url = urllib.parse.urlparse(url) - if parsed_url.scheme in {"http", "https"} and parsed_url.netloc: - if not any( - self._domain_matches(parsed_url.netloc, domain) - for domain in allowed_domains - ): - logger.info( - "Skipping %s %s because it is outside allowed_domains", - context, - url, - ) - return True - - return False - - def _should_skip_section(self, section, course_id): - patterns = self._configured_patterns( - "exclude_sections", "skip_sections", course_id=course_id - ) - if not patterns: - return False - - values = [section.get("name"), section.get("id")] - if self._matches_any_pattern(values, patterns): - logger.info( - "Skipping section %s (%s) in course %s because it matches " - "exclude_sections", - section.get("name"), - section.get("id"), - course_id, - ) - return True - return False - - def _should_skip_module(self, module, course_id): - patterns = 
self._configured_patterns( - "exclude_modules", "skip_modules", course_id=course_id - ) - if not patterns: - return False - - module_id = module.get("id") - module_name = module.get("name") - modname = module.get("modname") - module_urls = [] - if module.get("url"): - module_urls.append(module.get("url")) - if module_id and modname: - module_urls.extend( - [ - f"https://moodle.rwth-aachen.de/mod/{modname}/view.php?id={module_id}", - f"https://moodle.rwth-aachen.de/mod/{modname}/launch.php?id={module_id}", - ] - ) - - values = [module_id, module_name, modname, *module_urls] - if self._matches_any_pattern(values, patterns): - logger.info( - "Skipping module %s (%s) in course %s because it matches " - "exclude_modules", - module_name, - module_id, - course_id, - ) - return True - return False - - def _make_conflict_path(self, path: Path) -> Path: - """Return a unique path for storing a locally modified file.""" - suffix = path.suffix - stem = path.stem - - # Derive a short hash from the current contents to make the filename - # stable and recognizable while remaining reasonably unique. - hash_str = "unknown" - try: - with path.open("rb") as f: - digest = hashlib.file_digest(f, "sha1") - hash_str = digest.hexdigest()[:8] - except FileNotFoundError: - hash_str = "missing" - - conflict_path = path.with_name(f"{stem}.syncconflict.{hash_str}{suffix}") - index = 1 - while conflict_path.exists(): - conflict_path = path.with_name( - f"{stem}.syncconflict.{hash_str}.{index}{suffix}" - ) - index += 1 - return conflict_path - - def _local_file_matches_etag(self, path: Path, etag: str) -> bool: - """Return True if the local file content matches the given ETag hash. - - We currently support strong ETags that contain a plain hex digest for - MD5 (32 chars), SHA1 (40 chars) or SHA256 (64 chars). Other formats are - ignored and treated as non-matching. 
- """ - # Extract a plausible hex digest from the ETag value, ignoring weak - # prefixes (W/) and surrounding quotes or algorithm markers. - match = re.search(r"([0-9a-fA-F]{32,64})", etag) - if not match: - return False - hex_str = match.group(1).lower() - - algo = None - if len(hex_str) == 32: - algo = "md5" - elif len(hex_str) == 40: - algo = "sha1" - elif len(hex_str) == 64: - algo = "sha256" - else: - return False - - with path.open("rb") as f: - digest = hashlib.file_digest(f, algo) - return digest.hexdigest() == hex_str - - def _log_opencast_backend_issue(self, response_body: str | None = None) -> None: - """Log additional context for repeated Opencast backend issues. - - We keep the response body at INFO level (only shown with --verbose) and - emit a hint to the RWTH ITC status page once the error - counter exceeds a small threshold. - """ - self._opencast_error_count += 1 - - if response_body: - logger.info(f"Opencast response body (truncated): {response_body[:1000]}") - - if self._opencast_error_count >= 5 and not self._opencast_status_hint_logged: - logger.warning( - "Multiple Opencast backend errors occurred. 
Please check the RWTH " - "ITC status page before reporting an issue on GitHub: " - "https://maintenance.itc.rwth-aachen.de/ticket/status/messages/499" - ) - self._opencast_status_hint_logged = True - - def _check_general_connectivity(self): - try: - response = requests.get(RWTH_HOMEPAGE_URL, timeout=10) - except requests.RequestException as exc: - logger.warning( - "General connectivity check to %s failed: %s", - RWTH_HOMEPAGE_URL, - exc, - ) - return False - - if response.status_code >= 500: - logger.warning( - "General connectivity check to %s returned status %s", - RWTH_HOMEPAGE_URL, - response.status_code, - ) - return False - - logger.info("General connectivity check to %s succeeded", RWTH_HOMEPAGE_URL) - return True - - def _current_rwth_service_issues(self, service_name, status_url): - try: - response = requests.get(status_url, timeout=10) - except requests.RequestException as exc: - logger.warning( - "Could not fetch RWTH ITC status page for %s: %s", service_name, exc - ) - return [] - - if not (200 <= response.status_code < 300): - logger.warning( - "RWTH ITC status page for %s returned status %s", - service_name, - response.status_code, - ) - return [] - - soup = bs(response.text, features="lxml") - issues = [] - for card in soup.select(".notification-card"): - indicator = card.select_one(".notification-status-indicator") - status_label = card.select_one(".incident_queue-statuses div") - if indicator and "old" in indicator.get("class", []): - continue - if status_label and "old" in status_label.get("class", []): - continue - - status_classes = set(status_label.get("class", []) if status_label else []) - if not status_classes.intersection(RWTH_DISRUPTIVE_STATUS_CLASSES): - continue - - title = card.select_one(".report_title h3") - issue_link = card.select_one("[id^=link-to-copy-]") - issues.append( - { - "service": service_name, - "status": ( - status_label.get_text(" ", strip=True) - if status_label - else "Status issue" - ), - "title": ( - 
title.get_text(" ", strip=True) - if title - else "Current service issue" - ), - "url": ( - issue_link.get_text(" ", strip=True) - if issue_link - else status_url - ), - } - ) - return issues - - def _check_rwth_status_page(self): - logger.warning("Check the RWTH ITC status page: %s", RWTH_STATUS_URL) - issues = [] - for service_name, status_url in [ - ("RWTHmoodle", RWTH_MOODLE_STATUS_URL), - ("RWTH Single Sign-On", RWTH_SSO_STATUS_URL), - ]: - issues.extend(self._current_rwth_service_issues(service_name, status_url)) - - if not issues: - logger.info( - "No current RWTHmoodle or RWTH Single Sign-On outage was found " - "on the RWTH ITC status pages" - ) - return - - for issue in issues: - logger.warning( - "%s may currently be affected: %s - %s. See %s", - issue["service"], - issue["status"], - issue["title"], - issue["url"], - ) - - def _check_moodle_availability(self): - if not self.session: - raise Exception("You need a requests session first.") - - try: - response = self.session.get(MOODLE_URL, timeout=15) - except requests.RequestException as exc: - logger.critical("Could not reach RWTHmoodle at %s: %s", MOODLE_URL, exc) - self._check_general_connectivity() - self._check_rwth_status_page() - sys.exit(1) - - if response.status_code >= 500: - logger.critical( - "RWTHmoodle returned status %s before login", - response.status_code, - ) - self._check_rwth_status_page() - sys.exit(1) - - if response.status_code >= 400: - logger.warning( - "RWTHmoodle availability check returned status %s; login may fail", - response.status_code, - ) - self._check_rwth_status_page() - - return response - - # RWTH SSO Login - - def login(self): - def get_session_key(soup): - script = soup.find("script", string=lambda text: text and "sesskey" in text) - match = ( - re.search(r'"sesskey":"(.*?)"', script.text) - if script is not None - else None - ) - if match: - return match.group(1) - else: - logger.critical("Can't retrieve session key from JavaScript config") - sys.exit(1) - - def 
require_input_value(soup, name, context): - value = self._get_input_value(soup, name) - if value is None: - logger.critical( - "Failed to login: expected form field %r was missing at the " - "%s. The RWTH login flow may have changed or the servers may " - "have difficulties. For current service status, see %s.", - name, - context, - RWTH_STATUS_URL, - ) - self._check_rwth_status_page() - logger.info("-------Login-Error-Soup--------") - logger.info(soup) - sys.exit(1) - return value - - self.session = requests.Session() - cookie_file = Path(self.config.get("cookie_file", "./session")).expanduser() - cookie_payload = self._read_private_gzip_json(cookie_file, "session cookie") - if cookie_payload is not None: - self._load_cookies_from_data(cookie_payload) - self._check_moodle_availability() - try: - resp = self.session.get( - urllib.parse.urljoin(MOODLE_URL, "auth/shibboleth/index.php"), - timeout=15, - ) - except requests.RequestException as exc: - logger.critical("Could not reach RWTH SSO login endpoint: %s", exc) - self._check_general_connectivity() - self._check_rwth_status_page() - sys.exit(1) - if resp.url.startswith("https://moodle.rwth-aachen.de/my/"): - soup = bs(resp.text, features="lxml") - self.session_key = get_session_key(soup) - self._save_session_cookies(cookie_file) - return - - # Create a separate soup for maintenance detection - soup_check = bs(resp.text, features="lxml") - - # Remove known info banners by class - for banner in soup_check.select(".themeboostunioninfobanner"): - banner.decompose() - - # Also remove Bootstrap-style alert boxes marked as informational alerts - for alert in soup_check.select('div.alert[role="alert"]'): - alert.decompose() - - # Extract body text after cleanup - body = soup_check.find("body") - body_text = body.get_text(separator=" ", strip=True) if body else "" - - # Check for maintenance notice - if "Wartungsarbeiten" in body_text: - logger.critical( - "Detected Maintenance mode! 
If this is an error, please report it on GitHub." - ) - logger.info(f"Cleaned page body:\n{body_text}") - sys.exit() - - soup = bs(resp.text, features="lxml") - if soup.find("input", {"name": "RelayState"}) is None: - csrf_token = require_input_value( - soup, "csrf_token", "username/password form" - ) - login_data = { - "j_username": self.config["user"], - "j_password": self.config["password"], - "_eventId_proceed": "", - "csrf_token": csrf_token, - } - resp2 = self.session.post(resp.url, data=login_data) - - soup = bs(resp2.text, features="lxml") - - if soup.find(id="fudis_selected_token_ids_input") is None: - logger.critical( - "Failed to login. Maybe your login-info was wrong or the " - "RWTH servers have difficulties. For current service " - "status, see %s. For more info use the --verbose argument.", - RWTH_STATUS_URL, - ) - self._check_rwth_status_page() - logger.info("-------Login-Error-Soup--------") - logger.info(soup) - sys.exit(1) - - csrf_token = require_input_value( - soup, "csrf_token", "TOTP generator selection form" - ) - - print("Setting TOTP generator") - totp_selection_data = { - "fudis_selected_token_ids_input": self.config["totp"], - "_eventId_proceed": "", - "csrf_token": csrf_token, - } - - resp3 = self.session.post(resp2.url, data=totp_selection_data) - - soup = bs(resp3.text, features="lxml") - if soup.find(id="fudis_otp_input") is None: - logger.critical( - "Failed to select TOTP generator. Maybe your TOTP serial " - "number is wrong or the RWTH servers have difficulties. " - "For current service status, see %s. 
For more info use " - "the --verbose argument.", - RWTH_STATUS_URL, - ) - self._check_rwth_status_page() - logger.info("-------Login-Error-Soup--------") - logger.info(soup) - sys.exit(1) - - csrf_token = require_input_value(soup, "csrf_token", "TOTP entry form") - if not self.config.get("totpsecret"): - totp_input = input(f"Enter TOTP for generator {self.config['totp']}:\n") - else: - totp_input = totp(self.config.get("totpsecret")) - print(f"Generated TOTP from provided secret: {totp_input}") - - totp_login_data = { - "fudis_otp_input": totp_input, - "_eventId_proceed": "", - "csrf_token": csrf_token, - } - - resp4 = self.session.post(resp3.url, data=totp_login_data) - - time.sleep(1) # if we go too fast, we might have our connection closed - soup = bs(resp4.text, features="lxml") - if soup.find("input", {"name": "RelayState"}) is None: - logger.critical( - "Failed to login. Maybe your login-info was wrong or the RWTH " - "servers have difficulties. For current service status, see " - "%s. 
For more info use the --verbose argument.", - RWTH_STATUS_URL, - ) - self._check_rwth_status_page() - logger.info("-------Login-Error-Soup--------") - logger.info(soup) - sys.exit(1) - data = { - "RelayState": require_input_value(soup, "RelayState", "SAML response"), - "SAMLResponse": require_input_value(soup, "SAMLResponse", "SAML response"), - } - resp = self.session.post( - "https://moodle.rwth-aachen.de/Shibboleth.sso/SAML2/POST", data=data - ) - soup = bs(resp.text, features="lxml") - self.session_key = get_session_key(soup) - self._save_session_cookies(cookie_file) - - # Moodle Web Services API - - def get_moodle_wstoken(self): - if not self.session: - raise Exception("You need to login() first.") - params = { - "service": "moodle_mobile_app", - "passport": 1, - "urlscheme": "moodlemobile", - } - # response = self.session.head("https://moodle.rwth-aachen.de/admin/tool/mobile/launch.php", params=params, allow_redirects=False) - - def getCookies(cookie_jar, domain): - # workaround for macos - cookie_dict = cookie_jar.get_dict(domain=domain) - found = ["%s=%s" % (name, value) for (name, value) in cookie_dict.items()] - return ";".join(found) - - conn = http.client.HTTPSConnection("moodle.rwth-aachen.de") - conn.request( - "GET", - "/admin/tool/mobile/launch.php?" 
+ urllib.parse.urlencode(params), - headers={ - "Cookie": getCookies(self.session.cookies, "moodle.rwth-aachen.de") - }, - ) - response = conn.getresponse() - - # token is in an app schema, which contains the wstoken base64-encoded along with some other token - location = response.getheader("Location") - if location is None or "token=" not in location: - location_path = urllib.parse.urlparse(location).path if location else None - body_prefix = response.read(1000).decode("utf-8", errors="replace") - conn.close() - - if location_path and location_path.startswith("/admin/tool/policy/"): - logger.critical( - "RWTHmoodle requires you to accept updated policies/terms " - "before syncmymoodle can create a webservice token. Please " - "open https://moodle.rwth-aachen.de/ in your browser, accept " - "the pending policy page, and rerun syncmymoodle." - ) - logger.info( - "Unexpected mobile launch redirect target: " - f"{location_path or ''}" - ) - sys.exit(1) - - if location_path == "/login/index.php": - logger.critical( - "Failed to retrieve the Moodle webservice token because " - "Moodle redirected back to the login page. Your saved " - "session is probably stale or the SSO login did not finish " - "correctly. Delete the cookie file and try again." - ) - logger.info( - "Unexpected mobile launch redirect target: " - f"{location_path or ''}" - ) - sys.exit(1) - - logger.critical( - "Failed to retrieve the Moodle webservice token because Moodle " - "returned an unexpected redirect instead of a token." - ) - logger.info( - "Unexpected mobile launch redirect target: " - f"{location_path or ''}" - ) - if body_prefix: - logger.info( - "Unexpected mobile launch response body (truncated): " - f"{body_prefix}" - ) - sys.exit(1) - - # The redirect looks like moodlemobile://token=BASE64[&...]; isolate the - # token value and decode it defensively so a malformed redirect yields a - # clear message instead of a traceback. 
- token_base64d = location.split("token=", 1)[1].split("&")[0] - conn.close() - try: - token_parts = base64.b64decode(token_base64d).decode().split(":::") - except (ValueError, UnicodeDecodeError): - token_parts = [] - if len(token_parts) < 2 or not token_parts[1]: - logger.critical( - "Failed to parse the Moodle webservice token from the mobile " - "launch redirect. Your saved session may be stale; delete the " - "cookie file and try again." - ) - sys.exit(1) - self.wstoken = token_parts[1] - return self.wstoken - - def get_all_courses(self): - data = { - "requests[0][function]": "core_enrol_get_users_courses", - "requests[0][arguments]": json.dumps( - {"userid": str(self.user_id), "returnusercount": "0"} - ), - "requests[0][settingfilter]": 1, - "requests[0][settingfileurl]": 1, - "wsfunction": "tool_mobile_call_external_functions", - "wstoken": self.wstoken, - } - params = { - "moodlewsrestformat": "json", - "wsfunction": "tool_mobile_call_external_functions", - } - resp = self.session.post( - "https://moodle.rwth-aachen.de/webservice/rest/server.php", - params=params, - data=data, - ) - return json.loads(resp.json()["responses"][0]["data"]) - - def get_course(self, course_id): - data = { - "courseid": int(course_id), - "moodlewssettingfilter": True, - "moodlewssettingfileurl": True, - "wsfunction": "core_course_get_contents", - "wstoken": self.wstoken, - } - params = { - "moodlewsrestformat": "json", - "wsfunction": "core_course_get_contents", - } - resp = self.session.post( - "https://moodle.rwth-aachen.de/webservice/rest/server.php", - params=params, - data=data, - ) - return resp.json() - - def get_userid(self): - data = { - "moodlewssettingfilter": True, - "moodlewssettingfileurl": True, - "wsfunction": "core_webservice_get_site_info", - "wstoken": self.wstoken, - } - params = { - "moodlewsrestformat": "json", - "wsfunction": "core_webservice_get_site_info", - } - resp = self.session.post( - "https://moodle.rwth-aachen.de/webservice/rest/server.php", - 
params=params, - data=data, - ) - payload = resp.json() - if not payload.get("userid") or not payload["userprivateaccesskey"]: - logger.critical( - f"Error while getting userid and access key: {json.dumps(payload, indent=4)}" - ) - sys.exit(1) - self.user_id = payload["userid"] - self.user_private_access_key = payload["userprivateaccesskey"] - return self.user_id, self.user_private_access_key - - def get_assignment(self, course_id): - data = { - "courseids[0]": int(course_id), - "includenotenrolledcourses": 1, - "moodlewssettingfilter": True, - "moodlewssettingfileurl": True, - "wsfunction": "mod_assign_get_assignments", - "wstoken": self.wstoken, - } - params = { - "moodlewsrestformat": "json", - "wsfunction": "mod_assign_get_assignments", - } - resp = self.session.post( - "https://moodle.rwth-aachen.de/webservice/rest/server.php", - params=params, - data=data, - ) - courses = resp.json()["courses"] - return courses[0] if courses else None - - def get_assignment_submission_files(self, assignment_id): - data = { - "assignid": assignment_id, - "userid": self.user_id, - "moodlewssettingfilter": True, - "moodlewssettingfileurl": True, - "wsfunction": "mod_assign_get_submission_status", - "wstoken": self.wstoken, - } - - params = { - "moodlewsrestformat": "json", - "wsfunction": "mod_assign_get_submission_status", - } - - response = self.session.post( - "https://moodle.rwth-aachen.de/webservice/rest/server.php", - params=params, - data=data, - ) - - logger.info(f"------ASSIGNMENT-{assignment_id}-DATA------") - logger.info(response.text) - - payload = response.json() - files = payload.get("lastattempt", {}).get("submission", {}).get("plugins", []) - files += ( - payload.get("lastattempt", {}).get("teamsubmission", {}).get("plugins", []) - ) - files += payload.get("feedback", {}).get("plugins", []) - - files = [ - f.get("files", []) - for p in files - for f in p.get("fileareas", []) - if f["area"] in ["download", "submission_files", "feedback_files"] - ] - files = [f for 
folder in files for f in folder] - return files - - def get_folders_by_courses(self, course_id): - data = { - "courseids[0]": str(course_id), - "moodlewssettingfilter": True, - "moodlewssettingfileurl": True, - "wsfunction": "mod_folder_get_folders_by_courses", - "wstoken": self.wstoken, - } - - params = { - "moodlewsrestformat": "json", - "wsfunction": "mod_folder_get_folders_by_courses", - } - - response = self.session.post( - "https://moodle.rwth-aachen.de/webservice/rest/server.php", - params=params, - data=data, - ) - folder = response.json()["folders"] - return folder - - def sync(self): - """Retrives the file tree for all courses""" - if not self.session: - raise Exception("You need to login() first.") - if not self.wstoken: - raise Exception("You need to get_moodle_wstoken() first.") - if not self.user_id: - raise Exception("You need to get_userid() first.") - self.root_node = Node("", -1, "Root", None) - - # Syncing all courses - for course in self.get_all_courses(): - course_name = self._format_course_name( - course.get("shortname") or f"course-{course.get('id')}" - ) - course_id = course["id"] - - selected_courses = self.config.get("selected_courses", []) - if selected_courses: - # selected_courses is an explicit allowlist that overrides - # skip_courses (and, below, only_sync_semester). 
- if not self._course_id_in_filter(course_id, selected_courses): - continue - elif self._course_id_in_filter( - course_id, self.config.get("skip_courses", []) - ): - continue - - semestername = (course.get("idnumber") or "")[:4] or "unknown-semester" - # Skip not selected semesters (selected_courses overrides this) - if ( - not selected_courses - and self.config.get("only_sync_semester", []) - and semestername not in self.config.get("only_sync_semester", []) - ): - continue - - semester_node = [ - s for s in self.root_node.children if s.name == semestername - ] - if len(semester_node) == 0: - semester_node = self.root_node.add_child(semestername, None, "Semester") - else: - semester_node = semester_node[0] - - course_node = semester_node.add_child(course_name, course_id, "Course") - - print(f"Syncing {course_name}...") - course_sections = self.get_course(course_id) - module_names = { - module.get("modname") - for section in course_sections - if isinstance(section, dict) - for module in section.get("modules", []) - } - - assignments = None - if self.config.get("used_modules", {}).get("assign", {}) and ( - "assign" in module_names - ): - assignments = self.get_assignment(course_id) - assignments_by_cmid = { - assignment["cmid"]: assignment - for assignment in ((assignments or {}).get("assignments") or []) - if "cmid" in assignment - } - - folders = [] - if self.config.get("used_modules", {}).get("folder", {}) and ( - "folder" in module_names - ): - folders = self.get_folders_by_courses(course_id) - folders_by_coursemodule = { - folder.get("coursemodule"): folder for folder in folders - } - - logger.info("-----------------------") - logger.info(f"------{semestername} - {course_name}------") - logger.info("------COURSE-DATA------") - logger.info(json.dumps(course)) - logger.info("------ASSIGNMENT-DATA------") - logger.info(json.dumps(assignments)) - logger.info("------FOLDER-DATA------") - logger.info(json.dumps(folders)) - - for section in course_sections: - if 
isinstance(section, str): - logger.error(f"Error syncing section in {course_name}: {section}") - continue - if self._should_skip_section(section, course_id): - continue - logger.info("------SECTION-DATA------") - logger.info(json.dumps(section)) - section_node = course_node.add_child( - section["name"], section["id"], "Section" - ) - for module in section["modules"]: - try: - if self._should_skip_module(module, course_id): - continue - - # Get Assignments - if module["modname"] == "assign" and self.config.get( - "used_modules", {} - ).get("assign", {}): - ass = assignments_by_cmid.get(module["id"]) - if not ass: - continue - assignment_id = ass["id"] - assignment_name = module["name"] - assignment_node = section_node.add_child( - assignment_name, assignment_id, "Assignment" - ) - - assignment_intro = ass.get("intro") - if assignment_intro: - self.scanForLinks( - assignment_intro, - assignment_node, - course_id, - module_title=assignment_name, - ) - - ass = ass[ - "introattachments" - ] + self.get_assignment_submission_files(assignment_id) - for c in ass: - if self._should_skip_url( - c.get("fileurl"), "assignment file" - ): - continue - self._add_moodle_file_node( - assignment_node, - c.get("filepath", "/"), - c["filename"], - c["fileurl"], - "Assignment File", - c["fileurl"], - timemodified=c.get("timemodified"), - ) - - # Get Resources or URLs - if module["modname"] in [ - "resource", - "url", - "book", - "page", - "pdfannotator", - ]: - if module["modname"] == "resource" and not self.config.get( - "used_modules", {} - ).get("resource", {}): - continue - for c in module.get("contents", []): - file_url = c.get("fileurl") - if not file_url: - continue - if self._should_skip_url(file_url, "resource link"): - continue - if self._is_direct_moodle_file_content(module, c): - self._add_moodle_content_file_node(section_node, c) - elif not ( - module["modname"] == "page" - and c.get("filename") == "index.html" - ): - self.scanForLinks( - file_url, - section_node, - 
course_id, - single=True, - module_title=module["name"], - ) - - # Get Folders - if module["modname"] == "folder" and self.config.get( - "used_modules", {} - ).get("folder", {}): - folder_node = section_node.add_child( - module["name"], module["id"], "Folder" - ) - - # Scan intro for links - folder_info = folders_by_coursemodule.get(module["id"]) - if folder_info and folder_info.get("intro"): - self.scanForLinks( - folder_info["intro"], folder_node, course_id - ) - - for c in module.get("contents", []): - if self._should_skip_url( - c.get("fileurl"), "folder file" - ): - continue - self._add_moodle_file_node( - folder_node, - c.get("filepath", "/"), - c["filename"], - c["fileurl"], - "Folder File", - c["fileurl"], - timemodified=c.get("timemodified"), - ) - - # Get embedded videos in pages or labels - if module["modname"] in [ - "page", - "label", - "h5pactivity", - ] and self.config.get("used_modules", {}).get("url", {}): - if module["modname"] == "page": - opencast_enabled = ( - self.config.get("used_modules", {}) - .get("url", {}) - .get("opencast", {}) - ) - html_url = ( - module.get("url") - or f'https://moodle.rwth-aachen.de/mod/page/view.php?id={module["id"]}' - ) - scan_page_links = not self.config.get( - "nolinks" - ) and not self._should_skip_url(html_url, "page link") - if opencast_enabled or scan_page_links: - try: - response = self.session.get(html_url) - except Exception: - logger.exception( - "Failed to fetch page module %s", - module["id"], - ) - response = None - if response and not ( - 200 <= response.status_code < 300 - ): - logger.warning( - "Page module %s returned status %s", - module["id"], - response.status_code, - ) - response = None - if response: - if opencast_enabled: - html = bs( - response.text, - features="lxml", - ) - for iframe in html.find_all("iframe"): - iframe_src = iframe.get("src") - if not iframe_src: - continue - iframe_src = urllib.parse.urljoin( - response.url or html_url, - iframe_src, - ) - vid_id = ( - 
self._extract_opencast_episode_id( - iframe_src - ) - ) - if not vid_id: - continue - if not self._authenticate_opencast_episode( - course_id, vid_id - ): - continue - vid = self.extractTrackFromEpisode( - vid_id - ) - if not vid: - continue - - if self._should_skip_url( - vid, "Opencast video URL" - ): - continue - - section_node.add_child( - module["name"], - vid_id, - "Opencast", - url=vid, - additional_info=course_id, - ) - - if scan_page_links: - self._scan_html_text_for_links( - response.text, - response.url or html_url, - section_node, - course_id, - module_title=module["name"], - ) - # "Interactive" h5p videos - elif module["modname"] == "h5pactivity": - html_url = f'https://moodle.rwth-aachen.de/mod/h5pactivity/view.php?id={module["id"]}' - html = bs( - self.session.get(html_url).text, - features="lxml", - ) - # Get h5p iframe - iframe = html.find("iframe") - iframe_src = iframe.get("src") if iframe else None - if iframe_src: - iframe_src = urllib.parse.urljoin( - html_url, iframe_src - ) - iframe_html = str( - bs( - self.session.get(iframe_src).text, - features="lxml", - ) - ) - # Moodle devs dont know how to use CDATA correctly, so we need to remove all backslashes - sanitized_html = iframe_html.replace("\\", "") - else: - # H5P outside iframes - sanitized_html = str(html).replace("\\", "") - - self.scanForLinks( - sanitized_html, - section_node, - course_id, - module_title=module["modname"], - single=False, - ) - else: - self.scanForLinks( - module.get("description", ""), - section_node, - course_id, - module_title=module["name"], - ) - - # New OpenCast integration - if module["modname"] == "lti" and self.config.get( - "used_modules", {} - ).get("url", {}).get("opencast", {}): - info_url = f'https://moodle.rwth-aachen.de/mod/lti/launch.php?id={module["id"]}&triggerview=0' - try: - info_response = self.session.get(info_url) - except Exception: - logger.exception( - "Opencast: failed to fetch LTI module %s", - module["id"], - ) - continue - if not (200 <= 
info_response.status_code < 300): - logger.warning( - "Opencast: LTI module %s returned status %s", - module["id"], - info_response.status_code, - ) - self._log_opencast_backend_issue(info_response.text) - continue - - info_res = bs(info_response.text, features="lxml") - - engage_series_id = self._get_input_value( - info_res, "custom_series" - ) - engage_single_id = self._get_input_value( - info_res, "custom_id" - ) - name = ( - self._get_input_value(info_res, "resource_link_title") - or module["name"] - ) - engage_data = self._extract_lti_form_data(info_res) - - if engage_series_id: - # Found an Opencast "series" page - series_id = engage_series_id - - series_node = course_node.add_child( - name, series_id, "Section" - ) - - if not self._submit_opencast_lti_form( - engage_data, f"LTI series module {module['id']}" - ): - continue - - series_url = f"https://engage.streaming.rwth-aachen.de/search/episode.json?limit=100&offset=0&sid={series_id}" - series_response = self._fetch_opencast_json( - series_url, f"series {series_id}" - ) - if series_response is None: - continue - - for episode in self._get_opencast_result_list( - series_response, f"series {series_id}" - ): - if not isinstance(episode, dict): - continue - mediapackage = episode.get("mediapackage", {}) - if not isinstance(mediapackage, dict): - continue - episode_id = mediapackage.get("id") - if not episode_id: - logger.warning( - "Opencast: series %s contains episode without id", - series_id, - ) - continue - vid = self.extractTrackFromEpisode(episode_id) - if not vid: - continue - if self._should_skip_url(vid, "Opencast video URL"): - continue - series_node.add_child( - mediapackage.get("title") or episode_id, - episode_id, - "Opencast", - url=vid, - additional_info=module["id"], - ) - else: - if not engage_single_id: - logger.info( - "Failed to find either custom_id or custom_series on lti page." 
- ) - logger.info("------LTI-ERROR-HTML------") - logger.info(f"url: {info_url}") - logger.info(info_res) - else: - if not self._submit_opencast_lti_form( - engage_data, f"LTI module {module['id']}" - ): - continue - vid = self.extractTrackFromEpisode(engage_single_id) - if not vid: - continue - if self._should_skip_url(vid, "Opencast video URL"): - continue - section_node.add_child( - name, - engage_single_id, - "Opencast", - url=vid, - additional_info=module["id"], - ) - # Integration for Quizzes - if module["modname"] == "quiz" and self.config.get( - "used_modules", {} - ).get("url", {}).get("quiz", {}): - info_url = f'https://moodle.rwth-aachen.de/mod/quiz/view.php?id={module["id"]}' - info_res = bs( - self.session.get(info_url).text, features="lxml" - ) - attempts = info_res.find_all( - "a", - { - "title": "Überprüfung der eigenen Antworten dieses Versuchs" - }, - ) - attempt_cnt = 0 - for attempt in attempts: - attempt_cnt += 1 - review_url = attempt.get("href") - quiz_res = bs( - self.session.get(review_url).text, - features="lxml", - ) - name = ( - quiz_res.find("title") - .get_text() - .replace(": Überprüfung des Testversuchs", "") - + ", Versuch " - + str(attempt_cnt) - ) - section_node.add_child( - self.sanitize(name), - urllib.parse.urlparse(review_url)[1], - "Quiz", - url=review_url, - ) - - except Exception: - logger.exception(f"Failed to download the module {module}") - - self.root_node.remove_children_nameclashes() - - def download_all_files(self): - if not self.session: - raise Exception("You need to login() first.") - if not self.wstoken: - raise Exception("You need to get_moodle_wstoken() first.") - if not self.user_id: - raise Exception("You need to get_userid() first.") - if not self.root_node: - raise Exception("You need to sync() first.") - - self._download_all_files(self.root_node) - - def _download_all_files(self, cur_node): - if len(cur_node.children) == 0: - if cur_node.url and not cur_node.is_downloaded: - if cur_node.type == "Youtube": 
- try: - self.scanAndDownloadYouTube(cur_node) - cur_node.is_downloaded = True - except Exception: - logger.exception(f"Failed to download the module {cur_node}") - logger.error( - "This could be caused by an out of date yt-dlp version. Try upgrading yt-dlp through pip or your package manager." - ) - elif cur_node.type == "Opencast": - try: - # download Opencast videos - if ".mp4" not in cur_node.name: - if cur_node.name is not None and cur_node.name != "": - cur_node.name += ".mp4" - else: - cur_node.name = cur_node.url.split("/")[-1] - if self.download_file(cur_node): - cur_node.is_downloaded = True - except Exception: - logger.exception(f"Failed to download the module {cur_node}") - elif cur_node.type == "Quiz": - logger.warning( - "Skipping quiz PDF generation for %s because it is disabled " - "for security.", - cur_node.name, - ) - else: - try: - if self.download_file(cur_node): - cur_node.is_downloaded = True - except Exception: - logger.exception(f"Failed to download the module {cur_node}") - return - - for child in cur_node.children: - self._download_all_files(child) - - def get_sanitized_node_path(self, node: Node) -> Path: - basedir = Path(self.config.get("basedir", "./")).expanduser() - path_segments = [] - for part in node.get_path(): - if part == "": - continue - sanitized = self.sanitize(part) - if sanitized in {"", ".", ".."}: - sanitized = "_" - path_segments.append(sanitized) - - target_path = basedir.joinpath(*path_segments) - resolved_basedir = basedir.resolve(strict=False) - resolved_target = target_path.resolve(strict=False) - if not resolved_target.is_relative_to(resolved_basedir): - raise ValueError(f"Refusing to write outside basedir: {target_path}") - return target_path - - def sanitize(self, path): - path = urllib.parse.unquote(path) - path = "".join([s for s in path if s not in self.invalid_chars]) - while path and path[-1] == " ": - path = path[:-1] - while path and path[0] == " ": - path = path[1:] - - # Folders downloaded from Moodle 
display amp; in places where an - # ampersand should be displayed instead. In the web UI, however, the - # ampersand is shown correctly, and we're trying to emulate that here. - path = path.replace("amp;", "&") - - return path - - def _content_type_without_parameters(self, response): - content_type = response.headers.get("Content-Type", "") - return content_type.split(";", 1)[0].strip().lower() - - def _node_allows_html_download(self, node): - html_suffixes = {".htm", ".html", ".xhtml"} - node_suffix = Path(str(node.name or "")).suffix.lower() - url_suffix = Path( - urllib.parse.urlparse(str(node.url or "")).path - ).suffix.lower() - return node_suffix in html_suffixes or url_suffix in html_suffixes - - def _chunk_looks_like_html(self, chunk): - body_start = chunk.lstrip().lower() - return body_start.startswith(b" - - - - - - -""" - headers = { - **auth_header, - "Depth": "1", - "Content-Type": "application/xml", - } - try: - propfind_response = self.session.request( - "PROPFIND", - sciebo_url + href, - headers=headers, - data=propfind_body, - ) - except Exception: - logger.exception( - "Sciebo PROPFIND failed for href %s (share %s)", - href, - sharingToken, - ) - return - - if not (200 <= propfind_response.status_code < 300): - logger.warning( - "Sciebo PROPFIND returned status %s for href %s (share %s)", - propfind_response.status_code, - href, - sharingToken, - ) - return - - # parse the response - soup_xml = bs(propfind_response.text, features="xml") - - for resp in soup_xml.find_all("d:response"): - # get the href of the response - href_tag = resp.find("d:href") - if href_tag is None or not href_tag.text: - continue - new_href = href_tag.text - - if new_href == href: - logger.info( - "Sciebo: skipping %s because it is the current folder", - new_href, - ) - continue - - # Extract a stable content hash for this item. Prefer the - # SHA1 checksum from oc:checksums if available; fall back - # to the raw ETag otherwise. 
- etag_value = None - prop = resp.find("d:prop") - if prop is not None: - checksums_tag = prop.find("oc:checksums") - if checksums_tag is not None: - for cs in checksums_tag.find_all("oc:checksum"): - text = (cs.text or "").strip() - if text.upper().startswith("SHA1:"): - etag_value = text.split(":", 1)[1] - break - - if etag_value is None: - etag_tag = prop.find("d:getetag") - if etag_tag and etag_tag.text: - etag_value = etag_tag.text.strip() - - logger.info(f"Sciebo response href: {new_href}") - # get the displayname of the response - displayname = ( - new_href.split("/")[-2] - if new_href.endswith("/") - else new_href.split("/")[-1] - ) - displayname = ( - f"sciebo-{sharingToken}" - if displayname == "webdav" - else displayname - ) - - # check if the response is a folder - if new_href.endswith("/"): - # create a new node for the folder - folder_node = parent_node.add_child( - displayname, None, "Sciebo Folder", etag=etag_value - ) - # recursive call to get all files in the folder - get_sciebo_files( - new_href, folder_node, sharingToken, auth_header - ) - else: - # create a new node for the file - parent_node.add_child( - displayname, - None, - "Sciebo File", - url=sciebo_url + new_href, - additional_info=auth_header, - etag=etag_value, - ) - - get_sciebo_files( - webdav_location, sciebo_root, sharingToken, auth_header - ) - self._sciebo_link_cache[link] = sciebo_root.clone() - - -def main(): - parser = ArgumentParser( - prog="python3 -m syncmymoodle", - description="Synchronization client for RWTH Moodle. 
All optional arguments override those in config.json.", - ) - - if keyring: - parser.add_argument( - "--secretservice", - action="store_true", - help="Use system's keyring for storing and retrieving account credentials", - ) - parser.add_argument( - "--secretservicetotpsecret", - action="store_true", - help="Save TOTP secret in keyring", - ) - - parser.add_argument( - "--user", default=None, help="set your RWTH Single Sign-On username" - ) - parser.add_argument( - "--password", default=None, help="set your RWTH Single Sign-On password" - ) - parser.add_argument( - "--totp", - default=None, - help="set your RWTH Single Sign-On TOTP provider's serial number (see https://idm.rwth-aachen.de/selfservice/MFATokenManager)", - ) - parser.add_argument( - "--totpsecret", - default=None, - help="(optional) set your RWTH Single Sign-On TOTP provider Secret", - ) - parser.add_argument("--config", default=None, help="set your configuration file") - parser.add_argument( - "--cookiefile", default=None, help="set the location of a cookie file" - ) - parser.add_argument( - "--courses", - default=None, - help="specify the courses that should be synced using comma-separated links. Defaults to all courses, if no additional restrictions e.g. semester are defined.", - ) - parser.add_argument( - "--skipcourses", - default=None, - help="exclude specific courses using comma-separated links. Defaults to None.", - ) - parser.add_argument( - "--semester", - default=None, - help="specify semesters to be synced e.g. `22s`, comma-separated. Defaults to all semesters, if no additional restrictions e.g. 
courses are defined.", - ) - parser.add_argument( - "--basedir", - default=None, - help="specify the directory where all files will be synced", - ) - parser.add_argument( - "--courseprefix", - choices=COURSE_PREFIX_HANDLING_OPTIONS, - default=None, - help=( - "handle leading two-character course prefixes in local folder names: " - "'keep' (default), 'remove', or 'suffix'" - ), - ) - parser.add_argument( - "--nolinks", - action="store_true", - help="define whether various links in moodle pages should also be inspected e.g. youtube videos, wikipedia articles", - ) - parser.add_argument( - "--excludefiletypes", - default=None, - help='specify whether specific file types should be excluded, comma-separated e.g. "mp4,mkv"', - ) - parser.add_argument( - "--updatefiles", - action="store_true", - help="define whether modified files with the same name/path should be redownloaded", - ) - parser.add_argument( - "--updatefilesconflict", - choices=["rename", "keep", "overwrite"], - default=None, - help=( - "define how to handle locally modified files when updating: " - "'rename' (default) moves the old file aside, 'keep' skips the " - "update, 'overwrite' replaces the local file" - ), - ) - parser.add_argument( - "-v", - "--verbose", - action="store_const", - dest="loglevel", - const=logging.INFO, - default=logging.WARNING, - help="show information useful for debugging", - ) - args = parser.parse_args() - - if args.config: - overwrite_config = Path(args.config) - if overwrite_config.is_file(): - with overwrite_config.open() as f: - config = json.load(f) - else: - config = {} - - global_config = ( - Path(os.environ.get("XDG_CONFIG_HOME", Path("~/.config").expanduser())) - / "syncmymoodle" - / "config.json" - ) - if global_config.is_file(): - with global_config.open() as f: - config.update(json.load(f)) - - local_config = Path("config.json") - if local_config.is_file(): - with local_config.open() as f: - config.update(json.load(f)) - - config["user"] = args.user or 
config.get("user") - config["password"] = args.password or config.get("password") - config["totp"] = args.totp or config.get("totp") - config["totpsecret"] = args.totpsecret or config.get("totpsecret") - config["cookie_file"] = args.cookiefile or config.get("cookie_file", "./session") - config["selected_courses"] = ( - args.courses.split(",") if args.courses else config.get("selected_courses", []) - ) - config["only_sync_semester"] = ( - args.semester.split(",") - if args.semester - else config.get("only_sync_semester", []) - ) - config["basedir"] = args.basedir or config.get("basedir", "./") - config["course_prefix_handling"] = args.courseprefix or config.get( - "course_prefix_handling", "keep" - ) - config["use_secret_service"] = ( - args.secretservice if keyring else None - ) or config.get("use_secret_service") - config["secret_service_store_totp_secret"] = ( - args.secretservicetotpsecret if keyring else None - ) or config.get("secret_service_store_totp_secret") - config["skip_courses"] = ( - args.skipcourses.split(",") - if args.skipcourses - else config.get("skip_courses", []) - ) - config["nolinks"] = args.nolinks or config.get("no_links") - config["used_modules"] = config.get("used_modules") or { - "assign": True, - "resource": True, - "url": {"youtube": True, "opencast": True, "sciebo": True, "quiz": False}, - "folder": True, - } - config["exclude_filetypes"] = ( - args.excludefiletypes.split(",") - if args.excludefiletypes - else config.get("exclude_filetypes", []) - ) - config["exclude_files"] = config.get("exclude_files", []) - config["exclude_links"] = config.get("exclude_links", []) - config["allowed_domains"] = config.get("allowed_domains", []) - config["exclude_sections"] = config.get( - "exclude_sections", config.get("skip_sections", []) - ) - config["exclude_modules"] = config.get( - "exclude_modules", config.get("skip_modules", []) - ) - config["updatefiles"] = args.updatefiles or config.get("update_files", False) - 
config["update_files_conflict"] = args.updatefilesconflict or config.get( - "update_files_conflict", "rename" - ) - - logging.basicConfig(level=args.loglevel) - - if config["used_modules"]["url"].get("quiz"): - config["used_modules"]["url"]["quiz"] = False - logger.warning( - "Quiz PDF generation is disabled until the pdfkit/wkhtmltopdf " - "renderer is replaced with a safer implementation." - ) - - if keyring and config.get("use_secret_service"): - if config.get("password"): - logger.critical("You need to remove your password from your config file!") - sys.exit(1) - - if config.get("secret_service_store_totp_secret") and config.get("totpsecret"): - logger.critical("You need to remove your totpsecret from your config file!") - sys.exit(1) - - if not args.user and not config.get("user"): - print( - "You need to provide your username in the config file or through --user!" - ) - sys.exit(1) - - if ( - config.get("secretservicetotpsecret") - and not args.totp - and not config.get("totp") - ): - print( - "You need to provide your TOTP provider in the config file or through --totp!" - ) - sys.exit(1) - - config["password"] = keyring.get_password("syncmymoodle", config.get("user")) - if config["password"] is None: - if args.password: - password = args.password - else: - password = getpass.getpass("Password:") - keyring.set_password("syncmymoodle", config.get("user"), password) - config["password"] = password - - if config.get("secret_service_store_totp_secret"): - config["totpsecret"] = keyring.get_password( - "syncmymoodle", config.get("totp") - ) - if config["totpsecret"] is None: - if args.totpsecret: - totpsecret = args.totpsecret - else: - totpsecret = getpass.getpass("TOTP-Secret:") - keyring.set_password("syncmymoodle", config.get("totp"), totpsecret) - config["totpsecret"] = totpsecret - - if not config.get("user") or not config.get("password"): - logger.critical( - "You need to specify your username and password in the config file or as an argument!" 
- ) - sys.exit(1) - - if not config.get("totp"): - logger.critical( - "You need to specify your TOTP generator in the config file or as an argument!" - ) - sys.exit(1) - - smm = SyncMyMoodle(config) - - print("Logging in...") - smm.login() - smm.get_moodle_wstoken() - smm.get_userid() - print("Syncing file tree...") - smm.sync() - print("Downloading files...") - smm.download_all_files() - print("Saving root node as cache...") - smm.cache_root_node() - - # If we saw multiple Opencast backend errors send a reminder - # to check the RWTH ITC status page before filing a bug. - try: - if smm._opencast_error_count >= 5: - logger.warning( - "Multiple Opencast backend errors occurred. Please check the RWTH " - "ITC status page before reporting an issue on GitHub: " - "https://maintenance.itc.rwth-aachen.de/ticket/status/messages/499" - ) - except Exception: - # Never let summary logging break the main flow. - pass - +from syncmymoodle.cli import main if __name__ == "__main__": - main() + raise SystemExit(main()) diff --git a/syncmymoodle/app.py b/syncmymoodle/app.py new file mode 100644 index 0000000..ce650ff --- /dev/null +++ b/syncmymoodle/app.py @@ -0,0 +1,2533 @@ +import base64 +import hashlib +import http.client +import json +import logging +import os +import re +import sys +import time +import urllib.parse +from contextlib import closing +from fnmatch import fnmatchcase +from pathlib import Path + +import requests +import yt_dlp +from bs4 import BeautifulSoup as bs +from tqdm import tqdm + +from syncmymoodle.constants import ( + MOODLE_URL, + OPENCAST_LINK_RE, + RWTH_DISRUPTIVE_STATUS_CLASSES, + RWTH_HOMEPAGE_URL, + RWTH_MOODLE_STATUS_URL, + RWTH_SSO_STATUS_URL, + RWTH_STATUS_URL, + SCIEBO_LINK_RE, + YOUTUBE_ID_LENGTH, + YOUTUBE_LINK_RE, +) +from syncmymoodle.context import SyncContext +from syncmymoodle.filters import ( + as_list, + configured_patterns, + course_id_in_filter, + domain_matches, + format_course_name, + matches_any_pattern, + should_skip_module, + 
should_skip_section, + should_skip_url, +) +from syncmymoodle.node import NAME_CLASH_ID_UNSET, Node +from syncmymoodle.pathing import ( + get_sanitized_node_path, + make_conflict_path, + sanitize_path_part, +) +from syncmymoodle.storage import ( + load_cookies_from_data, + read_private_gzip_json, + save_session_cookies, + write_private_gzip_json, +) +from syncmymoodle.totp import totp as generate_totp + +logger = logging.getLogger(__name__) + + +class SyncMyMoodle: + params = {"lang": "en"} # Titles for some pages differ + block_size = 1024 + invalid_chars = '~"#%&*:<>?/\\{|}' + + def __init__(self, config): + self.ctx = SyncContext(config=config) + + @property + def config(self): + return self.ctx.config + + @config.setter + def config(self, value): + self.ctx.config = value + + @property + def session(self): + return self.ctx.session + + @session.setter + def session(self, value): + self.ctx.session = value + + @property + def session_key(self): + return self.ctx.session_key + + @session_key.setter + def session_key(self, value): + self.ctx.session_key = value + + @property + def wstoken(self): + return self.ctx.wstoken + + @wstoken.setter + def wstoken(self, value): + self.ctx.wstoken = value + + @property + def user_id(self): + return self.ctx.user_id + + @user_id.setter + def user_id(self, value): + self.ctx.user_id = value + + @property + def user_private_access_key(self): + return self.ctx.user_private_access_key + + @user_private_access_key.setter + def user_private_access_key(self, value): + self.ctx.user_private_access_key = value + + @property + def root_node(self): + return self.ctx.root_node + + @root_node.setter + def root_node(self, value): + self.ctx.root_node = value + + @property + def _course_caches(self): + return self.ctx.course_caches + + @_course_caches.setter + def _course_caches(self, value): + self.ctx.course_caches = value + + @property + def _opencast_error_count(self): + return self.ctx.opencast_error_count + + 
@_opencast_error_count.setter + def _opencast_error_count(self, value): + self.ctx.opencast_error_count = value + + @property + def _opencast_status_hint_logged(self): + return self.ctx.opencast_status_hint_logged + + @_opencast_status_hint_logged.setter + def _opencast_status_hint_logged(self, value): + self.ctx.opencast_status_hint_logged = value + + @property + def _sciebo_link_cache(self): + return self.ctx.sciebo_link_cache + + @_sciebo_link_cache.setter + def _sciebo_link_cache(self, value): + self.ctx.sciebo_link_cache = value + + @property + def _opencast_episode_auth_cache(self): + return self.ctx.opencast_episode_auth_cache + + @_opencast_episode_auth_cache.setter + def _opencast_episode_auth_cache(self, value): + self.ctx.opencast_episode_auth_cache = value + + @property + def _opencast_track_cache(self): + return self.ctx.opencast_track_cache + + @_opencast_track_cache.setter + def _opencast_track_cache(self, value): + self.ctx.opencast_track_cache = value + + @property + def _downloaded_paths(self): + if self.ctx.downloaded_paths is None: + raise AttributeError("_downloaded_paths") + return self.ctx.downloaded_paths + + @_downloaded_paths.setter + def _downloaded_paths(self, value): + self.ctx.downloaded_paths = value + + def _match_old_cache_child(self, old_node, child): + """Find the previous cache node corresponding to ``child``, if any.""" + if old_node is None: + return None + candidates = [ + c + for c in getattr(old_node, "children", []) + if c.name == child.name and c.type == child.type + ] + if not candidates: + return None + for candidate in candidates: + if candidate.url == child.url: + return candidate + return candidates[0] + + def _node_to_cache_data(self, node: Node, old_node: Node | None = None): + timemodified = node.timemodified + etag = node.etag + is_downloaded = node.is_downloaded + # If this file was not (re)downloaded this run but a previously + # downloaded version is still on disk, keep the previously cached version + # 
markers. Otherwise the cache would record Moodle's new timemodified/etag + # for a file we never actually fetched, which either skips the file + # forever or moves the on-disk copy aside as a spurious conflict on the + # next run's retry. + if ( + not node.is_downloaded + and old_node is not None + and getattr(old_node, "is_downloaded", False) + and self.get_sanitized_node_path(node).exists() + ): + timemodified = getattr(old_node, "timemodified", None) + etag = getattr(old_node, "etag", None) + is_downloaded = True + return { + "name": node.name, + "id": node.id, + "type": node.type, + "url": node.url, + "timemodified": timemodified, + "etag": etag, + "name_clash_id": node.name_clash_id, + "is_downloaded": is_downloaded, + "children": [ + self._node_to_cache_data( + child, self._match_old_cache_child(old_node, child) + ) + for child in node.children + ], + } + + def _node_from_cache_data(self, data, parent=None): + node = Node( + data.get("name", ""), + data.get("id"), + data.get("type", "Unknown"), + parent, + url=data.get("url"), + timemodified=data.get("timemodified"), + etag=data.get("etag"), + name_clash_id=data.get("name_clash_id", NAME_CLASH_ID_UNSET), + is_downloaded=data.get("is_downloaded", False), + ) + node.children = [ + self._node_from_cache_data(child, node) + for child in data.get("children", []) + if isinstance(child, dict) + ] + return node + + def cache_root_node(self): + """Persist per-course caches into .syncmymoodle_cache files. + + Each course directory beneath basedir receives its own cache file + containing the course subtree, which makes caching less brittle than + a single global root cache. 
+ """ + if not self.root_node: + return + + for semester_node in self.root_node.children: + if semester_node.type != "Semester": + continue + for course_node in semester_node.children: + if course_node.type != "Course": + continue + course_path = self.get_sanitized_node_path(course_node) + # Read the previous course cache before overwriting it, so we can + # preserve version markers for files that were not downloaded + # this run (see _node_to_cache_data). + old_course_root = self._get_course_cache_root(course_node) + course_path.mkdir(parents=True, exist_ok=True) + cache_path = course_path / ".syncmymoodle_cache" + write_private_gzip_json( + cache_path, + { + "format": "syncmymoodle.course-cache.v1", + "course": self._node_to_cache_data( + course_node, old_course_root + ), + }, + ) + + def _ensure_timemodified_attribute(self, node): + # Old cached root nodes might not have the timemodified attribute yet. + if not hasattr(node, "timemodified"): + node.timemodified = None + if not hasattr(node, "etag"): + node.etag = None + if not hasattr(node, "name_clash_id"): + node.name_clash_id = getattr(node, "id", None) + for child in getattr(node, "children", []): + self._ensure_timemodified_attribute(child) + + def _get_course_node(self, node: Node) -> Node: + """Return the enclosing course node for the given node.""" + cur = node + while cur is not None and cur.parent is not None: + if cur.type == "Course": + return cur + cur = cur.parent + raise Exception("Node is not part of a course subtree") + + def _get_course_cache_root(self, course_node: Node): + """Load and return the cached course root for the given course node.""" + course_path = self.get_sanitized_node_path(course_node) + if course_path in self._course_caches: + return self._course_caches[course_path] + + cache_path = course_path / ".syncmymoodle_cache" + if not cache_path.exists(): + return None + + payload = read_private_gzip_json(cache_path, "course cache") + if not isinstance(payload, dict): + return None + 
if payload.get("format") != "syncmymoodle.course-cache.v1": + logger.warning("Ignoring unsupported course cache format: %s", cache_path) + return None + course_data = payload.get("course") + if not isinstance(course_data, dict): + return None + + cached_course_root = self._node_from_cache_data(course_data) + self._ensure_timemodified_attribute(cached_course_root) + + self._course_caches[course_path] = cached_course_root + return cached_course_root + + def _get_old_node_for(self, node: Node): + """Return the cached node for this node from the course cache, if any.""" + try: + course_node = self._get_course_node(node) + except Exception: + return None + + cached_course_root = self._get_course_cache_root(course_node) + if cached_course_root is None: + return None + + full_path = node.get_path() + course_path = course_node.get_path() + # Compute the path segments beneath the course root + rel_segments = full_path[len(course_path) :] + if not rel_segments: + return cached_course_root + + try: + return cached_course_root.go_to_path(rel_segments) + except Exception: + return None + + def _get_or_add_child(self, parent_node, name, id, type): + for child in parent_node.children: + if child.name == name and child.type == type: + return child + return parent_node.add_child(name, id, type) + + def _add_moodle_file_node( + self, + parent_node, + moodle_filepath, + filename, + id, + type, + url, + timemodified=None, + name_clash_id=NAME_CLASH_ID_UNSET, + ): + target_node = parent_node + path_segments = [ + self.sanitize(segment) + for segment in str(moodle_filepath or "").strip("/").split("/") + if segment + ] + + for segment in path_segments: + target_node = self._get_or_add_child(target_node, segment, None, "Folder") + if target_node is None: + return None + + return target_node.add_child( + filename, + id, + type, + url=url, + timemodified=timemodified, + name_clash_id=name_clash_id, + ) + + def _add_moodle_content_file_node(self, parent_node, content, file_type=None): + 
file_url = content.get("fileurl") + if not file_url: + return None + + mimetype = content.get("mimetype") or "unknown" + filename = urllib.parse.urlsplit(file_url).path.split("/")[-1] + if not filename: + filename = content.get("filename") + return self._add_moodle_file_node( + parent_node, + "/", + filename, + file_url, + file_type or f"Linked file [{mimetype}]", + file_url, + timemodified=content.get("timemodified"), + name_clash_id=None, + ) + + def _is_direct_moodle_file_content(self, module, content): + file_url = content.get("fileurl") + if not file_url or content.get("type") != "file": + return False + + mimetype = str(content.get("mimetype") or "").split(";", 1)[0].lower() + if not mimetype or mimetype in { + "document/unknown", + "unknown", + "text/html", + "application/xhtml+xml", + }: + return False + if mimetype.startswith("text/"): + return False + + modname = module.get("modname") + if modname in {"resource", "pdfannotator"}: + return True + + # Page modules often expose their rendered body as index.html. Keep + # that path in the HTML scanner, but direct-add binary attachments. 
+ if modname == "page" and content.get("filename") != "index.html": + return True + + return False + + def _scan_html_text_for_links( + self, html_text, base_url, parent_node, course_id, module_title=None + ): + if "video-js" in html_text and " bool: + return course_id_in_filter(course_id, entries) + + def _configured_patterns(self, *keys, course_id=None): + return configured_patterns(self.config, *keys, course_id=course_id) + + def _format_course_name(self, course_name): + return format_course_name(course_name, self.config, logger) + + def _matches_any_pattern(self, values, patterns): + return matches_any_pattern(values, patterns) + + def _domain_matches(self, netloc, allowed_domain): + return domain_matches(netloc, allowed_domain) + + def _should_skip_url(self, url, context="link"): + return should_skip_url(self.config, url, context, logger) + + def _should_skip_section(self, section, course_id): + return should_skip_section(self.config, section, course_id, logger) + + def _should_skip_module(self, module, course_id): + return should_skip_module(self.config, module, course_id, logger) + + def _make_conflict_path(self, path: Path) -> Path: + return make_conflict_path(path) + + def _local_file_matches_etag(self, path: Path, etag: str) -> bool: + """Return True if the local file content matches the given ETag hash. + + We currently support strong ETags that contain a plain hex digest for + MD5 (32 chars), SHA1 (40 chars) or SHA256 (64 chars). Other formats are + ignored and treated as non-matching. + """ + # Extract a plausible hex digest from the ETag value, ignoring weak + # prefixes (W/) and surrounding quotes or algorithm markers. 
+ match = re.search(r"([0-9a-fA-F]{32,64})", etag) + if not match: + return False + hex_str = match.group(1).lower() + + algo = None + if len(hex_str) == 32: + algo = "md5" + elif len(hex_str) == 40: + algo = "sha1" + elif len(hex_str) == 64: + algo = "sha256" + else: + return False + + with path.open("rb") as f: + digest = hashlib.file_digest(f, algo) + return digest.hexdigest() == hex_str + + def _log_opencast_backend_issue(self, response_body: str | None = None) -> None: + """Log additional context for repeated Opencast backend issues. + + We keep the response body at INFO level (only shown with --verbose) and + emit a hint to the RWTH ITC status page once the error + counter exceeds a small threshold. + """ + self._opencast_error_count += 1 + + if response_body: + logger.info(f"Opencast response body (truncated): {response_body[:1000]}") + + if self._opencast_error_count >= 5 and not self._opencast_status_hint_logged: + logger.warning( + "Multiple Opencast backend errors occurred. Please check the RWTH " + "ITC status page before reporting an issue on GitHub: " + "https://maintenance.itc.rwth-aachen.de/ticket/status/messages/499" + ) + self._opencast_status_hint_logged = True + + def _check_general_connectivity(self): + try: + response = requests.get(RWTH_HOMEPAGE_URL, timeout=10) + except requests.RequestException as exc: + logger.warning( + "General connectivity check to %s failed: %s", + RWTH_HOMEPAGE_URL, + exc, + ) + return False + + if response.status_code >= 500: + logger.warning( + "General connectivity check to %s returned status %s", + RWTH_HOMEPAGE_URL, + response.status_code, + ) + return False + + logger.info("General connectivity check to %s succeeded", RWTH_HOMEPAGE_URL) + return True + + def _current_rwth_service_issues(self, service_name, status_url): + try: + response = requests.get(status_url, timeout=10) + except requests.RequestException as exc: + logger.warning( + "Could not fetch RWTH ITC status page for %s: %s", service_name, exc + ) + 
return [] + + if not (200 <= response.status_code < 300): + logger.warning( + "RWTH ITC status page for %s returned status %s", + service_name, + response.status_code, + ) + return [] + + soup = bs(response.text, features="lxml") + issues = [] + for card in soup.select(".notification-card"): + indicator = card.select_one(".notification-status-indicator") + status_label = card.select_one(".incident_queue-statuses div") + if indicator and "old" in indicator.get("class", []): + continue + if status_label and "old" in status_label.get("class", []): + continue + + status_classes = set(status_label.get("class", []) if status_label else []) + if not status_classes.intersection(RWTH_DISRUPTIVE_STATUS_CLASSES): + continue + + title = card.select_one(".report_title h3") + issue_link = card.select_one("[id^=link-to-copy-]") + issues.append( + { + "service": service_name, + "status": ( + status_label.get_text(" ", strip=True) + if status_label + else "Status issue" + ), + "title": ( + title.get_text(" ", strip=True) + if title + else "Current service issue" + ), + "url": ( + issue_link.get_text(" ", strip=True) + if issue_link + else status_url + ), + } + ) + return issues + + def _check_rwth_status_page(self): + logger.warning("Check the RWTH ITC status page: %s", RWTH_STATUS_URL) + issues = [] + for service_name, status_url in [ + ("RWTHmoodle", RWTH_MOODLE_STATUS_URL), + ("RWTH Single Sign-On", RWTH_SSO_STATUS_URL), + ]: + issues.extend(self._current_rwth_service_issues(service_name, status_url)) + + if not issues: + logger.info( + "No current RWTHmoodle or RWTH Single Sign-On outage was found " + "on the RWTH ITC status pages" + ) + return + + for issue in issues: + logger.warning( + "%s may currently be affected: %s - %s. 
See %s", + issue["service"], + issue["status"], + issue["title"], + issue["url"], + ) + + def _check_moodle_availability(self): + if not self.session: + raise Exception("You need a requests session first.") + + try: + response = self.session.get(MOODLE_URL, timeout=15) + except requests.RequestException as exc: + logger.critical("Could not reach RWTHmoodle at %s: %s", MOODLE_URL, exc) + self._check_general_connectivity() + self._check_rwth_status_page() + sys.exit(1) + + if response.status_code >= 500: + logger.critical( + "RWTHmoodle returned status %s before login", + response.status_code, + ) + self._check_rwth_status_page() + sys.exit(1) + + if response.status_code >= 400: + logger.warning( + "RWTHmoodle availability check returned status %s; login may fail", + response.status_code, + ) + self._check_rwth_status_page() + + return response + + # RWTH SSO Login + + def login(self): + def get_session_key(soup): + script = soup.find("script", string=lambda text: text and "sesskey" in text) + match = ( + re.search(r'"sesskey":"(.*?)"', script.text) + if script is not None + else None + ) + if match: + return match.group(1) + else: + logger.critical("Can't retrieve session key from JavaScript config") + sys.exit(1) + + def require_input_value(soup, name, context): + value = self._get_input_value(soup, name) + if value is None: + logger.critical( + "Failed to login: expected form field %r was missing at the " + "%s. The RWTH login flow may have changed or the servers may " + "have difficulties. 
For current service status, see %s.", + name, + context, + RWTH_STATUS_URL, + ) + self._check_rwth_status_page() + logger.info("-------Login-Error-Soup--------") + logger.info(soup) + sys.exit(1) + return value + + self.session = requests.Session() + cookie_file = Path(self.config.get("cookie_file", "./session")).expanduser() + cookie_payload = read_private_gzip_json(cookie_file, "session cookie") + if cookie_payload is not None: + load_cookies_from_data(self.session.cookies, cookie_payload) + self._check_moodle_availability() + try: + resp = self.session.get( + urllib.parse.urljoin(MOODLE_URL, "auth/shibboleth/index.php"), + timeout=15, + ) + except requests.RequestException as exc: + logger.critical("Could not reach RWTH SSO login endpoint: %s", exc) + self._check_general_connectivity() + self._check_rwth_status_page() + sys.exit(1) + if resp.url.startswith("https://moodle.rwth-aachen.de/my/"): + soup = bs(resp.text, features="lxml") + self.session_key = get_session_key(soup) + save_session_cookies(cookie_file, self.session.cookies) + return + + # Create a separate soup for maintenance detection + soup_check = bs(resp.text, features="lxml") + + # Remove known info banners by class + for banner in soup_check.select(".themeboostunioninfobanner"): + banner.decompose() + + # Also remove Bootstrap-style alert boxes marked as informational alerts + for alert in soup_check.select('div.alert[role="alert"]'): + alert.decompose() + + # Extract body text after cleanup + body = soup_check.find("body") + body_text = body.get_text(separator=" ", strip=True) if body else "" + + # Check for maintenance notice + if "Wartungsarbeiten" in body_text: + logger.critical( + "Detected Maintenance mode! If this is an error, please report it on GitHub." 
+ ) + logger.info(f"Cleaned page body:\n{body_text}") + sys.exit() + + soup = bs(resp.text, features="lxml") + if soup.find("input", {"name": "RelayState"}) is None: + csrf_token = require_input_value( + soup, "csrf_token", "username/password form" + ) + login_data = { + "j_username": self.config["user"], + "j_password": self.config["password"], + "_eventId_proceed": "", + "csrf_token": csrf_token, + } + resp2 = self.session.post(resp.url, data=login_data) + + soup = bs(resp2.text, features="lxml") + + if soup.find(id="fudis_selected_token_ids_input") is None: + logger.critical( + "Failed to login. Maybe your login-info was wrong or the " + "RWTH servers have difficulties. For current service " + "status, see %s. For more info use the --verbose argument.", + RWTH_STATUS_URL, + ) + self._check_rwth_status_page() + logger.info("-------Login-Error-Soup--------") + logger.info(soup) + sys.exit(1) + + csrf_token = require_input_value( + soup, "csrf_token", "TOTP generator selection form" + ) + + print("Setting TOTP generator") + totp_selection_data = { + "fudis_selected_token_ids_input": self.config["totp"], + "_eventId_proceed": "", + "csrf_token": csrf_token, + } + + resp3 = self.session.post(resp2.url, data=totp_selection_data) + + soup = bs(resp3.text, features="lxml") + if soup.find(id="fudis_otp_input") is None: + logger.critical( + "Failed to select TOTP generator. Maybe your TOTP serial " + "number is wrong or the RWTH servers have difficulties. " + "For current service status, see %s. 
For more info use " + "the --verbose argument.", + RWTH_STATUS_URL, + ) + self._check_rwth_status_page() + logger.info("-------Login-Error-Soup--------") + logger.info(soup) + sys.exit(1) + + csrf_token = require_input_value(soup, "csrf_token", "TOTP entry form") + if not self.config.get("totpsecret"): + totp_input = input(f"Enter TOTP for generator {self.config['totp']}:\n") + else: + totp_input = generate_totp(self.config.get("totpsecret")) + print(f"Generated TOTP from provided secret: {totp_input}") + + totp_login_data = { + "fudis_otp_input": totp_input, + "_eventId_proceed": "", + "csrf_token": csrf_token, + } + + resp4 = self.session.post(resp3.url, data=totp_login_data) + + time.sleep(1) # if we go too fast, we might have our connection closed + soup = bs(resp4.text, features="lxml") + if soup.find("input", {"name": "RelayState"}) is None: + logger.critical( + "Failed to login. Maybe your login-info was wrong or the RWTH " + "servers have difficulties. For current service status, see " + "%s. 
For more info use the --verbose argument.", + RWTH_STATUS_URL, + ) + self._check_rwth_status_page() + logger.info("-------Login-Error-Soup--------") + logger.info(soup) + sys.exit(1) + data = { + "RelayState": require_input_value(soup, "RelayState", "SAML response"), + "SAMLResponse": require_input_value(soup, "SAMLResponse", "SAML response"), + } + resp = self.session.post( + "https://moodle.rwth-aachen.de/Shibboleth.sso/SAML2/POST", data=data + ) + soup = bs(resp.text, features="lxml") + self.session_key = get_session_key(soup) + save_session_cookies(cookie_file, self.session.cookies) + + # Moodle Web Services API + + def get_moodle_wstoken(self): + if not self.session: + raise Exception("You need to login() first.") + params = { + "service": "moodle_mobile_app", + "passport": 1, + "urlscheme": "moodlemobile", + } + # response = self.session.head("https://moodle.rwth-aachen.de/admin/tool/mobile/launch.php", params=params, allow_redirects=False) + + def getCookies(cookie_jar, domain): + # workaround for macos + cookie_dict = cookie_jar.get_dict(domain=domain) + found = ["%s=%s" % (name, value) for (name, value) in cookie_dict.items()] + return ";".join(found) + + conn = http.client.HTTPSConnection("moodle.rwth-aachen.de") + conn.request( + "GET", + "/admin/tool/mobile/launch.php?" 
+ urllib.parse.urlencode(params), + headers={ + "Cookie": getCookies(self.session.cookies, "moodle.rwth-aachen.de") + }, + ) + response = conn.getresponse() + + # token is in an app schema, which contains the wstoken base64-encoded along with some other token + location = response.getheader("Location") + if location is None or "token=" not in location: + location_path = urllib.parse.urlparse(location).path if location else None + body_prefix = response.read(1000).decode("utf-8", errors="replace") + conn.close() + + if location_path and location_path.startswith("/admin/tool/policy/"): + logger.critical( + "RWTHmoodle requires you to accept updated policies/terms " + "before syncmymoodle can create a webservice token. Please " + "open https://moodle.rwth-aachen.de/ in your browser, accept " + "the pending policy page, and rerun syncmymoodle." + ) + logger.info( + "Unexpected mobile launch redirect target: " + f"{location_path or ''}" + ) + sys.exit(1) + + if location_path == "/login/index.php": + logger.critical( + "Failed to retrieve the Moodle webservice token because " + "Moodle redirected back to the login page. Your saved " + "session is probably stale or the SSO login did not finish " + "correctly. Delete the cookie file and try again." + ) + logger.info( + "Unexpected mobile launch redirect target: " + f"{location_path or ''}" + ) + sys.exit(1) + + logger.critical( + "Failed to retrieve the Moodle webservice token because Moodle " + "returned an unexpected redirect instead of a token." + ) + logger.info( + "Unexpected mobile launch redirect target: " + f"{location_path or ''}" + ) + if body_prefix: + logger.info( + "Unexpected mobile launch response body (truncated): " + f"{body_prefix}" + ) + sys.exit(1) + + # The redirect looks like moodlemobile://token=BASE64[&...]; isolate the + # token value and decode it defensively so a malformed redirect yields a + # clear message instead of a traceback. 
def get_userid(self):
    """Fetch the current user's id and private access key from Moodle.

    Calls the ``core_webservice_get_site_info`` webservice function with
    ``self.wstoken``, stores both values on the instance (``self.user_id``
    and ``self.user_private_access_key``) and returns them as a tuple.

    Logs the full response and exits the process with status 1 when the
    response does not carry both values.
    """
    data = {
        "moodlewssettingfilter": True,
        "moodlewssettingfileurl": True,
        "wsfunction": "core_webservice_get_site_info",
        "wstoken": self.wstoken,
    }
    params = {
        "moodlewsrestformat": "json",
        "wsfunction": "core_webservice_get_site_info",
    }
    resp = self.session.post(
        "https://moodle.rwth-aachen.de/webservice/rest/server.php",
        params=params,
        data=data,
    )
    payload = resp.json()

    # Read both keys defensively: a response that has "userid" but lacks
    # "userprivateaccesskey" (or is not a JSON object at all) must reach the
    # error message below instead of raising KeyError/AttributeError.
    is_object = isinstance(payload, dict)
    user_id = payload.get("userid") if is_object else None
    access_key = payload.get("userprivateaccesskey") if is_object else None
    if not user_id or not access_key:
        logger.critical(
            f"Error while getting userid and access key: {json.dumps(payload, indent=4)}"
        )
        sys.exit(1)

    self.user_id = user_id
    self.user_private_access_key = access_key
    return self.user_id, self.user_private_access_key
def get_folders_by_courses(self, course_id):
    """Return the folder modules of one course.

    Calls the ``mod_folder_get_folders_by_courses`` webservice function for
    ``course_id`` and returns the ``folders`` list of the response.

    When the response has no ``folders`` entry, a warning is logged and an
    empty list is returned. ``sync()`` calls this outside of its per-module
    error handling, so raising here would abort the whole run for a problem
    that only affects folder intros of a single course.
    """
    data = {
        "courseids[0]": str(course_id),
        "moodlewssettingfilter": True,
        "moodlewssettingfileurl": True,
        "wsfunction": "mod_folder_get_folders_by_courses",
        "wstoken": self.wstoken,
    }

    params = {
        "moodlewsrestformat": "json",
        "wsfunction": "mod_folder_get_folders_by_courses",
    }

    response = self.session.post(
        "https://moodle.rwth-aachen.de/webservice/rest/server.php",
        params=params,
        data=data,
    )
    payload = response.json()
    folders = payload.get("folders") if isinstance(payload, dict) else None
    if folders is None:
        logger.warning(
            "Could not list folders for course %s; unexpected response: %s",
            course_id,
            payload,
        )
        return []
    return folders
+ if not self._course_id_in_filter(course_id, selected_courses): + continue + elif self._course_id_in_filter( + course_id, self.config.get("skip_courses", []) + ): + continue + + semestername = (course.get("idnumber") or "")[:4] or "unknown-semester" + # Skip not selected semesters (selected_courses overrides this) + if ( + not selected_courses + and self.config.get("only_sync_semester", []) + and semestername not in self.config.get("only_sync_semester", []) + ): + continue + + semester_node = [ + s for s in self.root_node.children if s.name == semestername + ] + if len(semester_node) == 0: + semester_node = self.root_node.add_child(semestername, None, "Semester") + else: + semester_node = semester_node[0] + + course_node = semester_node.add_child(course_name, course_id, "Course") + + print(f"Syncing {course_name}...") + course_sections = self.get_course(course_id) + module_names = { + module.get("modname") + for section in course_sections + if isinstance(section, dict) + for module in section.get("modules", []) + } + + assignments = None + if self.config.get("used_modules", {}).get("assign", {}) and ( + "assign" in module_names + ): + assignments = self.get_assignment(course_id) + assignments_by_cmid = { + assignment["cmid"]: assignment + for assignment in ((assignments or {}).get("assignments") or []) + if "cmid" in assignment + } + + folders = [] + if self.config.get("used_modules", {}).get("folder", {}) and ( + "folder" in module_names + ): + folders = self.get_folders_by_courses(course_id) + folders_by_coursemodule = { + folder.get("coursemodule"): folder for folder in folders + } + + logger.info("-----------------------") + logger.info(f"------{semestername} - {course_name}------") + logger.info("------COURSE-DATA------") + logger.info(json.dumps(course)) + logger.info("------ASSIGNMENT-DATA------") + logger.info(json.dumps(assignments)) + logger.info("------FOLDER-DATA------") + logger.info(json.dumps(folders)) + + for section in course_sections: + if 
isinstance(section, str): + logger.error(f"Error syncing section in {course_name}: {section}") + continue + if self._should_skip_section(section, course_id): + continue + logger.info("------SECTION-DATA------") + logger.info(json.dumps(section)) + section_node = course_node.add_child( + section["name"], section["id"], "Section" + ) + for module in section["modules"]: + try: + if self._should_skip_module(module, course_id): + continue + + # Get Assignments + if module["modname"] == "assign" and self.config.get( + "used_modules", {} + ).get("assign", {}): + ass = assignments_by_cmid.get(module["id"]) + if not ass: + continue + assignment_id = ass["id"] + assignment_name = module["name"] + assignment_node = section_node.add_child( + assignment_name, assignment_id, "Assignment" + ) + + assignment_intro = ass.get("intro") + if assignment_intro: + self.scanForLinks( + assignment_intro, + assignment_node, + course_id, + module_title=assignment_name, + ) + + ass = ass[ + "introattachments" + ] + self.get_assignment_submission_files(assignment_id) + for c in ass: + if self._should_skip_url( + c.get("fileurl"), "assignment file" + ): + continue + self._add_moodle_file_node( + assignment_node, + c.get("filepath", "/"), + c["filename"], + c["fileurl"], + "Assignment File", + c["fileurl"], + timemodified=c.get("timemodified"), + ) + + # Get Resources or URLs + if module["modname"] in [ + "resource", + "url", + "book", + "page", + "pdfannotator", + ]: + if module["modname"] == "resource" and not self.config.get( + "used_modules", {} + ).get("resource", {}): + continue + for c in module.get("contents", []): + file_url = c.get("fileurl") + if not file_url: + continue + if self._should_skip_url(file_url, "resource link"): + continue + if self._is_direct_moodle_file_content(module, c): + self._add_moodle_content_file_node(section_node, c) + elif not ( + module["modname"] == "page" + and c.get("filename") == "index.html" + ): + self.scanForLinks( + file_url, + section_node, + 
course_id, + single=True, + module_title=module["name"], + ) + + # Get Folders + if module["modname"] == "folder" and self.config.get( + "used_modules", {} + ).get("folder", {}): + folder_node = section_node.add_child( + module["name"], module["id"], "Folder" + ) + + # Scan intro for links + folder_info = folders_by_coursemodule.get(module["id"]) + if folder_info and folder_info.get("intro"): + self.scanForLinks( + folder_info["intro"], folder_node, course_id + ) + + for c in module.get("contents", []): + if self._should_skip_url( + c.get("fileurl"), "folder file" + ): + continue + self._add_moodle_file_node( + folder_node, + c.get("filepath", "/"), + c["filename"], + c["fileurl"], + "Folder File", + c["fileurl"], + timemodified=c.get("timemodified"), + ) + + # Get embedded videos in pages or labels + if module["modname"] in [ + "page", + "label", + "h5pactivity", + ] and self.config.get("used_modules", {}).get("url", {}): + if module["modname"] == "page": + opencast_enabled = ( + self.config.get("used_modules", {}) + .get("url", {}) + .get("opencast", {}) + ) + html_url = ( + module.get("url") + or f'https://moodle.rwth-aachen.de/mod/page/view.php?id={module["id"]}' + ) + scan_page_links = not self.config.get( + "nolinks" + ) and not self._should_skip_url(html_url, "page link") + if opencast_enabled or scan_page_links: + try: + response = self.session.get(html_url) + except Exception: + logger.exception( + "Failed to fetch page module %s", + module["id"], + ) + response = None + if response and not ( + 200 <= response.status_code < 300 + ): + logger.warning( + "Page module %s returned status %s", + module["id"], + response.status_code, + ) + response = None + if response: + if opencast_enabled: + html = bs( + response.text, + features="lxml", + ) + for iframe in html.find_all("iframe"): + iframe_src = iframe.get("src") + if not iframe_src: + continue + iframe_src = urllib.parse.urljoin( + response.url or html_url, + iframe_src, + ) + vid_id = ( + 
self._extract_opencast_episode_id( + iframe_src + ) + ) + if not vid_id: + continue + if not self._authenticate_opencast_episode( + course_id, vid_id + ): + continue + vid = self.extractTrackFromEpisode( + vid_id + ) + if not vid: + continue + + if self._should_skip_url( + vid, "Opencast video URL" + ): + continue + + section_node.add_child( + module["name"], + vid_id, + "Opencast", + url=vid, + additional_info=course_id, + ) + + if scan_page_links: + self._scan_html_text_for_links( + response.text, + response.url or html_url, + section_node, + course_id, + module_title=module["name"], + ) + # "Interactive" h5p videos + elif module["modname"] == "h5pactivity": + html_url = f'https://moodle.rwth-aachen.de/mod/h5pactivity/view.php?id={module["id"]}' + html = bs( + self.session.get(html_url).text, + features="lxml", + ) + # Get h5p iframe + iframe = html.find("iframe") + iframe_src = iframe.get("src") if iframe else None + if iframe_src: + iframe_src = urllib.parse.urljoin( + html_url, iframe_src + ) + iframe_html = str( + bs( + self.session.get(iframe_src).text, + features="lxml", + ) + ) + # Moodle devs dont know how to use CDATA correctly, so we need to remove all backslashes + sanitized_html = iframe_html.replace("\\", "") + else: + # H5P outside iframes + sanitized_html = str(html).replace("\\", "") + + self.scanForLinks( + sanitized_html, + section_node, + course_id, + module_title=module["modname"], + single=False, + ) + else: + self.scanForLinks( + module.get("description", ""), + section_node, + course_id, + module_title=module["name"], + ) + + # New OpenCast integration + if module["modname"] == "lti" and self.config.get( + "used_modules", {} + ).get("url", {}).get("opencast", {}): + info_url = f'https://moodle.rwth-aachen.de/mod/lti/launch.php?id={module["id"]}&triggerview=0' + try: + info_response = self.session.get(info_url) + except Exception: + logger.exception( + "Opencast: failed to fetch LTI module %s", + module["id"], + ) + continue + if not (200 <= 
info_response.status_code < 300): + logger.warning( + "Opencast: LTI module %s returned status %s", + module["id"], + info_response.status_code, + ) + self._log_opencast_backend_issue(info_response.text) + continue + + info_res = bs(info_response.text, features="lxml") + + engage_series_id = self._get_input_value( + info_res, "custom_series" + ) + engage_single_id = self._get_input_value( + info_res, "custom_id" + ) + name = ( + self._get_input_value(info_res, "resource_link_title") + or module["name"] + ) + engage_data = self._extract_lti_form_data(info_res) + + if engage_series_id: + # Found an Opencast "series" page + series_id = engage_series_id + + series_node = course_node.add_child( + name, series_id, "Section" + ) + + if not self._submit_opencast_lti_form( + engage_data, f"LTI series module {module['id']}" + ): + continue + + series_url = f"https://engage.streaming.rwth-aachen.de/search/episode.json?limit=100&offset=0&sid={series_id}" + series_response = self._fetch_opencast_json( + series_url, f"series {series_id}" + ) + if series_response is None: + continue + + for episode in self._get_opencast_result_list( + series_response, f"series {series_id}" + ): + if not isinstance(episode, dict): + continue + mediapackage = episode.get("mediapackage", {}) + if not isinstance(mediapackage, dict): + continue + episode_id = mediapackage.get("id") + if not episode_id: + logger.warning( + "Opencast: series %s contains episode without id", + series_id, + ) + continue + vid = self.extractTrackFromEpisode(episode_id) + if not vid: + continue + if self._should_skip_url(vid, "Opencast video URL"): + continue + series_node.add_child( + mediapackage.get("title") or episode_id, + episode_id, + "Opencast", + url=vid, + additional_info=module["id"], + ) + else: + if not engage_single_id: + logger.info( + "Failed to find either custom_id or custom_series on lti page." 
def _download_all_files(self, cur_node):
    """Recursively download every not-yet-downloaded leaf below ``cur_node``.

    Inner nodes only recurse into their children. A leaf is processed when it
    has a URL and is not already marked as downloaded; the handling depends
    on ``cur_node.type``:

    * ``"Youtube"``  - delegated to ``scanAndDownloadYouTube``.
    * ``"Opencast"`` - the node name is given an ``.mp4`` suffix (or taken
      from the last URL path segment when the node has no name), then
      downloaded with ``download_file``.
    * ``"Quiz"``     - skipped with a warning.
    * anything else  - downloaded with ``download_file``.

    Failures are logged per node and never abort the traversal.
    """
    if cur_node.children:
        for child in cur_node.children:
            self._download_all_files(child)
        return

    if not cur_node.url or cur_node.is_downloaded:
        return

    if cur_node.type == "Youtube":
        try:
            self.scanAndDownloadYouTube(cur_node)
            cur_node.is_downloaded = True
        except Exception:
            logger.exception(f"Failed to download the module {cur_node}")
            logger.error(
                "This could be caused by an out of date yt-dlp version. Try upgrading yt-dlp through pip or your package manager."
            )
    elif cur_node.type == "Opencast":
        try:
            # Normalise the name *before* the substring test: the previous
            # order evaluated `".mp4" not in None` for unnamed nodes, which
            # raised TypeError and made the URL fallback unreachable.
            current_name = cur_node.name or ""
            if ".mp4" not in current_name:
                if current_name:
                    cur_node.name = current_name + ".mp4"
                else:
                    cur_node.name = cur_node.url.split("/")[-1]
            if self.download_file(cur_node):
                cur_node.is_downloaded = True
        except Exception:
            logger.exception(f"Failed to download the module {cur_node}")
    elif cur_node.type == "Quiz":
        logger.warning(
            "Skipping quiz PDF generation for %s because it is disabled "
            "for security.",
            cur_node.name,
        )
    else:
        try:
            if self.download_file(cur_node):
                cur_node.is_downloaded = True
        except Exception:
            logger.exception(f"Failed to download the module {cur_node}")
"1", + "Content-Type": "application/xml", + } + try: + propfind_response = self.session.request( + "PROPFIND", + sciebo_url + href, + headers=headers, + data=propfind_body, + ) + except Exception: + logger.exception( + "Sciebo PROPFIND failed for href %s (share %s)", + href, + sharingToken, + ) + return + + if not (200 <= propfind_response.status_code < 300): + logger.warning( + "Sciebo PROPFIND returned status %s for href %s (share %s)", + propfind_response.status_code, + href, + sharingToken, + ) + return + + # parse the response + soup_xml = bs(propfind_response.text, features="xml") + + for resp in soup_xml.find_all("d:response"): + # get the href of the response + href_tag = resp.find("d:href") + if href_tag is None or not href_tag.text: + continue + new_href = href_tag.text + + if new_href == href: + logger.info( + "Sciebo: skipping %s because it is the current folder", + new_href, + ) + continue + + # Extract a stable content hash for this item. Prefer the + # SHA1 checksum from oc:checksums if available; fall back + # to the raw ETag otherwise. 
+ etag_value = None + prop = resp.find("d:prop") + if prop is not None: + checksums_tag = prop.find("oc:checksums") + if checksums_tag is not None: + for cs in checksums_tag.find_all("oc:checksum"): + text = (cs.text or "").strip() + if text.upper().startswith("SHA1:"): + etag_value = text.split(":", 1)[1] + break + + if etag_value is None: + etag_tag = prop.find("d:getetag") + if etag_tag and etag_tag.text: + etag_value = etag_tag.text.strip() + + logger.info(f"Sciebo response href: {new_href}") + # get the displayname of the response + displayname = ( + new_href.split("/")[-2] + if new_href.endswith("/") + else new_href.split("/")[-1] + ) + displayname = ( + f"sciebo-{sharingToken}" + if displayname == "webdav" + else displayname + ) + + # check if the response is a folder + if new_href.endswith("/"): + # create a new node for the folder + folder_node = parent_node.add_child( + displayname, None, "Sciebo Folder", etag=etag_value + ) + if folder_node is None: + continue + # recursive call to get all files in the folder + get_sciebo_files( + new_href, folder_node, sharingToken, auth_header + ) + else: + # create a new node for the file + parent_node.add_child( + displayname, + None, + "Sciebo File", + url=sciebo_url + new_href, + additional_info=auth_header, + etag=etag_value, + ) + + get_sciebo_files( + webdav_location, sciebo_root, sharingToken, auth_header + ) + self._sciebo_link_cache[link] = sciebo_root.clone() diff --git a/syncmymoodle/cli.py b/syncmymoodle/cli.py new file mode 100644 index 0000000..5eeaf99 --- /dev/null +++ b/syncmymoodle/cli.py @@ -0,0 +1,299 @@ +#!/usr/bin/env python3 + +import getpass +import json +import logging +import os +import sys +from argparse import ArgumentParser +from pathlib import Path +from types import ModuleType + +from syncmymoodle.app import SyncMyMoodle +from syncmymoodle.constants import COURSE_PREFIX_HANDLING_OPTIONS + +try: + import keyring as imported_keyring + + keyring: ModuleType | None = imported_keyring +except 
def main():
    """Command-line entry point: build the configuration and run one sync.

    Steps, in order:

    1. Parse the command-line arguments (keyring-related flags only exist
       when the optional ``keyring`` module could be imported).
    2. Load the configuration: either the file given with ``--config`` or
       the global (``$XDG_CONFIG_HOME/syncmymoodle/config.json``) and local
       (``./config.json``) files, the local one taking precedence.
    3. Overlay command-line values on top of the file values and fill in
       defaults.
    4. Optionally fetch/store the password and TOTP secret in the system
       keyring.
    5. Log in, build the file tree, download everything and write the cache.

    Exits with a non-zero status when required credentials are missing or
    the ``--config`` file does not exist.
    """
    parser = ArgumentParser(
        prog="python3 -m syncmymoodle",
        description="Synchronization client for RWTH Moodle. All optional arguments override those in config.json.",
    )

    if keyring:
        parser.add_argument(
            "--secretservice",
            action="store_true",
            help="Use system's keyring for storing and retrieving account credentials",
        )
        parser.add_argument(
            "--secretservicetotpsecret",
            action="store_true",
            help="Save TOTP secret in keyring",
        )

    parser.add_argument(
        "--user", default=None, help="set your RWTH Single Sign-On username"
    )
    parser.add_argument(
        "--password", default=None, help="set your RWTH Single Sign-On password"
    )
    parser.add_argument(
        "--totp",
        default=None,
        help="set your RWTH Single Sign-On TOTP provider's serial number (see https://idm.rwth-aachen.de/selfservice/MFATokenManager)",
    )
    parser.add_argument(
        "--totpsecret",
        default=None,
        help="(optional) set your RWTH Single Sign-On TOTP provider Secret",
    )
    parser.add_argument("--config", default=None, help="set your configuration file")
    parser.add_argument(
        "--cookiefile", default=None, help="set the location of a cookie file"
    )
    parser.add_argument(
        "--courses",
        default=None,
        help="specify the courses that should be synced using comma-separated links. Defaults to all courses, if no additional restrictions e.g. semester are defined.",
    )
    parser.add_argument(
        "--skipcourses",
        default=None,
        help="exclude specific courses using comma-separated links. Defaults to None.",
    )
    parser.add_argument(
        "--semester",
        default=None,
        help="specify semesters to be synced e.g. `22s`, comma-separated. Defaults to all semesters, if no additional restrictions e.g. courses are defined.",
    )
    parser.add_argument(
        "--basedir",
        default=None,
        help="specify the directory where all files will be synced",
    )
    parser.add_argument(
        "--courseprefix",
        choices=COURSE_PREFIX_HANDLING_OPTIONS,
        default=None,
        help=(
            "handle leading two-character course prefixes in local folder names: "
            "'keep' (default), 'remove', or 'suffix'"
        ),
    )
    parser.add_argument(
        "--nolinks",
        action="store_true",
        help="define whether various links in moodle pages should also be inspected e.g. youtube videos, wikipedia articles",
    )
    parser.add_argument(
        "--excludefiletypes",
        default=None,
        help='specify whether specific file types should be excluded, comma-separated e.g. "mp4,mkv"',
    )
    parser.add_argument(
        "--updatefiles",
        action="store_true",
        help="define whether modified files with the same name/path should be redownloaded",
    )
    parser.add_argument(
        "--updatefilesconflict",
        choices=["rename", "keep", "overwrite"],
        default=None,
        help=(
            "define how to handle locally modified files when updating: "
            "'rename' (default) moves the old file aside, 'keep' skips the "
            "update, 'overwrite' replaces the local file"
        ),
    )
    parser.add_argument(
        "-v",
        "--verbose",
        action="store_const",
        dest="loglevel",
        const=logging.INFO,
        default=logging.WARNING,
        help="show information useful for debugging",
    )
    args = parser.parse_args()

    # Always start from a bound dict: previously `config` stayed undefined
    # when --config pointed at a missing file, which crashed further down
    # with an UnboundLocalError instead of a usable message.
    config = {}
    if args.config:
        overwrite_config = Path(args.config)
        if not overwrite_config.is_file():
            parser.error(f"configuration file not found: {overwrite_config}")
        with overwrite_config.open() as f:
            config = json.load(f)
    else:
        # NOTE(review): the global/local files are only read when --config
        # is absent; the collapsed source does not show the nesting
        # unambiguously - confirm against the original layout.
        global_config = (
            Path(os.environ.get("XDG_CONFIG_HOME", Path("~/.config").expanduser()))
            / "syncmymoodle"
            / "config.json"
        )
        if global_config.is_file():
            with global_config.open() as f:
                config.update(json.load(f))

        local_config = Path("config.json")
        if local_config.is_file():
            with local_config.open() as f:
                config.update(json.load(f))

    # Remember what the *file* contained before command-line values are
    # merged in; the keyring checks below complain about secrets stored in
    # the config file, not about values passed on the command line.
    file_password = config.get("password")
    file_totpsecret = config.get("totpsecret")

    config["user"] = args.user or config.get("user")
    config["password"] = args.password or file_password
    config["totp"] = args.totp or config.get("totp")
    config["totpsecret"] = args.totpsecret or file_totpsecret
    config["cookie_file"] = args.cookiefile or config.get("cookie_file", "./session")
    config["selected_courses"] = (
        args.courses.split(",") if args.courses else config.get("selected_courses", [])
    )
    config["only_sync_semester"] = (
        args.semester.split(",")
        if args.semester
        else config.get("only_sync_semester", [])
    )
    config["basedir"] = args.basedir or config.get("basedir", "./")
    config["course_prefix_handling"] = args.courseprefix or config.get(
        "course_prefix_handling", "keep"
    )
    config["use_secret_service"] = (
        args.secretservice if keyring else None
    ) or config.get("use_secret_service")
    config["secret_service_store_totp_secret"] = (
        args.secretservicetotpsecret if keyring else None
    ) or config.get("secret_service_store_totp_secret")
    config["skip_courses"] = (
        args.skipcourses.split(",")
        if args.skipcourses
        else config.get("skip_courses", [])
    )
    config["nolinks"] = args.nolinks or config.get("no_links")
    config["used_modules"] = config.get("used_modules") or {
        "assign": True,
        "resource": True,
        "url": {"youtube": True, "opencast": True, "sciebo": True, "quiz": False},
        "folder": True,
    }
    config["exclude_filetypes"] = (
        args.excludefiletypes.split(",")
        if args.excludefiletypes
        else config.get("exclude_filetypes", [])
    )
    config["exclude_files"] = config.get("exclude_files", [])
    config["exclude_links"] = config.get("exclude_links", [])
    config["allowed_domains"] = config.get("allowed_domains", [])
    config["exclude_sections"] = config.get(
        "exclude_sections", config.get("skip_sections", [])
    )
    config["exclude_modules"] = config.get(
        "exclude_modules", config.get("skip_modules", [])
    )
    config["updatefiles"] = args.updatefiles or config.get("update_files", False)
    config["update_files_conflict"] = args.updatefilesconflict or config.get(
        "update_files_conflict", "rename"
    )

    logging.basicConfig(level=args.loglevel)

    # A user-supplied "used_modules" may omit "url" or set it to a plain
    # boolean; only touch the quiz switch when there is a dict to look into.
    url_modules = config["used_modules"].get("url")
    if isinstance(url_modules, dict) and url_modules.get("quiz"):
        url_modules["quiz"] = False
        logger.warning(
            "Quiz PDF generation is disabled until the pdfkit/wkhtmltopdf "
            "renderer is replaced with a safer implementation."
        )

    if keyring and config.get("use_secret_service"):
        if file_password:
            logger.critical("You need to remove your password from your config file!")
            sys.exit(1)

        if config.get("secret_service_store_totp_secret") and file_totpsecret:
            logger.critical("You need to remove your totpsecret from your config file!")
            sys.exit(1)

        if not args.user and not config.get("user"):
            print(
                "You need to provide your username in the config file or through --user!"
            )
            sys.exit(1)

        # The flag is stored under "secret_service_store_totp_secret"; the
        # previously used key "secretservicetotpsecret" is never set, so this
        # guard could not trigger and the keyring was queried with user None.
        if (
            config.get("secret_service_store_totp_secret")
            and not args.totp
            and not config.get("totp")
        ):
            print(
                "You need to provide your TOTP provider in the config file or through --totp!"
            )
            sys.exit(1)

        config["password"] = keyring.get_password("syncmymoodle", config.get("user"))
        if config["password"] is None:
            if args.password:
                password = args.password
            else:
                password = getpass.getpass("Password:")
            keyring.set_password("syncmymoodle", config.get("user"), password)
            config["password"] = password

        if config.get("secret_service_store_totp_secret"):
            config["totpsecret"] = keyring.get_password(
                "syncmymoodle", config.get("totp")
            )
            if config["totpsecret"] is None:
                if args.totpsecret:
                    totpsecret = args.totpsecret
                else:
                    totpsecret = getpass.getpass("TOTP-Secret:")
                keyring.set_password("syncmymoodle", config.get("totp"), totpsecret)
                config["totpsecret"] = totpsecret

    if not config.get("user") or not config.get("password"):
        logger.critical(
            "You need to specify your username and password in the config file or as an argument!"
        )
        sys.exit(1)

    if not config.get("totp"):
        logger.critical(
            "You need to specify your TOTP generator in the config file or as an argument!"
        )
        sys.exit(1)

    smm = SyncMyMoodle(config)

    print("Logging in...")
    smm.login()
    smm.get_moodle_wstoken()
    smm.get_userid()
    print("Syncing file tree...")
    smm.sync()
    print("Downloading files...")
    smm.download_all_files()
    print("Saving root node as cache...")
    smm.cache_root_node()

    # If we saw multiple Opencast backend errors send a reminder to check the
    # RWTH ITC status page before filing a bug. Read the counter defensively
    # so the summary stays best-effort without a blanket `except: pass`.
    opencast_errors = getattr(smm, "_opencast_error_count", 0)
    if isinstance(opencast_errors, int) and opencast_errors >= 5:
        logger.warning(
            "Multiple Opencast backend errors occurred. Please check the RWTH "
            "ITC status page before reporting an issue on GitHub: "
            "https://maintenance.itc.rwth-aachen.de/ticket/status/messages/499"
        )
def course_id_in_filter(course_id: Any, entries: Any) -> bool:
    """Return True if ``course_id`` is referenced by a configured entry.

    Each entry is either a course URL (``.../course/view.php?id=NNN``) or a
    bare course id. For URLs the ``id`` query parameter is compared exactly,
    so ``id=12`` matches neither course ``1`` nor course ``2``. Bare ids are
    compared after stripping surrounding whitespace.

    ``entries`` is normally a list, but a single string or integer is
    accepted as a one-element filter. Without that normalisation a string
    would be iterated character by character (so the entry ``"12"`` would
    wrongly match courses ``1`` and ``2``) and an integer would raise
    ``TypeError``. ``None`` and empty values match nothing.
    """
    if not entries:
        return False
    if isinstance(entries, (str, bytes, int)):
        entries = [entries]

    wanted = str(course_id)
    for raw_entry in entries:
        entry = str(raw_entry)
        query = urllib.parse.urlparse(entry).query
        if wanted in urllib.parse.parse_qs(query).get("id", []):
            return True
        if entry.strip() == wanted:
            return True
    return False
+ if domain.startswith("*."): + return host.endswith(domain[1:]) + return host == domain or host.endswith(f".{domain}") + + +def should_skip_url( + config: dict[str, Any], + url: str | None, + context: str = "link", + log: logging.Logger = logger, +) -> bool: + if not url: + return False + + url = str(url).replace("&", "&") + if matches_any_pattern([url], configured_patterns(config, "exclude_links")): + log.info("Skipping %s %s because it matches exclude_links", context, url) + return True + + allowed_domains = configured_patterns(config, "allowed_domains") + if allowed_domains: + parsed_url = urllib.parse.urlparse(url) + if parsed_url.scheme in {"http", "https"} and parsed_url.netloc: + if not any( + domain_matches(parsed_url.netloc, domain) for domain in allowed_domains + ): + log.info( + "Skipping %s %s because it is outside allowed_domains", + context, + url, + ) + return True + + return False + + +def should_skip_section( + config: dict[str, Any], + section: dict[str, Any], + course_id: Any, + log: logging.Logger = logger, +) -> bool: + patterns = configured_patterns( + config, "exclude_sections", "skip_sections", course_id=course_id + ) + if not patterns: + return False + + values = [section.get("name"), section.get("id")] + if matches_any_pattern(values, patterns): + log.info( + "Skipping section %s (%s) in course %s because it matches " + "exclude_sections", + section.get("name"), + section.get("id"), + course_id, + ) + return True + return False + + +def should_skip_module( + config: dict[str, Any], + module: dict[str, Any], + course_id: Any, + log: logging.Logger = logger, +) -> bool: + patterns = configured_patterns( + config, "exclude_modules", "skip_modules", course_id=course_id + ) + if not patterns: + return False + + module_id = module.get("id") + module_name = module.get("name") + modname = module.get("modname") + module_urls = [] + if module.get("url"): + module_urls.append(module.get("url")) + if module_id and modname: + module_urls.extend( + [ + 
f"https://moodle.rwth-aachen.de/mod/{modname}/view.php?id={module_id}", + f"https://moodle.rwth-aachen.de/mod/{modname}/launch.php?id={module_id}", + ] + ) + + values = [module_id, module_name, modname, *module_urls] + if matches_any_pattern(values, patterns): + log.info( + "Skipping module %s (%s) in course %s because it matches " + "exclude_modules", + module_name, + module_id, + course_id, + ) + return True + return False diff --git a/syncmymoodle/node.py b/syncmymoodle/node.py new file mode 100644 index 0000000..9ab01c4 --- /dev/null +++ b/syncmymoodle/node.py @@ -0,0 +1,202 @@ +from __future__ import annotations + +import base64 +import hashlib +from pathlib import Path +from typing import Any, cast + +NAME_CLASH_ID_UNSET = object() + + +class Node: + def __init__( + self, + name: str, + id: Any, + type: str, # noqa: A003 - keep original name for compatibility + parent: Node | None, + url: str | None = None, + additional_info: Any = None, + timemodified: Any = None, + etag: str | None = None, + name_clash_id: Any = NAME_CLASH_ID_UNSET, + is_downloaded: bool = False, + ) -> None: + self.name = name + self.id = id + self.url = url + self.type = type + self.parent = parent + self.children: list[Node] = [] + # Currently only used for course_id in opencast, auth header in sciebo, + # and may be extended for other module-specific data. 
+ self.additional_info = additional_info + self.timemodified = timemodified + self.etag = etag + self.name_clash_id = ( + id if name_clash_id is NAME_CLASH_ID_UNSET else name_clash_id + ) + self.is_downloaded = ( + is_downloaded # Can also be used to exclude files from being downloaded + ) + + def __repr__(self) -> str: + return f"Node(name={self.name}, id={self.id}, url={self.url}, type={self.type})" + + def add_child( + self, + name: str, + id: Any, + type: str, # noqa: A003 - keep original name for compatibility + url: str | None = None, + additional_info: Any = None, + timemodified: Any = None, + etag: str | None = None, + name_clash_id: Any = NAME_CLASH_ID_UNSET, + ) -> Node | None: + if url: + url = url.replace("?forcedownload=1", "").replace( + "mod_page/content/3/", "mod_page/content/" + ) + url = url.replace("webservice/pluginfile.php", "pluginfile.php") + + # Check for duplicate urls and just ignore those nodes: + if url and any([True for c in self.children if c.url == url]): + return None + + temp = Node( + name, + id, + type, + self, + url=url, + additional_info=additional_info, + timemodified=timemodified, + etag=etag, + name_clash_id=name_clash_id, + ) + self.children.append(temp) + return temp + + def clone(self, parent: Node | None = None) -> Node: + clone = Node( + self.name, + self.id, + self.type, + parent, + url=self.url, + additional_info=self.additional_info, + timemodified=self.timemodified, + etag=self.etag, + name_clash_id=self.name_clash_id, + is_downloaded=self.is_downloaded, + ) + clone.children = [child.clone(clone) for child in self.children] + return clone + + def get_path(self) -> list[str]: + ret: list[str] = [] + cur: Node | None = self + while cur is not None: + ret.insert(0, cur.name) + cur = cur.parent + return ret + + def go_to_path(self, target_path: list[str]) -> Node: + target_node = [self] + for path_child in target_path: + if path_child == "": + continue + try: + target_node.append( + [ + node_child + for node_child in 
target_node[-1].children + if node_child.name == path_child + ][0] + ) + except IndexError: + raise Exception("The path is not found in this root node. Wrong path?") + return target_node[-1] + + def _clash_suffix(self) -> str: + # Stable, distinct suffix used to disambiguate same-named siblings. + # Fall back to the URL when no name_clash_id is set (direct-link, + # embedded, and direct-content file nodes pass name_clash_id=None); + # otherwise such nodes would all hash to md5("None") and collide onto + # the same path, silently dropping all but one file. + key = self.name_clash_id if self.name_clash_id is not None else self.url + return base64.urlsafe_b64encode( + hashlib.md5(str(key).encode("utf-8")).hexdigest().encode("utf-8") + ).decode()[:10] + + def remove_children_nameclashes(self) -> None: + # Check for duplicate filenames + + unclashed_children = [] + # work on copy since deleting from the iterated list breaks stuff + copy_children = self.children.copy() + for child in copy_children: + if child not in self.children: + continue + self.children.remove(child) + unclashed_children.append(child) + if child.type == "Opencast": + siblings = [ + c + for c in self.children + if c.name == child.name and c.url != child.url + ] + if len(siblings) > 0: + # if an Opencast filename is duplicate in its directory, we append the filename as it was uploaded + tmp_name = Path(child.name).name + child.name = f"{tmp_name}_{cast(str, child.url).split('/')[-1]}" + for s in siblings: + tmp_name = Path(s.name).name + s.name = f"{s.name}_{cast(str, s.url).split('/')[-1]}" + self.children.remove(s) + unclashed_children.extend(siblings) + + self.children = unclashed_children + + unclashed_children = [] + copy_children = self.children.copy() + for child in copy_children: + if child not in self.children: + continue + self.children.remove(child) + unclashed_children.append(child) + siblings = [ + c + for c in self.children + if c.name == child.name + and ( + c.url != child.url + # Course 
prefix handling may create duplicate URL-less course + # folders. Other URL-less nodes, such as duplicate Moodle + # sections, keep the legacy behavior and merge silently. + or ( + child.type == "Course" + and c.type == "Course" + and c.name_clash_id != child.name_clash_id + ) + ) + ] + if len(siblings) > 0: + # if a filename is still duplicate in its directory, we rename + # it by appending a stable per-node key (works for ids and urls). + filename = Path(child.name) + child.name = ( + filename.stem + "_" + child._clash_suffix() + filename.suffix + ) + for s in siblings: + filename = Path(s.name) + s.name = filename.stem + "_" + s._clash_suffix() + filename.suffix + self.children.remove(s) + unclashed_children.extend(siblings) + + self.children = unclashed_children + + for child in self.children: + # recurse whole tree + child.remove_children_nameclashes() diff --git a/syncmymoodle/pathing.py b/syncmymoodle/pathing.py new file mode 100644 index 0000000..6e49f26 --- /dev/null +++ b/syncmymoodle/pathing.py @@ -0,0 +1,65 @@ +import hashlib +import urllib.parse +from pathlib import Path + +from syncmymoodle.node import Node + + +def sanitize_path_part(path: str, invalid_chars: str) -> str: + path = urllib.parse.unquote(path) + path = "".join([s for s in path if s not in invalid_chars]) + while path and path[-1] == " ": + path = path[:-1] + while path and path[0] == " ": + path = path[1:] + + # Folders downloaded from Moodle display amp; in places where an + # ampersand should be displayed instead. In the web UI, however, the + # ampersand is shown correctly, and we're trying to emulate that here. 
+ path = path.replace("amp;", "&") + + return path + + +def get_sanitized_node_path(node: Node, basedir: Path, invalid_chars: str) -> Path: + basedir = basedir.expanduser() + path_segments = [] + for part in node.get_path(): + if part == "": + continue + sanitized = sanitize_path_part(part, invalid_chars) + if sanitized in {"", ".", ".."}: + sanitized = "_" + path_segments.append(sanitized) + + target_path = basedir.joinpath(*path_segments) + resolved_basedir = basedir.resolve(strict=False) + resolved_target = target_path.resolve(strict=False) + if not resolved_target.is_relative_to(resolved_basedir): + raise ValueError(f"Refusing to write outside basedir: {target_path}") + return target_path + + +def make_conflict_path(path: Path) -> Path: + """Return a unique path for storing a locally modified file.""" + suffix = path.suffix + stem = path.stem + + # Derive a short hash from the current contents to make the filename + # stable and recognizable while remaining reasonably unique. + hash_str = "unknown" + try: + with path.open("rb") as f: + digest = hashlib.file_digest(f, "sha1") + hash_str = digest.hexdigest()[:8] + except FileNotFoundError: + hash_str = "missing" + + conflict_path = path.with_name(f"{stem}.syncconflict.{hash_str}{suffix}") + index = 1 + while conflict_path.exists(): + conflict_path = path.with_name( + f"{stem}.syncconflict.{hash_str}.{index}{suffix}" + ) + index += 1 + return conflict_path diff --git a/syncmymoodle/storage.py b/syncmymoodle/storage.py new file mode 100644 index 0000000..ab3e82e --- /dev/null +++ b/syncmymoodle/storage.py @@ -0,0 +1,109 @@ +import gzip +import json +import logging +import os +import tempfile +from pathlib import Path +from typing import Any + +import requests + +logger = logging.getLogger(__name__) + + +def harden_private_file(path: Path, description: str) -> bool: + if not path.exists(): + return True + if path.is_symlink(): + logger.warning("Refusing to use symlinked %s file: %s", description, path) + return 
False + try: + path.chmod(0o600) + except OSError: + logger.warning( + "Could not restrict permissions for %s file: %s", description, path + ) + return True + + +def write_private_gzip_json(path: Path, payload: Any) -> None: + path = path.expanduser() + path.parent.mkdir(parents=True, exist_ok=True) + + json_bytes = json.dumps(payload, separators=(",", ":")).encode("utf-8") + data = gzip.compress(json_bytes) + + fd, tmp_name = tempfile.mkstemp(prefix=f".{path.name}.", dir=path.parent) + tmp_path = Path(tmp_name) + try: + os.fchmod(fd, 0o600) + with os.fdopen(fd, "wb") as f: + f.write(data) + os.replace(tmp_path, path) + path.chmod(0o600) + finally: + if tmp_path.exists(): + tmp_path.unlink() + + +def read_private_gzip_json(path: Path, description: str) -> Any: + path = path.expanduser() + if not path.exists(): + return None + if not harden_private_file(path, description): + return None + try: + with path.open("rb") as f: + return json.loads(gzip.decompress(f.read()).decode("utf-8")) + except (OSError, gzip.BadGzipFile, UnicodeDecodeError, json.JSONDecodeError): + logger.warning( + "Ignoring legacy or invalid %s file %s. 
Delete it if this warning repeats.", + description, + path, + ) + return None + + +def cookies_to_data(cookie_jar) -> dict[str, Any]: + cookies = [] + for cookie in cookie_jar: + cookies.append( + { + "name": cookie.name, + "value": cookie.value, + "domain": cookie.domain, + "path": cookie.path, + "secure": cookie.secure, + "expires": cookie.expires, + "rest": getattr(cookie, "_rest", {}), + } + ) + return {"format": "syncmymoodle.cookies.v1", "cookies": cookies} + + +def load_cookies_from_data(cookie_jar, payload: Any) -> None: + if not isinstance(payload, dict): + return + if payload.get("format") != "syncmymoodle.cookies.v1": + logger.warning("Ignoring unsupported cookie file format") + return + + for cookie_data in payload.get("cookies", []): + if not isinstance(cookie_data, dict): + continue + if not cookie_data.get("name"): + continue + cookie = requests.cookies.create_cookie( + name=cookie_data["name"], + value=cookie_data.get("value", ""), + domain=cookie_data.get("domain") or "", + path=cookie_data.get("path") or "/", + secure=bool(cookie_data.get("secure")), + expires=cookie_data.get("expires"), + rest=cookie_data.get("rest") or {}, + ) + cookie_jar.set_cookie(cookie) + + +def save_session_cookies(cookie_file: Path, cookie_jar) -> None: + write_private_gzip_json(cookie_file, cookies_to_data(cookie_jar)) diff --git a/syncmymoodle/totp.py b/syncmymoodle/totp.py new file mode 100644 index 0000000..c660ae8 --- /dev/null +++ b/syncmymoodle/totp.py @@ -0,0 +1,23 @@ +import base64 +import hmac +import struct +import time + +""" +To add TOTP functionality without adding external dependencies. 
+Code taken from: +https://github.com/susam/mintotp +""" + + +def hotp(key: str, counter: int, digits: int = 6, digest: str = "sha1") -> str: + key_bytes = base64.b32decode(key.upper() + "=" * ((8 - len(key)) % 8)) + counter_bytes = struct.pack(">Q", counter) + mac = hmac.new(key_bytes, counter_bytes, digest).digest() + offset = mac[-1] & 0x0F + binary = struct.unpack(">L", mac[offset : offset + 4])[0] & 0x7FFFFFFF + return str(binary)[-digits:].zfill(digits) + + +def totp(key: str, time_step: int = 30, digits: int = 6, digest: str = "sha1") -> str: + return hotp(key, int(time.time() / time_step), digits, digest) diff --git a/tests/helpers.py b/tests/helpers.py index a802980..48b6dd0 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -8,7 +8,8 @@ from pathlib import Path from typing import Any -from syncmymoodle.__main__ import Node, SyncMyMoodle +from syncmymoodle.app import SyncMyMoodle +from syncmymoodle.node import Node FIXTURES = Path(__file__).parent / "fixtures" SNAPSHOTS = Path(__file__).parent / "snapshots" diff --git a/tests/test_course_prefix_handling.py b/tests/test_course_prefix_handling.py index a01ebef..0df9cec 100644 --- a/tests/test_course_prefix_handling.py +++ b/tests/test_course_prefix_handling.py @@ -1,6 +1,7 @@ import logging -from syncmymoodle.__main__ import Node, SyncMyMoodle +from syncmymoodle.app import SyncMyMoodle +from syncmymoodle.node import Node def format_course_name(handling, name): @@ -37,7 +38,7 @@ def test_non_matching_names_are_preserved(): def test_invalid_mode_preserves_course_name(caplog): - with caplog.at_level(logging.WARNING, logger="syncmymoodle.__main__"): + with caplog.at_level(logging.WARNING, logger="syncmymoodle.app"): assert format_course_name("invalid", "(VO) Analysis") == "(VO) Analysis" assert any(record.levelno == logging.WARNING for record in caplog.records) diff --git a/tests/test_download_behavior.py b/tests/test_download_behavior.py index 98bac6d..23eb93f 100644 --- a/tests/test_download_behavior.py 
+++ b/tests/test_download_behavior.py @@ -1,7 +1,7 @@ import hashlib import os -from syncmymoodle.__main__ import Node +from syncmymoodle.node import Node from .helpers import FakeResponse, FakeSession, build_single_file_tree, make_syncer diff --git a/tests/test_storage_safety.py b/tests/test_storage_safety.py index ea6f02f..25e1994 100644 --- a/tests/test_storage_safety.py +++ b/tests/test_storage_safety.py @@ -2,7 +2,8 @@ import json import stat -from syncmymoodle.__main__ import Node +from syncmymoodle.node import Node +from syncmymoodle.storage import read_private_gzip_json, write_private_gzip_json from .helpers import FakeSession, make_syncer @@ -19,10 +20,9 @@ def test_sanitized_node_path_stays_inside_basedir(tmp_path): def test_private_gzip_json_roundtrip_uses_private_permissions(tmp_path): - syncer = make_syncer() target = tmp_path / "session" - syncer._write_private_gzip_json(target, {"format": "test", "value": 1}) + write_private_gzip_json(target, {"format": "test", "value": 1}) assert stat.S_IMODE(target.stat().st_mode) == 0o600 with target.open("rb") as handle: @@ -30,7 +30,7 @@ def test_private_gzip_json_roundtrip_uses_private_permissions(tmp_path): "format": "test", "value": 1, } - assert syncer._read_private_gzip_json(target, "test data") == { + assert read_private_gzip_json(target, "test data") == { "format": "test", "value": 1, } diff --git a/tests/test_sync_fixtures.py b/tests/test_sync_fixtures.py index d6b6815..3cdc1e0 100644 --- a/tests/test_sync_fixtures.py +++ b/tests/test_sync_fixtures.py @@ -1,4 +1,4 @@ -from syncmymoodle.__main__ import Node +from syncmymoodle.node import Node from .helpers import ( FakeResponse,